opendal/services/dbfs/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::StatusCode;
24use log::debug;
25use serde::Deserialize;
26
27use super::core::DbfsCore;
28use super::delete::DbfsDeleter;
29use super::error::parse_error;
30use super::lister::DbfsLister;
31use super::writer::DbfsWriter;
32use crate::raw::*;
33use crate::services::DbfsConfig;
34use crate::*;
35
36impl Configurator for DbfsConfig {
37 type Builder = DbfsBuilder;
38 fn into_builder(self) -> Self::Builder {
39 DbfsBuilder { config: self }
40 }
41}
42
43#[doc = include_str!("docs.md")]
45#[derive(Default, Clone)]
46pub struct DbfsBuilder {
47 config: DbfsConfig,
48}
49
50impl Debug for DbfsBuilder {
51 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52 let mut ds = f.debug_struct("DbfsBuilder");
53
54 ds.field("config", &self.config);
55
56 ds.finish()
57 }
58}
59
60impl DbfsBuilder {
61 pub fn root(mut self, root: &str) -> Self {
65 self.config.root = if root.is_empty() {
66 None
67 } else {
68 Some(root.to_string())
69 };
70
71 self
72 }
73
74 pub fn endpoint(mut self, endpoint: &str) -> Self {
81 self.config.endpoint = if endpoint.is_empty() {
82 None
83 } else {
84 Some(endpoint.trim_end_matches('/').to_string())
85 };
86 self
87 }
88
89 pub fn token(mut self, token: &str) -> Self {
91 if !token.is_empty() {
92 self.config.token = Some(token.to_string());
93 }
94 self
95 }
96}
97
98impl Builder for DbfsBuilder {
99 const SCHEME: Scheme = Scheme::Dbfs;
100 type Config = DbfsConfig;
101
102 fn build(self) -> Result<impl Access> {
104 debug!("backend build started: {:?}", &self);
105
106 let root = normalize_root(&self.config.root.unwrap_or_default());
107 debug!("backend use root {}", root);
108
109 let endpoint = match &self.config.endpoint {
110 Some(endpoint) => Ok(endpoint.clone()),
111 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
112 .with_operation("Builder::build")
113 .with_context("service", Scheme::Dbfs)),
114 }?;
115 debug!("backend use endpoint: {}", &endpoint);
116
117 let token = match self.config.token {
118 Some(token) => token,
119 None => {
120 return Err(Error::new(
121 ErrorKind::ConfigInvalid,
122 "missing token for Dbfs",
123 ));
124 }
125 };
126
127 let client = HttpClient::new()?;
128 Ok(DbfsBackend {
129 core: Arc::new(DbfsCore {
130 root,
131 endpoint: endpoint.to_string(),
132 token,
133 client,
134 }),
135 })
136 }
137}
138
139#[derive(Debug, Clone)]
141pub struct DbfsBackend {
142 core: Arc<DbfsCore>,
143}
144
145impl Access for DbfsBackend {
146 type Reader = ();
147 type Writer = oio::OneShotWriter<DbfsWriter>;
148 type Lister = oio::PageLister<DbfsLister>;
149 type Deleter = oio::OneShotDeleter<DbfsDeleter>;
150
151 fn info(&self) -> Arc<AccessorInfo> {
152 let am = AccessorInfo::default();
153 am.set_scheme(Scheme::Dbfs)
154 .set_root(&self.core.root)
155 .set_native_capability(Capability {
156 stat: true,
157 stat_has_cache_control: true,
158 stat_has_content_length: true,
159 stat_has_content_type: true,
160 stat_has_content_encoding: true,
161 stat_has_content_range: true,
162 stat_has_etag: true,
163 stat_has_content_md5: true,
164 stat_has_last_modified: true,
165 stat_has_content_disposition: true,
166
167 write: true,
168 create_dir: true,
169 delete: true,
170 rename: true,
171
172 list: true,
173 list_has_last_modified: true,
174 list_has_content_length: true,
175
176 shared: true,
177
178 ..Default::default()
179 });
180 am.into()
181 }
182
183 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
184 let resp = self.core.dbfs_create_dir(path).await?;
185
186 let status = resp.status();
187
188 match status {
189 StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
190 _ => Err(parse_error(resp)),
191 }
192 }
193
194 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
195 if path == "/" {
197 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
198 }
199
200 let resp = self.core.dbfs_get_status(path).await?;
201
202 let status = resp.status();
203
204 match status {
205 StatusCode::OK => {
206 let mut meta = parse_into_metadata(path, resp.headers())?;
207 let bs = resp.into_body();
208 let decoded_response: DbfsStatus =
209 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
210 meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
211 decoded_response.modification_time,
212 )?);
213 match decoded_response.is_dir {
214 true => meta.set_mode(EntryMode::DIR),
215 false => {
216 meta.set_mode(EntryMode::FILE);
217 meta.set_content_length(decoded_response.file_size as u64)
218 }
219 };
220 Ok(RpStat::new(meta))
221 }
222 StatusCode::NOT_FOUND if path.ends_with('/') => {
223 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
224 }
225 _ => Err(parse_error(resp)),
226 }
227 }
228
229 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
230 Ok((
231 RpWrite::default(),
232 oio::OneShotWriter::new(DbfsWriter::new(self.core.clone(), args, path.to_string())),
233 ))
234 }
235
236 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
237 Ok((
238 RpDelete::default(),
239 oio::OneShotDeleter::new(DbfsDeleter::new(self.core.clone())),
240 ))
241 }
242
243 async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
244 let l = DbfsLister::new(self.core.clone(), path.to_string());
245
246 Ok((RpList::default(), oio::PageLister::new(l)))
247 }
248
249 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
250 self.core.dbfs_ensure_parent_path(to).await?;
251
252 let resp = self.core.dbfs_rename(from, to).await?;
253
254 let status = resp.status();
255
256 match status {
257 StatusCode::OK => Ok(RpRename::default()),
258 _ => Err(parse_error(resp)),
259 }
260 }
261}
262
263#[derive(Deserialize)]
264struct DbfsStatus {
265 is_dir: bool,
268 file_size: i64,
269 modification_time: i64,
270}