1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use base64::prelude::BASE64_STANDARD;
24use base64::Engine;
25use http::Response;
26use http::StatusCode;
27use log::debug;
28use reqsign::AzureStorageConfig;
29use reqsign::AzureStorageLoader;
30use reqsign::AzureStorageSigner;
31use sha2::Digest;
32use sha2::Sha256;
33
34use super::core::constants::X_MS_META_PREFIX;
35use super::core::constants::X_MS_VERSION_ID;
36use super::core::AzblobCore;
37use super::delete::AzblobDeleter;
38use super::error::parse_error;
39use super::lister::AzblobLister;
40use super::writer::AzblobWriter;
41use super::writer::AzblobWriters;
42use crate::raw::*;
43use crate::services::AzblobConfig;
44use crate::*;
45
/// Blob-service host suffixes this backend recognizes when inferring the
/// storage account name from an endpoint of the form `<account>.<suffix>`
/// (see `infer_storage_name_from_endpoint`). Comparison is done against a
/// lowercased suffix, so entries must be lowercase.
const KNOWN_AZBLOB_ENDPOINT_SUFFIX: &[&str] = &[
    "blob.core.windows.net",
    "blob.core.usgovcloudapi.net",
    "blob.core.chinacloudapi.cn",
];

/// Upper bound on the number of paths deleted in one batch request; used for
/// the `delete_max_size` capability of the batch deleter.
const AZBLOB_BATCH_LIMIT: usize = 256;
57
58impl Configurator for AzblobConfig {
59 type Builder = AzblobBuilder;
60
61 #[allow(deprecated)]
62 fn into_builder(self) -> Self::Builder {
63 AzblobBuilder {
64 config: self,
65
66 http_client: None,
67 }
68 }
69}
70
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
pub struct AzblobBuilder {
    // All user-facing settings; read by `Builder::build`.
    config: AzblobConfig,

