opendal/services/dbfs/
backend.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22use log::debug;
23use serde::Deserialize;
24
25use super::DBFS_SCHEME;
26use super::core::DbfsCore;
27use super::deleter::DbfsDeleter;
28use super::error::parse_error;
29use super::lister::DbfsLister;
30use super::writer::DbfsWriter;
31use crate::raw::*;
32use crate::services::DbfsConfig;
33use crate::*;
34
35#[doc = include_str!("docs.md")]
37#[derive(Debug, Default)]
38pub struct DbfsBuilder {
39 pub(super) config: DbfsConfig,
40}
41
42impl DbfsBuilder {
43 pub fn root(mut self, root: &str) -> Self {
47 self.config.root = if root.is_empty() {
48 None
49 } else {
50 Some(root.to_string())
51 };
52
53 self
54 }
55
56 pub fn endpoint(mut self, endpoint: &str) -> Self {
63 self.config.endpoint = if endpoint.is_empty() {
64 None
65 } else {
66 Some(endpoint.trim_end_matches('/').to_string())
67 };
68 self
69 }
70
71 pub fn token(mut self, token: &str) -> Self {
73 if !token.is_empty() {
74 self.config.token = Some(token.to_string());
75 }
76 self
77 }
78}
79
80impl Builder for DbfsBuilder {
81 type Config = DbfsConfig;
82
83 fn build(self) -> Result<impl Access> {
85 debug!("backend build started: {:?}", &self);
86
87 let root = normalize_root(&self.config.root.unwrap_or_default());
88 debug!("backend use root {root}");
89
90 let endpoint = match &self.config.endpoint {
91 Some(endpoint) => Ok(endpoint.clone()),
92 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
93 .with_operation("Builder::build")
94 .with_context("service", DBFS_SCHEME)),
95 }?;
96 debug!("backend use endpoint: {}", &endpoint);
97
98 let token = match self.config.token {
99 Some(token) => token,
100 None => {
101 return Err(Error::new(
102 ErrorKind::ConfigInvalid,
103 "missing token for Dbfs",
104 ));
105 }
106 };
107
108 let client = HttpClient::new()?;
109 Ok(DbfsBackend {
110 core: Arc::new(DbfsCore {
111 root,
112 endpoint: endpoint.to_string(),
113 token,
114 client,
115 }),
116 })
117 }
118}
119
120#[derive(Debug, Clone)]
122pub struct DbfsBackend {
123 core: Arc<DbfsCore>,
124}
125
126impl Access for DbfsBackend {
127 type Reader = ();
128 type Writer = oio::OneShotWriter<DbfsWriter>;
129 type Lister = oio::PageLister<DbfsLister>;
130 type Deleter = oio::OneShotDeleter<DbfsDeleter>;
131
132 fn info(&self) -> Arc<AccessorInfo> {
133 let am = AccessorInfo::default();
134 am.set_scheme(DBFS_SCHEME)
135 .set_root(&self.core.root)
136 .set_native_capability(Capability {
137 stat: true,
138
139 write: true,
140 create_dir: true,
141 delete: true,
142 rename: true,
143
144 list: true,
145
146 shared: true,
147
148 ..Default::default()
149 });
150 am.into()
151 }
152
153 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
154 let resp = self.core.dbfs_create_dir(path).await?;
155
156 let status = resp.status();
157
158 match status {
159 StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
160 _ => Err(parse_error(resp)),
161 }
162 }
163
164 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
165 if path == "/" {
167 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
168 }
169
170 let resp = self.core.dbfs_get_status(path).await?;
171
172 let status = resp.status();
173
174 match status {
175 StatusCode::OK => {
176 let mut meta = parse_into_metadata(path, resp.headers())?;
177 let bs = resp.into_body();
178 let decoded_response: DbfsStatus =
179 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
180 meta.set_last_modified(Timestamp::from_millisecond(
181 decoded_response.modification_time,
182 )?);
183 match decoded_response.is_dir {
184 true => meta.set_mode(EntryMode::DIR),
185 false => {
186 meta.set_mode(EntryMode::FILE);
187 meta.set_content_length(decoded_response.file_size as u64)
188 }
189 };
190 Ok(RpStat::new(meta))
191 }
192 StatusCode::NOT_FOUND if path.ends_with('/') => {
193 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
194 }
195 _ => Err(parse_error(resp)),
196 }
197 }
198
199 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
200 Ok((
201 RpWrite::default(),
202 oio::OneShotWriter::new(DbfsWriter::new(self.core.clone(), args, path.to_string())),
203 ))
204 }
205
206 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
207 Ok((
208 RpDelete::default(),
209 oio::OneShotDeleter::new(DbfsDeleter::new(self.core.clone())),
210 ))
211 }
212
213 async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
214 let l = DbfsLister::new(self.core.clone(), path.to_string());
215
216 Ok((RpList::default(), oio::PageLister::new(l)))
217 }
218
219 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
220 self.core.dbfs_ensure_parent_path(to).await?;
221
222 let resp = self.core.dbfs_rename(from, to).await?;
223
224 let status = resp.status();
225
226 match status {
227 StatusCode::OK => Ok(RpRename::default()),
228 _ => Err(parse_error(resp)),
229 }
230 }
231}
232
233#[derive(Deserialize)]
234struct DbfsStatus {
235 is_dir: bool,
238 file_size: i64,
239 modification_time: i64,
240}