opendal/services/dbfs/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::StatusCode;
24use log::debug;
25use serde::Deserialize;
26
27use super::core::DbfsCore;
28use super::delete::DbfsDeleter;
29use super::error::parse_error;
30use super::lister::DbfsLister;
31use super::writer::DbfsWriter;
32use crate::raw::*;
33use crate::services::DbfsConfig;
34use crate::*;
35
36impl Configurator for DbfsConfig {
37 type Builder = DbfsBuilder;
38 fn into_builder(self) -> Self::Builder {
39 DbfsBuilder { config: self }
40 }
41}
42
43#[doc = include_str!("docs.md")]
45#[derive(Default, Clone)]
46pub struct DbfsBuilder {
47 config: DbfsConfig,
48}
49
50impl Debug for DbfsBuilder {
51 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52 let mut ds = f.debug_struct("DbfsBuilder");
53
54 ds.field("config", &self.config);
55
56 ds.finish()
57 }
58}
59
60impl DbfsBuilder {
61 pub fn root(mut self, root: &str) -> Self {
65 self.config.root = if root.is_empty() {
66 None
67 } else {
68 Some(root.to_string())
69 };
70
71 self
72 }
73
74 pub fn endpoint(mut self, endpoint: &str) -> Self {
81 self.config.endpoint = if endpoint.is_empty() {
82 None
83 } else {
84 Some(endpoint.trim_end_matches('/').to_string())
85 };
86 self
87 }
88
89 pub fn token(mut self, token: &str) -> Self {
91 if !token.is_empty() {
92 self.config.token = Some(token.to_string());
93 }
94 self
95 }
96}
97
98impl Builder for DbfsBuilder {
99 const SCHEME: Scheme = Scheme::Dbfs;
100 type Config = DbfsConfig;
101
102 fn build(self) -> Result<impl Access> {
104 debug!("backend build started: {:?}", &self);
105
106 let root = normalize_root(&self.config.root.unwrap_or_default());
107 debug!("backend use root {}", root);
108
109 let endpoint = match &self.config.endpoint {
110 Some(endpoint) => Ok(endpoint.clone()),
111 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
112 .with_operation("Builder::build")
113 .with_context("service", Scheme::Dbfs)),
114 }?;
115 debug!("backend use endpoint: {}", &endpoint);
116
117 let token = match self.config.token {
118 Some(token) => token,
119 None => {
120 return Err(Error::new(
121 ErrorKind::ConfigInvalid,
122 "missing token for Dbfs",
123 ));
124 }
125 };
126
127 let client = HttpClient::new()?;
128 Ok(DbfsBackend {
129 core: Arc::new(DbfsCore {
130 root,
131 endpoint: endpoint.to_string(),
132 token,
133 client,
134 }),
135 })
136 }
137}
138
139#[derive(Debug, Clone)]
141pub struct DbfsBackend {
142 core: Arc<DbfsCore>,
143}
144
145impl Access for DbfsBackend {
146 type Reader = ();
147 type Writer = oio::OneShotWriter<DbfsWriter>;
148 type Lister = oio::PageLister<DbfsLister>;
149 type Deleter = oio::OneShotDeleter<DbfsDeleter>;
150 type BlockingReader = ();
151 type BlockingWriter = ();
152 type BlockingLister = ();
153 type BlockingDeleter = ();
154
155 fn info(&self) -> Arc<AccessorInfo> {
156 let am = AccessorInfo::default();
157 am.set_scheme(Scheme::Dbfs)
158 .set_root(&self.core.root)
159 .set_native_capability(Capability {
160 stat: true,
161 stat_has_cache_control: true,
162 stat_has_content_length: true,
163 stat_has_content_type: true,
164 stat_has_content_encoding: true,
165 stat_has_content_range: true,
166 stat_has_etag: true,
167 stat_has_content_md5: true,
168 stat_has_last_modified: true,
169 stat_has_content_disposition: true,
170
171 write: true,
172 create_dir: true,
173 delete: true,
174 rename: true,
175
176 list: true,
177 list_has_last_modified: true,
178 list_has_content_length: true,
179
180 shared: true,
181
182 ..Default::default()
183 });
184 am.into()
185 }
186
187 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
188 let resp = self.core.dbfs_create_dir(path).await?;
189
190 let status = resp.status();
191
192 match status {
193 StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
194 _ => Err(parse_error(resp)),
195 }
196 }
197
198 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
199 if path == "/" {
201 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
202 }
203
204 let resp = self.core.dbfs_get_status(path).await?;
205
206 let status = resp.status();
207
208 match status {
209 StatusCode::OK => {
210 let mut meta = parse_into_metadata(path, resp.headers())?;
211 let bs = resp.into_body();
212 let decoded_response: DbfsStatus =
213 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
214 meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
215 decoded_response.modification_time,
216 )?);
217 match decoded_response.is_dir {
218 true => meta.set_mode(EntryMode::DIR),
219 false => {
220 meta.set_mode(EntryMode::FILE);
221 meta.set_content_length(decoded_response.file_size as u64)
222 }
223 };
224 Ok(RpStat::new(meta))
225 }
226 StatusCode::NOT_FOUND if path.ends_with('/') => {
227 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
228 }
229 _ => Err(parse_error(resp)),
230 }
231 }
232
233 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
234 Ok((
235 RpWrite::default(),
236 oio::OneShotWriter::new(DbfsWriter::new(self.core.clone(), args, path.to_string())),
237 ))
238 }
239
240 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
241 Ok((
242 RpDelete::default(),
243 oio::OneShotDeleter::new(DbfsDeleter::new(self.core.clone())),
244 ))
245 }
246
247 async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
248 let l = DbfsLister::new(self.core.clone(), path.to_string());
249
250 Ok((RpList::default(), oio::PageLister::new(l)))
251 }
252
253 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
254 self.core.dbfs_ensure_parent_path(to).await?;
255
256 let resp = self.core.dbfs_rename(from, to).await?;
257
258 let status = resp.status();
259
260 match status {
261 StatusCode::OK => Ok(RpRename::default()),
262 _ => Err(parse_error(resp)),
263 }
264 }
265}
266
267#[derive(Deserialize)]
268struct DbfsStatus {
269 is_dir: bool,
272 file_size: i64,
273 modification_time: i64,
274}