    // Optional HTTP client override, installed into the accessor info by
    // `build`. Deprecated in favor of `Operator::update_http_client`.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
79
80impl Debug for AzblobBuilder {
81 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
82 let mut ds = f.debug_struct("AzblobBuilder");
83
84 ds.field("config", &self.config);
85
86 ds.finish()
87 }
88}
89
90impl AzblobBuilder {
91 pub fn root(mut self, root: &str) -> Self {
95 self.config.root = if root.is_empty() {
96 None
97 } else {
98 Some(root.to_string())
99 };
100
101 self
102 }
103
104 pub fn container(mut self, container: &str) -> Self {
106 self.config.container = container.to_string();
107
108 self
109 }
110
111 pub fn endpoint(mut self, endpoint: &str) -> Self {
118 if !endpoint.is_empty() {
119 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
121 }
122
123 self
124 }
125
126 pub fn account_name(mut self, account_name: &str) -> Self {
131 if !account_name.is_empty() {
132 self.config.account_name = Some(account_name.to_string());
133 }
134
135 self
136 }
137
138 pub fn account_key(mut self, account_key: &str) -> Self {
143 if !account_key.is_empty() {
144 self.config.account_key = Some(account_key.to_string());
145 }
146
147 self
148 }
149
150 pub fn encryption_key(mut self, v: &str) -> Self {
163 if !v.is_empty() {
164 self.config.encryption_key = Some(v.to_string());
165 }
166
167 self
168 }
169
170 pub fn encryption_key_sha256(mut self, v: &str) -> Self {
183 if !v.is_empty() {
184 self.config.encryption_key_sha256 = Some(v.to_string());
185 }
186
187 self
188 }
189
190 pub fn encryption_algorithm(mut self, v: &str) -> Self {
203 if !v.is_empty() {
204 self.config.encryption_algorithm = Some(v.to_string());
205 }
206
207 self
208 }
209
210 pub fn server_side_encryption_with_customer_key(mut self, key: &[u8]) -> Self {
224 self.config.encryption_algorithm = Some("AES256".to_string());
226 self.config.encryption_key = Some(BASE64_STANDARD.encode(key));
227 self.config.encryption_key_sha256 =
228 Some(BASE64_STANDARD.encode(Sha256::digest(key).as_slice()));
229 self
230 }
231
232 pub fn sas_token(mut self, sas_token: &str) -> Self {
240 if !sas_token.is_empty() {
241 self.config.sas_token = Some(sas_token.to_string());
242 }
243
244 self
245 }
246
247 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
254 #[allow(deprecated)]
255 pub fn http_client(mut self, client: HttpClient) -> Self {
256 self.http_client = Some(client);
257 self
258 }
259
260 pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
262 self.config.batch_max_operations = Some(batch_max_operations);
263
264 self
265 }
266
267 pub fn from_connection_string(conn: &str) -> Result<Self> {
295 let conn = conn.trim().replace('\n', "");
296
297 let mut conn_map: HashMap<_, _> = HashMap::default();
298 for v in conn.split(';') {
299 let entry: Vec<_> = v.splitn(2, '=').collect();
300 if entry.len() != 2 {
301 continue;
303 }
304 conn_map.insert(entry[0], entry[1]);
305 }
306
307 let mut builder = AzblobBuilder::default();
308
309 if let Some(sas_token) = conn_map.get("SharedAccessSignature") {
310 builder = builder.sas_token(sas_token);
311 } else {
312 let account_name = conn_map.get("AccountName").ok_or_else(|| {
313 Error::new(
314 ErrorKind::ConfigInvalid,
315 "connection string must have AccountName",
316 )
317 .with_operation("Builder::from_connection_string")
318 })?;
319 builder = builder.account_name(account_name);
320 let account_key = conn_map.get("AccountKey").ok_or_else(|| {
321 Error::new(
322 ErrorKind::ConfigInvalid,
323 "connection string must have AccountKey",
324 )
325 .with_operation("Builder::from_connection_string")
326 })?;
327 builder = builder.account_key(account_key);
328 }
329
330 if let Some(v) = conn_map.get("BlobEndpoint") {
331 builder = builder.endpoint(v);
332 } else if let Some(v) = conn_map.get("EndpointSuffix") {
333 let protocol = conn_map.get("DefaultEndpointsProtocol").unwrap_or(&"https");
334 let account_name = builder
335 .config
336 .account_name
337 .as_ref()
338 .ok_or_else(|| {
339 Error::new(
340 ErrorKind::ConfigInvalid,
341 "connection string must have AccountName",
342 )
343 .with_operation("Builder::from_connection_string")
344 })?
345 .clone();
346 builder = builder.endpoint(&format!("{protocol}://{account_name}.blob.{v}"));
347 }
348
349 Ok(builder)
350 }
351}
352
353impl Builder for AzblobBuilder {
354 const SCHEME: Scheme = Scheme::Azblob;
355 type Config = AzblobConfig;
356
357 fn build(self) -> Result<impl Access> {
358 debug!("backend build started: {:?}", &self);
359
360 let root = normalize_root(&self.config.root.unwrap_or_default());
361 debug!("backend use root {}", root);
362
363 let container = match self.config.container.is_empty() {
365 false => Ok(&self.config.container),
366 true => Err(Error::new(ErrorKind::ConfigInvalid, "container is empty")
367 .with_operation("Builder::build")
368 .with_context("service", Scheme::Azblob)),
369 }?;
370 debug!("backend use container {}", &container);
371
372 let endpoint = match &self.config.endpoint {
373 Some(endpoint) => Ok(endpoint.clone()),
374 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
375 .with_operation("Builder::build")
376 .with_context("service", Scheme::Azblob)),
377 }?;
378 debug!("backend use endpoint {}", &container);
379
380 let mut config_loader = AzureStorageConfig::default().from_env();
381
382 if let Some(v) = self
383 .config
384 .account_name
385 .clone()
386 .or_else(|| infer_storage_name_from_endpoint(endpoint.as_str()))
387 {
388 config_loader.account_name = Some(v);
389 }
390
391 if let Some(v) = self.config.account_key.clone() {
392 config_loader.account_key = Some(v);
393 }
394
395 if let Some(v) = self.config.sas_token.clone() {
396 config_loader.sas_token = Some(v);
397 }
398
399 let encryption_key =
400 match &self.config.encryption_key {
401 None => None,
402 Some(v) => Some(build_header_value(v).map_err(|err| {
403 err.with_context("key", "server_side_encryption_customer_key")
404 })?),
405 };
406
407 let encryption_key_sha256 = match &self.config.encryption_key_sha256 {
408 None => None,
409 Some(v) => Some(build_header_value(v).map_err(|err| {
410 err.with_context("key", "server_side_encryption_customer_key_sha256")
411 })?),
412 };
413
414 let encryption_algorithm = match &self.config.encryption_algorithm {
415 None => None,
416 Some(v) => {
417 if v == "AES256" {
418 Some(build_header_value(v).map_err(|err| {
419 err.with_context("key", "server_side_encryption_customer_algorithm")
420 })?)
421 } else {
422 return Err(Error::new(
423 ErrorKind::ConfigInvalid,
424 "encryption_algorithm value must be AES256",
425 ));
426 }
427 }
428 };
429
430 let cred_loader = AzureStorageLoader::new(config_loader);
431
432 let signer = AzureStorageSigner::new();
433
434 Ok(AzblobBackend {
435 core: Arc::new(AzblobCore {
436 info: {
437 let am = AccessorInfo::default();
438 am.set_scheme(Scheme::Azblob)
439 .set_root(&root)
440 .set_name(container)
441 .set_native_capability(Capability {
442 stat: true,
443 stat_with_if_match: true,
444 stat_with_if_none_match: true,
445 stat_has_cache_control: true,
446 stat_has_content_length: true,
447 stat_has_content_type: true,
448 stat_has_content_encoding: true,
449 stat_has_content_range: true,
450 stat_has_etag: true,
451 stat_has_content_md5: true,
452 stat_has_last_modified: true,
453 stat_has_content_disposition: true,
454
455 read: true,
456
457 read_with_if_match: true,
458 read_with_if_none_match: true,
459 read_with_override_content_disposition: true,
460 read_with_if_modified_since: true,
461 read_with_if_unmodified_since: true,
462
463 write: true,
464 write_can_append: true,
465 write_can_empty: true,
466 write_can_multi: true,
467 write_with_cache_control: true,
468 write_with_content_type: true,
469 write_with_if_not_exists: true,
470 write_with_if_none_match: true,
471 write_with_user_metadata: true,
472
473 delete: true,
474 delete_max_size: Some(AZBLOB_BATCH_LIMIT),
475
476 copy: true,
477
478 list: true,
479 list_with_recursive: true,
480 list_has_etag: true,
481 list_has_content_length: true,
482 list_has_content_md5: true,
483 list_has_content_type: true,
484 list_has_last_modified: true,
485
486 presign: self.config.sas_token.is_some(),
487 presign_stat: self.config.sas_token.is_some(),
488 presign_read: self.config.sas_token.is_some(),
489 presign_write: self.config.sas_token.is_some(),
490
491 shared: true,
492
493 ..Default::default()
494 });
495
496 #[allow(deprecated)]
498 if let Some(client) = self.http_client {
499 am.update_http_client(|_| client);
500 }
501
502 am.into()
503 },
504 root,
505 endpoint,
506 encryption_key,
507 encryption_key_sha256,
508 encryption_algorithm,
509 container: self.config.container.clone(),
510
511 loader: cred_loader,
512 signer,
513 }),
514 })
515 }
516}
517
518fn infer_storage_name_from_endpoint(endpoint: &str) -> Option<String> {
519 let endpoint: &str = endpoint
520 .strip_prefix("http://")
521 .or_else(|| endpoint.strip_prefix("https://"))
522 .unwrap_or(endpoint);
523
524 let mut parts = endpoint.splitn(2, '.');
525 let storage_name = parts.next();
526 let endpoint_suffix = parts
527 .next()
528 .unwrap_or_default()
529 .trim_end_matches('/')
530 .to_lowercase();
531
532 if KNOWN_AZBLOB_ENDPOINT_SUFFIX
533 .iter()
534 .any(|s| *s == endpoint_suffix.as_str())
535 {
536 storage_name.map(|s| s.to_string())
537 } else {
538 None
539 }
540}
541
/// Azure Blob Storage backend produced by `AzblobBuilder::build`.
///
/// Its only field is an `Arc`, so cloning shares the same core state.
#[derive(Debug, Clone)]
pub struct AzblobBackend {
    // Shared state: accessor info, root, endpoint, container, credential
    // loader, signer and encryption headers.
    core: Arc<AzblobCore>,
}
547
548impl Access for AzblobBackend {
549 type Reader = HttpBody;
550 type Writer = AzblobWriters;
551 type Lister = oio::PageLister<AzblobLister>;
552 type Deleter = oio::BatchDeleter<AzblobDeleter>;
553 type BlockingReader = ();
554 type BlockingWriter = ();
555 type BlockingLister = ();
556 type BlockingDeleter = ();
557
558 fn info(&self) -> Arc<AccessorInfo> {
559 self.core.info.clone()
560 }
561
562 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
563 let resp = self.core.azblob_get_blob_properties(path, &args).await?;
564
565 let status = resp.status();
566
567 match status {
568 StatusCode::OK => {
569 let headers = resp.headers();
570 let mut meta = parse_into_metadata(path, headers)?;
571 if let Some(version_id) = parse_header_to_str(headers, X_MS_VERSION_ID)? {
572 meta.set_version(version_id);
573 }
574
575 let user_meta = parse_prefixed_headers(headers, X_MS_META_PREFIX);
576 if !user_meta.is_empty() {
577 meta.with_user_metadata(user_meta);
578 }
579
580 Ok(RpStat::new(meta))
581 }
582 _ => Err(parse_error(resp)),
583 }
584 }
585
586 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
587 let resp = self.core.azblob_get_blob(path, args.range(), &args).await?;
588
589 let status = resp.status();
590 match status {
591 StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
592 _ => {
593 let (part, mut body) = resp.into_parts();
594 let buf = body.to_buffer().await?;
595 Err(parse_error(Response::from_parts(part, buf)))
596 }
597 }
598 }
599
600 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
601 let w = AzblobWriter::new(self.core.clone(), args.clone(), path.to_string());
602 let w = if args.append() {
603 AzblobWriters::Two(oio::AppendWriter::new(w))
604 } else {
605 AzblobWriters::One(oio::BlockWriter::new(
606 self.core.info.clone(),
607 w,
608 args.concurrent(),
609 ))
610 };
611
612 Ok((RpWrite::default(), w))
613 }
614
615 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
616 Ok((
617 RpDelete::default(),
618 oio::BatchDeleter::new(AzblobDeleter::new(self.core.clone())),
619 ))
620 }
621
622 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
623 let l = AzblobLister::new(
624 self.core.clone(),
625 path.to_string(),
626 args.recursive(),
627 args.limit(),
628 );
629
630 Ok((RpList::default(), oio::PageLister::new(l)))
631 }
632
633 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
634 let resp = self.core.azblob_copy_blob(from, to).await?;
635
636 let status = resp.status();
637
638 match status {
639 StatusCode::ACCEPTED => Ok(RpCopy::default()),
640 _ => Err(parse_error(resp)),
641 }
642 }
643
644 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
645 let req = match args.operation() {
646 PresignOperation::Stat(v) => self.core.azblob_head_blob_request(path, v),
647 PresignOperation::Read(v) => {
648 self.core
649 .azblob_get_blob_request(path, BytesRange::default(), v)
650 }
651 PresignOperation::Write(_) => {
652 self.core
653 .azblob_put_blob_request(path, None, &OpWrite::default(), Buffer::new())
654 }
655 PresignOperation::Delete(_) => Err(Error::new(
656 ErrorKind::Unsupported,
657 "operation is not supported",
658 )),
659 };
660
661 let mut req = req?;
662
663 self.core.sign_query(&mut req).await?;
664
665 let (parts, _) = req.into_parts();
666
667 Ok(RpPresign::new(PresignedRequest::new(
668 parts.method,
669 parts.uri,
670 parts.headers,
671 )))
672 }
673}
674
#[cfg(test)]
mod tests {
    use super::infer_storage_name_from_endpoint;
    use super::AzblobBuilder;

