1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use reqsign::AzureStorageConfig;
26use reqsign::AzureStorageLoader;
27use reqsign::AzureStorageSigner;
28
29use super::core::AzdlsCore;
30use super::core::DIRECTORY;
31use super::delete::AzdlsDeleter;
32use super::error::parse_error;
33use super::lister::AzdlsLister;
34use super::writer::AzdlsWriter;
35use super::writer::AzdlsWriters;
36use crate::raw::*;
37use crate::services::AzdlsConfig;
38use crate::*;
39
/// Endpoint host suffixes that `infer_storage_name_from_endpoint` accepts.
///
/// An endpoint only yields an inferred storage account name when the part of
/// its host after the first `.` equals one of these entries.
const KNOWN_AZDLS_ENDPOINT_SUFFIX: &[&str] = &[
    "dfs.core.windows.net",
    "dfs.core.usgovcloudapi.net",
    "dfs.core.chinacloudapi.cn",
];
49
50impl Configurator for AzdlsConfig {
51 type Builder = AzdlsBuilder;
52
53 #[allow(deprecated)]
54 fn into_builder(self) -> Self::Builder {
55 AzdlsBuilder {
56 config: self,
57 http_client: None,
58 }
59 }
60}
61
// The rustdoc for this type is pulled in from the sibling `docs.md` file.
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
pub struct AzdlsBuilder {
    /// Configuration values accumulated by the builder's setter methods.
    config: AzdlsConfig,

