1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use reqsign::AzureStorageConfig;
26use reqsign::AzureStorageLoader;
27use reqsign::AzureStorageSigner;
28
29use super::core::AzdlsCore;
30use super::delete::AzdlsDeleter;
31use super::error::parse_error;
32use super::lister::AzdlsLister;
33use super::writer::AzdlsWriter;
34use super::writer::AzdlsWriters;
35use crate::raw::*;
36use crate::services::AzdlsConfig;
37use crate::*;
38
/// Endpoint host suffixes that are recognized when inferring the storage
/// account name from an endpoint.
///
/// An endpoint whose host (after the first `.`) is not one of these suffixes
/// is not used for account-name inference.
const KNOWN_AZDLS_ENDPOINT_SUFFIX: &[&str] = &[
    "dfs.core.windows.net",
    "dfs.core.usgovcloudapi.net",
    "dfs.core.chinacloudapi.cn",
];
48
49impl Configurator for AzdlsConfig {
50 type Builder = AzdlsBuilder;
51
52 #[allow(deprecated)]
53 fn into_builder(self) -> Self::Builder {
54 AzdlsBuilder {
55 config: self,
56 http_client: None,
57 }
58 }
59}
60
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
pub struct AzdlsBuilder {
    /// Service configuration filled in by the builder's setter methods.
    config: AzdlsConfig,