    // A `<account>.blob.core.windows.net` endpoint yields the account name.
    #[test]
    fn test_infer_storage_name_from_endpoint() {
        let endpoint = "https://account.blob.core.windows.net";
        let storage_name = infer_storage_name_from_endpoint(endpoint);
        assert_eq!(storage_name, Some("account".to_string()));
    }

    // A trailing `/` must not break the suffix match.
    #[test]
    fn test_infer_storage_name_from_endpoint_with_trailing_slash() {
        let endpoint = "https://account.blob.core.windows.net/";
        let storage_name = infer_storage_name_from_endpoint(endpoint);
        assert_eq!(storage_name, Some("account".to_string()));
    }

    // Account-key connection strings: first with an explicit `BlobEndpoint`
    // (emulator-style URL), then with the endpoint built from `EndpointSuffix`.
    #[test]
    fn test_builder_from_connection_string() {
        let builder = AzblobBuilder::from_connection_string(
            r#"
DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;
AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;
BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;
QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;
TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;
        "#,
        )
        .expect("from connection string must succeed");

        assert_eq!(
            builder.config.endpoint.unwrap(),
            "http://127.0.0.1:10000/devstoreaccount1"
        );
        assert_eq!(builder.config.account_name.unwrap(), "devstoreaccount1");
        assert_eq!(builder.config.account_key.unwrap(), "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==");

        // No `BlobEndpoint`: expect `{protocol}://{AccountName}.blob.{EndpointSuffix}`.
        let builder = AzblobBuilder::from_connection_string(
            r#"
DefaultEndpointsProtocol=https;
AccountName=storagesample;
AccountKey=account-key;
EndpointSuffix=core.chinacloudapi.cn;
        "#,
        )
        .expect("from connection string must succeed");

        assert_eq!(
            builder.config.endpoint.unwrap(),
            "https://storagesample.blob.core.chinacloudapi.cn"
        );
        assert_eq!(builder.config.account_name.unwrap(), "storagesample");
        assert_eq!(builder.config.account_key.unwrap(), "account-key")
    }

