opendal/services/dbfs/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::StatusCode;
24use log::debug;
25use serde::Deserialize;
26
27use super::core::DbfsCore;
28use super::delete::DbfsDeleter;
29use super::error::parse_error;
30use super::lister::DbfsLister;
31use super::writer::DbfsWriter;
32use super::DEFAULT_SCHEME;
33use crate::raw::*;
34use crate::services::DbfsConfig;
35use crate::*;
36impl Configurator for DbfsConfig {
37 type Builder = DbfsBuilder;
38 fn into_builder(self) -> Self::Builder {
39 DbfsBuilder { config: self }
40 }
41}
42
43#[doc = include_str!("docs.md")]
45#[derive(Default, Clone)]
46pub struct DbfsBuilder {
47 config: DbfsConfig,
48}
49
50impl Debug for DbfsBuilder {
51 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52 let mut ds = f.debug_struct("DbfsBuilder");
53
54 ds.field("config", &self.config);
55
56 ds.finish()
57 }
58}
59
60impl DbfsBuilder {
61 pub fn root(mut self, root: &str) -> Self {
65 self.config.root = if root.is_empty() {
66 None
67 } else {
68 Some(root.to_string())
69 };
70
71 self
72 }
73
74 pub fn endpoint(mut self, endpoint: &str) -> Self {
81 self.config.endpoint = if endpoint.is_empty() {
82 None
83 } else {
84 Some(endpoint.trim_end_matches('/').to_string())
85 };
86 self
87 }
88
89 pub fn token(mut self, token: &str) -> Self {
91 if !token.is_empty() {
92 self.config.token = Some(token.to_string());
93 }
94 self
95 }
96}
97
98impl Builder for DbfsBuilder {
99 type Config = DbfsConfig;
100
101 fn build(self) -> Result<impl Access> {
103 debug!("backend build started: {:?}", &self);
104
105 let root = normalize_root(&self.config.root.unwrap_or_default());
106 debug!("backend use root {root}");
107
108 let endpoint = match &self.config.endpoint {
109 Some(endpoint) => Ok(endpoint.clone()),
110 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
111 .with_operation("Builder::build")
112 .with_context("service", Scheme::Dbfs)),
113 }?;
114 debug!("backend use endpoint: {}", &endpoint);
115
116 let token = match self.config.token {
117 Some(token) => token,
118 None => {
119 return Err(Error::new(
120 ErrorKind::ConfigInvalid,
121 "missing token for Dbfs",
122 ));
123 }
124 };
125
126 let client = HttpClient::new()?;
127 Ok(DbfsBackend {
128 core: Arc::new(DbfsCore {
129 root,
130 endpoint: endpoint.to_string(),
131 token,
132 client,
133 }),
134 })
135 }
136}
137
138#[derive(Debug, Clone)]
140pub struct DbfsBackend {
141 core: Arc<DbfsCore>,
142}
143
144impl Access for DbfsBackend {
145 type Reader = ();
146 type Writer = oio::OneShotWriter<DbfsWriter>;
147 type Lister = oio::PageLister<DbfsLister>;
148 type Deleter = oio::OneShotDeleter<DbfsDeleter>;
149
150 fn info(&self) -> Arc<AccessorInfo> {
151 let am = AccessorInfo::default();
152 am.set_scheme(DEFAULT_SCHEME)
153 .set_root(&self.core.root)
154 .set_native_capability(Capability {
155 stat: true,
156
157 write: true,
158 create_dir: true,
159 delete: true,
160 rename: true,
161
162 list: true,
163
164 shared: true,
165
166 ..Default::default()
167 });
168 am.into()
169 }
170
171 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
172 let resp = self.core.dbfs_create_dir(path).await?;
173
174 let status = resp.status();
175
176 match status {
177 StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
178 _ => Err(parse_error(resp)),
179 }
180 }
181
182 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
183 if path == "/" {
185 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
186 }
187
188 let resp = self.core.dbfs_get_status(path).await?;
189
190 let status = resp.status();
191
192 match status {
193 StatusCode::OK => {
194 let mut meta = parse_into_metadata(path, resp.headers())?;
195 let bs = resp.into_body();
196 let decoded_response: DbfsStatus =
197 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
198 meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
199 decoded_response.modification_time,
200 )?);
201 match decoded_response.is_dir {
202 true => meta.set_mode(EntryMode::DIR),
203 false => {
204 meta.set_mode(EntryMode::FILE);
205 meta.set_content_length(decoded_response.file_size as u64)
206 }
207 };
208 Ok(RpStat::new(meta))
209 }
210 StatusCode::NOT_FOUND if path.ends_with('/') => {
211 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
212 }
213 _ => Err(parse_error(resp)),
214 }
215 }
216
217 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
218 Ok((
219 RpWrite::default(),
220 oio::OneShotWriter::new(DbfsWriter::new(self.core.clone(), args, path.to_string())),
221 ))
222 }
223
224 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
225 Ok((
226 RpDelete::default(),
227 oio::OneShotDeleter::new(DbfsDeleter::new(self.core.clone())),
228 ))
229 }
230
231 async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
232 let l = DbfsLister::new(self.core.clone(), path.to_string());
233
234 Ok((RpList::default(), oio::PageLister::new(l)))
235 }
236
237 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
238 self.core.dbfs_ensure_parent_path(to).await?;
239
240 let resp = self.core.dbfs_rename(from, to).await?;
241
242 let status = resp.status();
243
244 match status {
245 StatusCode::OK => Ok(RpRename::default()),
246 _ => Err(parse_error(resp)),
247 }
248 }
249}
250
251#[derive(Deserialize)]
252struct DbfsStatus {
253 is_dir: bool,
256 file_size: i64,
257 modification_time: i64,
258}