    /// Optional caller-supplied HTTP client; deprecated in favor of
    /// `Operator::update_http_client`.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
71
72impl Debug for AzdlsBuilder {
73 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74 let mut ds = f.debug_struct("AzdlsBuilder");
75
76 ds.field("config", &self.config);
77
78 ds.finish()
79 }
80}
81
82impl AzdlsBuilder {
83 pub fn root(mut self, root: &str) -> Self {
87 self.config.root = if root.is_empty() {
88 None
89 } else {
90 Some(root.to_string())
91 };
92
93 self
94 }
95
96 pub fn filesystem(mut self, filesystem: &str) -> Self {
98 self.config.filesystem = filesystem.to_string();
99
100 self
101 }
102
103 pub fn endpoint(mut self, endpoint: &str) -> Self {
110 if !endpoint.is_empty() {
111 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
113 }
114
115 self
116 }
117
118 pub fn account_name(mut self, account_name: &str) -> Self {
123 if !account_name.is_empty() {
124 self.config.account_name = Some(account_name.to_string());
125 }
126
127 self
128 }
129
130 pub fn account_key(mut self, account_key: &str) -> Self {
135 if !account_key.is_empty() {
136 self.config.account_key = Some(account_key.to_string());
137 }
138
139 self
140 }
141
142 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
149 #[allow(deprecated)]
150 pub fn http_client(mut self, client: HttpClient) -> Self {
151 self.http_client = Some(client);
152 self
153 }
154}
155
156impl Builder for AzdlsBuilder {
157 const SCHEME: Scheme = Scheme::Azdls;
158 type Config = AzdlsConfig;
159
160 fn build(self) -> Result<impl Access> {
161 debug!("backend build started: {:?}", &self);
162
163 let root = normalize_root(&self.config.root.unwrap_or_default());
164 debug!("backend use root {}", root);
165
166 let filesystem = match self.config.filesystem.is_empty() {
168 false => Ok(&self.config.filesystem),
169 true => Err(Error::new(ErrorKind::ConfigInvalid, "filesystem is empty")
170 .with_operation("Builder::build")
171 .with_context("service", Scheme::Azdls)),
172 }?;
173 debug!("backend use filesystem {}", &filesystem);
174
175 let endpoint = match &self.config.endpoint {
176 Some(endpoint) => Ok(endpoint.clone()),
177 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
178 .with_operation("Builder::build")
179 .with_context("service", Scheme::Azdls)),
180 }?;
181 debug!("backend use endpoint {}", &endpoint);
182
183 let config_loader = AzureStorageConfig {
184 account_name: self
185 .config
186 .account_name
187 .clone()
188 .or_else(|| infer_storage_name_from_endpoint(endpoint.as_str())),
189 account_key: self.config.account_key.clone(),
190 sas_token: None,
191 ..Default::default()
192 };
193
194 let cred_loader = AzureStorageLoader::new(config_loader);
195 let signer = AzureStorageSigner::new();
196 Ok(AzdlsBackend {
197 core: Arc::new(AzdlsCore {
198 info: {
199 let am = AccessorInfo::default();
200 am.set_scheme(Scheme::Azdls)
201 .set_root(&root)
202 .set_name(filesystem)
203 .set_native_capability(Capability {
204 stat: true,
205 stat_has_cache_control: true,
206 stat_has_content_length: true,
207 stat_has_content_type: true,
208 stat_has_content_encoding: true,
209 stat_has_content_range: true,
210 stat_has_etag: true,
211 stat_has_content_md5: true,
212 stat_has_last_modified: true,
213 stat_has_content_disposition: true,
214
215 read: true,
216
217 write: true,
218 write_can_append: true,
219 write_with_if_none_match: true,
220 write_with_if_not_exists: true,
221
222 create_dir: true,
223 delete: true,
224 rename: true,
225
226 list: true,
227 list_has_etag: true,
228 list_has_content_length: true,
229 list_has_last_modified: true,
230
231 shared: true,
232
233 ..Default::default()
234 });
235
236 #[allow(deprecated)]
238 if let Some(client) = self.http_client {
239 am.update_http_client(|_| client);
240 }
241
242 am.into()
243 },
244 filesystem: self.config.filesystem.clone(),
245 root,
246 endpoint,
247 loader: cred_loader,
248 signer,
249 }),
250 })
251 }
252}
253
/// Backend for the Azdls service (`Scheme::Azdls`), produced by `AzdlsBuilder::build`.
#[derive(Debug, Clone)]
pub struct AzdlsBackend {
    /// Shared core state; cloning the backend clones this `Arc` handle.
    core: Arc<AzdlsCore>,
}
259
260impl Access for AzdlsBackend {
261 type Reader = HttpBody;
262 type Writer = AzdlsWriters;
263 type Lister = oio::PageLister<AzdlsLister>;
264 type Deleter = oio::OneShotDeleter<AzdlsDeleter>;
265 type BlockingReader = ();
266 type BlockingWriter = ();
267 type BlockingLister = ();
268 type BlockingDeleter = ();
269
270 fn info(&self) -> Arc<AccessorInfo> {
271 self.core.info.clone()
272 }
273
274 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
275 let resp = self
276 .core
277 .azdls_create(path, DIRECTORY, &OpWrite::default())
278 .await?;
279
280 let status = resp.status();
281 match status {
282 StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
283 _ => Err(parse_error(resp)),
284 }
285 }
286
287 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
288 if path == "/" {
291 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
292 }
293
294 let metadata = self.core.azdls_stat_metadata(path).await?;
295 Ok(RpStat::new(metadata))
296 }
297
298 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
299 let resp = self.core.azdls_read(path, args.range()).await?;
300
301 let status = resp.status();
302 match status {
303 StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
304 _ => {
305 let (part, mut body) = resp.into_parts();
306 let buf = body.to_buffer().await?;
307 Err(parse_error(Response::from_parts(part, buf)))
308 }
309 }
310 }
311
312 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
313 let w = AzdlsWriter::new(self.core.clone(), args.clone(), path.to_string());
314 let w = if args.append() {
315 AzdlsWriters::Two(oio::AppendWriter::new(w))
316 } else {
317 AzdlsWriters::One(oio::OneShotWriter::new(w))
318 };
319 Ok((RpWrite::default(), w))
320 }
321
322 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
323 Ok((
324 RpDelete::default(),
325 oio::OneShotDeleter::new(AzdlsDeleter::new(self.core.clone())),
326 ))
327 }
328
329 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
330 let l = AzdlsLister::new(self.core.clone(), path.to_string(), args.limit());
331
332 Ok((RpList::default(), oio::PageLister::new(l)))
333 }
334
335 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
336 if let Some(resp) = self.core.azdls_ensure_parent_path(to).await? {
337 let status = resp.status();
338 match status {
339 StatusCode::CREATED | StatusCode::CONFLICT => {}
340 _ => return Err(parse_error(resp)),
341 }
342 }
343
344 let resp = self.core.azdls_rename(from, to).await?;
345
346 let status = resp.status();
347
348 match status {
349 StatusCode::CREATED => Ok(RpRename::default()),
350 _ => Err(parse_error(resp)),
351 }
352 }
353}
354
355fn infer_storage_name_from_endpoint(endpoint: &str) -> Option<String> {
356 let endpoint: &str = endpoint
357 .strip_prefix("http://")
358 .or_else(|| endpoint.strip_prefix("https://"))
359 .unwrap_or(endpoint);
360
361 let mut parts = endpoint.splitn(2, '.');
362 let storage_name = parts.next();
363 let endpoint_suffix = parts
364 .next()
365 .unwrap_or_default()
366 .trim_end_matches('/')
367 .to_lowercase();
368
369 if KNOWN_AZDLS_ENDPOINT_SUFFIX
370 .iter()
371 .any(|s| *s == endpoint_suffix.as_str())
372 {
373 storage_name.map(|s| s.to_string())
374 } else {
375 None
376 }
377}
378
#[cfg(test)]
mod tests {
    use super::infer_storage_name_from_endpoint;

    /// A plain `https` endpoint with a known suffix yields its first host label.
    #[test]
    fn test_infer_storage_name_from_endpoint() {
        assert_eq!(
            infer_storage_name_from_endpoint("https://account.dfs.core.windows.net"),
            Some(String::from("account")),
        );
    }

    /// A trailing `/` on the endpoint does not affect the inferred name.
    #[test]
    fn test_infer_storage_name_from_endpoint_with_trailing_slash() {
        assert_eq!(
            infer_storage_name_from_endpoint("https://account.dfs.core.windows.net/"),
            Some(String::from("account")),
        );
    }
}