    // A SAS-only connection string sets the token and leaves account
    // credentials unset.
    #[test]
    fn test_sas_from_connection_string() {
        let builder = AzblobBuilder::from_connection_string(
            r#"
BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;
QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;
TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;
SharedAccessSignature=sv=2021-01-01&ss=b&srt=c&sp=rwdlaciytfx&se=2022-01-01T11:00:14Z&st=2022-01-02T03:00:14Z&spr=https&sig=KEllk4N8f7rJfLjQCmikL2fRVt%2B%2Bl73UBkbgH%2FK3VGE%3D
        "#,
        )
        .expect("from connection string must succeed");

        assert_eq!(
            builder.config.endpoint.unwrap(),
            "http://127.0.0.1:10000/devstoreaccount1"
        );
        assert_eq!(builder.config.sas_token.unwrap(), "sv=2021-01-01&ss=b&srt=c&sp=rwdlaciytfx&se=2022-01-01T11:00:14Z&st=2022-01-02T03:00:14Z&spr=https&sig=KEllk4N8f7rJfLjQCmikL2fRVt%2B%2Bl73UBkbgH%2FK3VGE%3D");
        assert_eq!(builder.config.account_name, None);
        assert_eq!(builder.config.account_key, None);
    }

    // When both a SAS token and account credentials are present, the SAS token
    // wins and the account name and key are not applied.
    #[test]
    pub fn test_sas_preferred() {
        let builder = AzblobBuilder::from_connection_string(
            r#"
BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;
AccountName=storagesample;
AccountKey=account-key;
SharedAccessSignature=sv=2021-01-01&ss=b&srt=c&sp=rwdlaciytfx&se=2022-01-01T11:00:14Z&st=2022-01-02T03:00:14Z&spr=https&sig=KEllk4N8f7rJfLjQCmikL2fRVt%2B%2Bl73UBkbgH%2FK3VGE%3D
        "#,
        )
        .expect("from connection string must succeed");

        assert_eq!(builder.config.sas_token.unwrap(), "sv=2021-01-01&ss=b&srt=c&sp=rwdlaciytfx&se=2022-01-01T11:00:14Z&st=2022-01-02T03:00:14Z&spr=https&sig=KEllk4N8f7rJfLjQCmikL2fRVt%2B%2Bl73UBkbgH%2FK3VGE%3D");
        assert_eq!(builder.config.account_name, None);
        assert_eq!(builder.config.account_key, None);
    }
}