    /// Optional HTTP client override, applied to the accessor info at build time.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
70
71impl Debug for AzdlsBuilder {
72 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73 let mut ds = f.debug_struct("AzdlsBuilder");
74
75 ds.field("config", &self.config);
76
77 ds.finish()
78 }
79}
80
81impl AzdlsBuilder {
82 pub fn root(mut self, root: &str) -> Self {
86 self.config.root = if root.is_empty() {
87 None
88 } else {
89 Some(root.to_string())
90 };
91
92 self
93 }
94
95 pub fn filesystem(mut self, filesystem: &str) -> Self {
97 self.config.filesystem = filesystem.to_string();
98
99 self
100 }
101
102 pub fn endpoint(mut self, endpoint: &str) -> Self {
109 if !endpoint.is_empty() {
110 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
112 }
113
114 self
115 }
116
117 pub fn account_name(mut self, account_name: &str) -> Self {
122 if !account_name.is_empty() {
123 self.config.account_name = Some(account_name.to_string());
124 }
125
126 self
127 }
128
129 pub fn account_key(mut self, account_key: &str) -> Self {
134 if !account_key.is_empty() {
135 self.config.account_key = Some(account_key.to_string());
136 }
137
138 self
139 }
140
141 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
148 #[allow(deprecated)]
149 pub fn http_client(mut self, client: HttpClient) -> Self {
150 self.http_client = Some(client);
151 self
152 }
153}
154
155impl Builder for AzdlsBuilder {
156 const SCHEME: Scheme = Scheme::Azdls;
157 type Config = AzdlsConfig;
158
159 fn build(self) -> Result<impl Access> {
160 debug!("backend build started: {:?}", &self);
161
162 let root = normalize_root(&self.config.root.unwrap_or_default());
163 debug!("backend use root {}", root);
164
165 let filesystem = match self.config.filesystem.is_empty() {
167 false => Ok(&self.config.filesystem),
168 true => Err(Error::new(ErrorKind::ConfigInvalid, "filesystem is empty")
169 .with_operation("Builder::build")
170 .with_context("service", Scheme::Azdls)),
171 }?;
172 debug!("backend use filesystem {}", &filesystem);
173
174 let endpoint = match &self.config.endpoint {
175 Some(endpoint) => Ok(endpoint.clone()),
176 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
177 .with_operation("Builder::build")
178 .with_context("service", Scheme::Azdls)),
179 }?;
180 debug!("backend use endpoint {}", &endpoint);
181
182 let config_loader = AzureStorageConfig {
183 account_name: self
184 .config
185 .account_name
186 .clone()
187 .or_else(|| infer_storage_name_from_endpoint(endpoint.as_str())),
188 account_key: self.config.account_key.clone(),
189 sas_token: None,
190 ..Default::default()
191 };
192
193 let cred_loader = AzureStorageLoader::new(config_loader);
194 let signer = AzureStorageSigner::new();
195 Ok(AzdlsBackend {
196 core: Arc::new(AzdlsCore {
197 info: {
198 let am = AccessorInfo::default();
199 am.set_scheme(Scheme::Azdls)
200 .set_root(&root)
201 .set_name(filesystem)
202 .set_native_capability(Capability {
203 stat: true,
204 stat_has_cache_control: true,
205 stat_has_content_length: true,
206 stat_has_content_type: true,
207 stat_has_content_encoding: true,
208 stat_has_content_range: true,
209 stat_has_etag: true,
210 stat_has_content_md5: true,
211 stat_has_last_modified: true,
212 stat_has_content_disposition: true,
213
214 read: true,
215
216 write: true,
217 write_can_append: true,
218 write_with_if_none_match: true,
219 write_with_if_not_exists: true,
220
221 create_dir: true,
222 delete: true,
223 rename: true,
224
225 list: true,
226 list_has_etag: true,
227 list_has_content_length: true,
228 list_has_last_modified: true,
229
230 shared: true,
231
232 ..Default::default()
233 });
234
235 #[allow(deprecated)]
237 if let Some(client) = self.http_client {
238 am.update_http_client(|_| client);
239 }
240
241 am.into()
242 },
243 filesystem: self.config.filesystem.clone(),
244 root,
245 endpoint,
246 loader: cred_loader,
247 signer,
248 }),
249 })
250 }
251}
252
/// Backend for the azdls service.
///
/// All state lives in the reference-counted [`AzdlsCore`], so cloning the
/// backend only clones the `Arc`.
#[derive(Debug, Clone)]
pub struct AzdlsBackend {
    /// Shared core that the `Access` methods delegate to.
    core: Arc<AzdlsCore>,
}
258
259impl Access for AzdlsBackend {
260 type Reader = HttpBody;
261 type Writer = AzdlsWriters;
262 type Lister = oio::PageLister<AzdlsLister>;
263 type Deleter = oio::OneShotDeleter<AzdlsDeleter>;
264 type BlockingReader = ();
265 type BlockingWriter = ();
266 type BlockingLister = ();
267 type BlockingDeleter = ();
268
269 fn info(&self) -> Arc<AccessorInfo> {
270 self.core.info.clone()
271 }
272
273 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
274 let mut req = self.core.azdls_create_request(
275 path,
276 "directory",
277 &OpWrite::default(),
278 Buffer::new(),
279 )?;
280
281 self.core.sign(&mut req).await?;
282
283 let resp = self.core.send(req).await?;
284
285 let status = resp.status();
286
287 match status {
288 StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
289 _ => Err(parse_error(resp)),
290 }
291 }
292
293 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
294 if path == "/" {
296 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
297 }
298
299 let resp = self.core.azdls_get_properties(path).await?;
300
301 if resp.status() != StatusCode::OK {
302 return Err(parse_error(resp));
303 }
304
305 let mut meta = parse_into_metadata(path, resp.headers())?;
306 let resource = resp
307 .headers()
308 .get("x-ms-resource-type")
309 .ok_or_else(|| {
310 Error::new(
311 ErrorKind::Unexpected,
312 "azdls should return x-ms-resource-type header, but it's missing",
313 )
314 })?
315 .to_str()
316 .map_err(|err| {
317 Error::new(
318 ErrorKind::Unexpected,
319 "azdls should return x-ms-resource-type header, but it's not a valid string",
320 )
321 .set_source(err)
322 })?;
323
324 meta = match resource {
325 "file" => meta.with_mode(EntryMode::FILE),
326 "directory" => meta.with_mode(EntryMode::DIR),
327 v => {
328 return Err(Error::new(
329 ErrorKind::Unexpected,
330 "azdls returns not supported x-ms-resource-type",
331 )
332 .with_context("resource", v))
333 }
334 };
335
336 Ok(RpStat::new(meta))
337 }
338
339 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
340 let resp = self.core.azdls_read(path, args.range()).await?;
341
342 let status = resp.status();
343 match status {
344 StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
345 _ => {
346 let (part, mut body) = resp.into_parts();
347 let buf = body.to_buffer().await?;
348 Err(parse_error(Response::from_parts(part, buf)))
349 }
350 }
351 }
352
353 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
354 let w = AzdlsWriter::new(self.core.clone(), args.clone(), path.to_string());
355 let w = if args.append() {
356 AzdlsWriters::Two(oio::AppendWriter::new(w))
357 } else {
358 AzdlsWriters::One(oio::OneShotWriter::new(w))
359 };
360 Ok((RpWrite::default(), w))
361 }
362
363 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
364 Ok((
365 RpDelete::default(),
366 oio::OneShotDeleter::new(AzdlsDeleter::new(self.core.clone())),
367 ))
368 }
369
370 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
371 let l = AzdlsLister::new(self.core.clone(), path.to_string(), args.limit());
372
373 Ok((RpList::default(), oio::PageLister::new(l)))
374 }
375
376 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
377 if let Some(resp) = self.core.azdls_ensure_parent_path(to).await? {
378 let status = resp.status();
379 match status {
380 StatusCode::CREATED | StatusCode::CONFLICT => {}
381 _ => return Err(parse_error(resp)),
382 }
383 }
384
385 let resp = self.core.azdls_rename(from, to).await?;
386
387 let status = resp.status();
388
389 match status {
390 StatusCode::CREATED => Ok(RpRename::default()),
391 _ => Err(parse_error(resp)),
392 }
393 }
394}
395
396fn infer_storage_name_from_endpoint(endpoint: &str) -> Option<String> {
397 let endpoint: &str = endpoint
398 .strip_prefix("http://")
399 .or_else(|| endpoint.strip_prefix("https://"))
400 .unwrap_or(endpoint);
401
402 let mut parts = endpoint.splitn(2, '.');
403 let storage_name = parts.next();
404 let endpoint_suffix = parts
405 .next()
406 .unwrap_or_default()
407 .trim_end_matches('/')
408 .to_lowercase();
409
410 if KNOWN_AZDLS_ENDPOINT_SUFFIX
411 .iter()
412 .any(|s| *s == endpoint_suffix.as_str())
413 {
414 storage_name.map(|s| s.to_string())
415 } else {
416 None
417 }
418}
419
#[cfg(test)]
mod tests {
    use super::infer_storage_name_from_endpoint;

    /// A plain HTTPS endpoint with a known suffix yields the account name.
    #[test]
    fn test_infer_storage_name_from_endpoint() {
        let inferred = infer_storage_name_from_endpoint("https://account.dfs.core.windows.net");

        assert_eq!(inferred.as_deref(), Some("account"));
    }

    /// A trailing slash after the suffix does not prevent inference.
    #[test]
    fn test_infer_storage_name_from_endpoint_with_trailing_slash() {
        let inferred = infer_storage_name_from_endpoint("https://account.dfs.core.windows.net/");

        assert_eq!(inferred.as_deref(), Some("account"));
    }
}
437}