opendal/services/alluxio/
backend.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use http::Response;
22use log::debug;
23
24use super::ALLUXIO_SCHEME;
25use super::config::AlluxioConfig;
26use super::core::AlluxioCore;
27use super::deleter::AlluxioDeleter;
28use super::error::parse_error;
29use super::lister::AlluxioLister;
30use super::writer::AlluxioWriter;
31use super::writer::AlluxioWriters;
32use crate::raw::*;
33use crate::*;
34
35#[doc = include_str!("docs.md")]
37#[derive(Default)]
38pub struct AlluxioBuilder {
39 pub(super) config: AlluxioConfig,
40
41 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
42 pub(super) http_client: Option<HttpClient>,
43}
44
45impl Debug for AlluxioBuilder {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("AlluxioBuilder")
48 .field("config", &self.config)
49 .finish_non_exhaustive()
50 }
51}
52
53impl AlluxioBuilder {
54 pub fn root(mut self, root: &str) -> Self {
58 self.config.root = if root.is_empty() {
59 None
60 } else {
61 Some(root.to_string())
62 };
63
64 self
65 }
66
67 pub fn endpoint(mut self, endpoint: &str) -> Self {
71 if !endpoint.is_empty() {
72 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
74 }
75
76 self
77 }
78
79 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
86 #[allow(deprecated)]
87 pub fn http_client(mut self, client: HttpClient) -> Self {
88 self.http_client = Some(client);
89 self
90 }
91}
92
93impl Builder for AlluxioBuilder {
94 type Config = AlluxioConfig;
95
96 fn build(self) -> Result<impl Access> {
98 debug!("backend build started: {:?}", &self);
99
100 let root = normalize_root(&self.config.root.clone().unwrap_or_default());
101 debug!("backend use root {}", &root);
102
103 let endpoint = match &self.config.endpoint {
104 Some(endpoint) => Ok(endpoint.clone()),
105 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
106 .with_operation("Builder::build")
107 .with_context("service", ALLUXIO_SCHEME)),
108 }?;
109 debug!("backend use endpoint {}", &endpoint);
110
111 Ok(AlluxioBackend {
112 core: Arc::new(AlluxioCore {
113 info: {
114 let am = AccessorInfo::default();
115 am.set_scheme(ALLUXIO_SCHEME)
116 .set_root(&root)
117 .set_native_capability(Capability {
118 stat: true,
119
120 read: false,
125
126 write: true,
127 write_can_multi: true,
128
129 create_dir: true,
130 delete: true,
131
132 list: true,
133
134 shared: true,
135
136 ..Default::default()
137 });
138
139 #[allow(deprecated)]
141 if let Some(client) = self.http_client {
142 am.update_http_client(|_| client);
143 }
144
145 am.into()
146 },
147 root,
148 endpoint,
149 }),
150 })
151 }
152}
153
154#[derive(Debug, Clone)]
155pub struct AlluxioBackend {
156 core: Arc<AlluxioCore>,
157}
158
159impl Access for AlluxioBackend {
160 type Reader = HttpBody;
161 type Writer = AlluxioWriters;
162 type Lister = oio::PageLister<AlluxioLister>;
163 type Deleter = oio::OneShotDeleter<AlluxioDeleter>;
164
165 fn info(&self) -> Arc<AccessorInfo> {
166 self.core.info.clone()
167 }
168
169 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
170 self.core.create_dir(path).await?;
171 Ok(RpCreateDir::default())
172 }
173
174 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
175 let file_info = self.core.get_status(path).await?;
176
177 Ok(RpStat::new(file_info.try_into()?))
178 }
179
180 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
181 let stream_id = self.core.open_file(path).await?;
182
183 let resp = self.core.read(stream_id, args.range()).await?;
184 if !resp.status().is_success() {
185 let (part, mut body) = resp.into_parts();
186 let buf = body.to_buffer().await?;
187 return Err(parse_error(Response::from_parts(part, buf)));
188 }
189 Ok((RpRead::new(), resp.into_body()))
190 }
191
192 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
193 let w = AlluxioWriter::new(self.core.clone(), args.clone(), path.to_string());
194
195 Ok((RpWrite::default(), w))
196 }
197
198 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
199 Ok((
200 RpDelete::default(),
201 oio::OneShotDeleter::new(AlluxioDeleter::new(self.core.clone())),
202 ))
203 }
204
205 async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
206 let l = AlluxioLister::new(self.core.clone(), path);
207 Ok((RpList::default(), oio::PageLister::new(l)))
208 }
209
210 async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
211 self.core.rename(from, to).await?;
212
213 Ok(RpRename::default())
214 }
215}
216
217#[cfg(test)]
218mod test {
219 use std::collections::HashMap;
220
221 use super::*;
222
223 #[test]
224 fn test_builder_from_map() {
225 let mut map = HashMap::new();
226 map.insert("root".to_string(), "/".to_string());
227 map.insert("endpoint".to_string(), "http://127.0.0.1:39999".to_string());
228
229 let builder = AlluxioConfig::from_iter(map).unwrap();
230
231 assert_eq!(builder.root, Some("/".to_string()));
232 assert_eq!(builder.endpoint, Some("http://127.0.0.1:39999".to_string()));
233 }
234
235 #[test]
236 fn test_builder_build() {
237 let builder = AlluxioBuilder::default()
238 .root("/root")
239 .endpoint("http://127.0.0.1:39999")
240 .build();
241
242 assert!(builder.is_ok());
243 }
244}