1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use base64::prelude::BASE64_STANDARD;
24use base64::Engine;
25use http::Response;
26use http::StatusCode;
27use log::debug;
28use reqsign::AzureStorageConfig;
29use reqsign::AzureStorageLoader;
30use reqsign::AzureStorageSigner;
31use sha2::Digest;
32use sha2::Sha256;
33
34use super::core::constants::X_MS_META_PREFIX;
35use super::core::constants::X_MS_VERSION_ID;
36use super::core::AzblobCore;
37use super::delete::AzblobDeleter;
38use super::error::parse_error;
39use super::lister::AzblobLister;
40use super::writer::AzblobWriter;
41use super::writer::AzblobWriters;
42use crate::raw::*;
43use crate::services::AzblobConfig;
44use crate::*;
45
/// Host suffixes recognized as Azure Blob Storage endpoints.
///
/// `infer_storage_name_from_endpoint` only derives an account name from an
/// endpoint when the part of the host after the first label matches one of
/// these entries.
const KNOWN_AZBLOB_ENDPOINT_SUFFIX: &[&str] = &[
    "blob.core.windows.net",
    "blob.core.usgovcloudapi.net",
    "blob.core.chinacloudapi.cn",
];

/// Batch size advertised as `delete_max_size` in the backend capability.
const AZBLOB_BATCH_LIMIT: usize = 256;
57
58impl Configurator for AzblobConfig {
59 type Builder = AzblobBuilder;
60
61 #[allow(deprecated)]
62 fn into_builder(self) -> Self::Builder {
63 AzblobBuilder {
64 config: self,
65
66 http_client: None,
67 }
68 }
69}
70
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
pub struct AzblobBuilder {
    // Service configuration accumulated by the builder's setter methods.
    config: AzblobConfig,

    // Optional caller-supplied HTTP client; deprecated in favor of
    // `Operator::update_http_client`.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
79
80impl Debug for AzblobBuilder {
81 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
82 let mut ds = f.debug_struct("AzblobBuilder");
83
84 ds.field("config", &self.config);
85
86 ds.finish()
87 }
88}
89
90impl AzblobBuilder {
91 pub fn root(mut self, root: &str) -> Self {
95 self.config.root = if root.is_empty() {
96 None
97 } else {
98 Some(root.to_string())
99 };
100
101 self
102 }
103
104 pub fn container(mut self, container: &str) -> Self {
106 self.config.container = container.to_string();
107
108 self
109 }
110
111 pub fn endpoint(mut self, endpoint: &str) -> Self {
118 if !endpoint.is_empty() {
119 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
121 }
122
123 self
124 }
125
126 pub fn account_name(mut self, account_name: &str) -> Self {
131 if !account_name.is_empty() {
132 self.config.account_name = Some(account_name.to_string());
133 }
134
135 self
136 }
137
138 pub fn account_key(mut self, account_key: &str) -> Self {
143 if !account_key.is_empty() {
144 self.config.account_key = Some(account_key.to_string());
145 }
146
147 self
148 }
149
150 pub fn encryption_key(mut self, v: &str) -> Self {
163 if !v.is_empty() {
164 self.config.encryption_key = Some(v.to_string());
165 }
166
167 self
168 }
169
170 pub fn encryption_key_sha256(mut self, v: &str) -> Self {
183 if !v.is_empty() {
184 self.config.encryption_key_sha256 = Some(v.to_string());
185 }
186
187 self
188 }
189
190 pub fn encryption_algorithm(mut self, v: &str) -> Self {
203 if !v.is_empty() {
204 self.config.encryption_algorithm = Some(v.to_string());
205 }
206
207 self
208 }
209
210 pub fn server_side_encryption_with_customer_key(mut self, key: &[u8]) -> Self {
224 self.config.encryption_algorithm = Some("AES256".to_string());
226 self.config.encryption_key = Some(BASE64_STANDARD.encode(key));
227 self.config.encryption_key_sha256 =
228 Some(BASE64_STANDARD.encode(Sha256::digest(key).as_slice()));
229 self
230 }
231
232 pub fn sas_token(mut self, sas_token: &str) -> Self {
240 if !sas_token.is_empty() {
241 self.config.sas_token = Some(sas_token.to_string());
242 }
243
244 self
245 }
246
247 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
254 #[allow(deprecated)]
255 pub fn http_client(mut self, client: HttpClient) -> Self {
256 self.http_client = Some(client);
257 self
258 }
259
260 pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
262 self.config.batch_max_operations = Some(batch_max_operations);
263
264 self
265 }
266
267 pub fn from_connection_string(conn: &str) -> Result<Self> {
295 let conn = conn.trim().replace('\n', "");
296
297 let mut conn_map: HashMap<_, _> = HashMap::default();
298 for v in conn.split(';') {
299 let entry: Vec<_> = v.splitn(2, '=').collect();
300 if entry.len() != 2 {
301 continue;
303 }
304 conn_map.insert(entry[0], entry[1]);
305 }
306
307 let mut builder = AzblobBuilder::default();
308
309 if let Some(sas_token) = conn_map.get("SharedAccessSignature") {
310 builder = builder.sas_token(sas_token);
311 } else {
312 let account_name = conn_map.get("AccountName").ok_or_else(|| {
313 Error::new(
314 ErrorKind::ConfigInvalid,
315 "connection string must have AccountName",
316 )
317 .with_operation("Builder::from_connection_string")
318 })?;
319 builder = builder.account_name(account_name);
320 let account_key = conn_map.get("AccountKey").ok_or_else(|| {
321 Error::new(
322 ErrorKind::ConfigInvalid,
323 "connection string must have AccountKey",
324 )
325 .with_operation("Builder::from_connection_string")
326 })?;
327 builder = builder.account_key(account_key);
328 }
329
330 if let Some(v) = conn_map.get("BlobEndpoint") {
331 builder = builder.endpoint(v);
332 } else if let Some(v) = conn_map.get("EndpointSuffix") {
333 let protocol = conn_map.get("DefaultEndpointsProtocol").unwrap_or(&"https");
334 let account_name = builder
335 .config
336 .account_name
337 .as_ref()
338 .ok_or_else(|| {
339 Error::new(
340 ErrorKind::ConfigInvalid,
341 "connection string must have AccountName",
342 )
343 .with_operation("Builder::from_connection_string")
344 })?
345 .clone();
346 builder = builder.endpoint(&format!("{protocol}://{account_name}.blob.{v}"));
347 }
348
349 Ok(builder)
350 }
351}
352
353impl Builder for AzblobBuilder {
354 const SCHEME: Scheme = Scheme::Azblob;
355 type Config = AzblobConfig;
356
357 fn build(self) -> Result<impl Access> {
358 debug!("backend build started: {:?}", &self);
359
360 let root = normalize_root(&self.config.root.unwrap_or_default());
361 debug!("backend use root {}", root);
362
363 let container = match self.config.container.is_empty() {
365 false => Ok(&self.config.container),
366 true => Err(Error::new(ErrorKind::ConfigInvalid, "container is empty")
367 .with_operation("Builder::build")
368 .with_context("service", Scheme::Azblob)),
369 }?;
370 debug!("backend use container {}", &container);
371
372 let endpoint = match &self.config.endpoint {
373 Some(endpoint) => Ok(endpoint.clone()),
374 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
375 .with_operation("Builder::build")
376 .with_context("service", Scheme::Azblob)),
377 }?;
378 debug!("backend use endpoint {}", &container);
379
380 let mut config_loader = AzureStorageConfig::default().from_env();
381
382 if let Some(v) = self
383 .config
384 .account_name
385 .clone()
386 .or_else(|| infer_storage_name_from_endpoint(endpoint.as_str()))
387 {
388 config_loader.account_name = Some(v);
389 }
390
391 if let Some(v) = self.config.account_key.clone() {
392 config_loader.account_key = Some(v);
393 }
394
395 if let Some(v) = self.config.sas_token.clone() {
396 config_loader.sas_token = Some(v);
397 }
398
399 let encryption_key =
400 match &self.config.encryption_key {
401 None => None,
402 Some(v) => Some(build_header_value(v).map_err(|err| {
403 err.with_context("key", "server_side_encryption_customer_key")
404 })?),
405 };
406
407 let encryption_key_sha256 = match &self.config.encryption_key_sha256 {
408 None => None,
409 Some(v) => Some(build_header_value(v).map_err(|err| {
410 err.with_context("key", "server_side_encryption_customer_key_sha256")
411 })?),
412 };
413
414 let encryption_algorithm = match &self.config.encryption_algorithm {
415 None => None,
416 Some(v) => {
417 if v == "AES256" {
418 Some(build_header_value(v).map_err(|err| {
419 err.with_context("key", "server_side_encryption_customer_algorithm")
420 })?)
421 } else {
422 return Err(Error::new(
423 ErrorKind::ConfigInvalid,
424 "encryption_algorithm value must be AES256",
425 ));
426 }
427 }
428 };
429
430 let cred_loader = AzureStorageLoader::new(config_loader);
431
432 let signer = AzureStorageSigner::new();
433
434 Ok(AzblobBackend {
435 core: Arc::new(AzblobCore {
436 info: {
437 let am = AccessorInfo::default();
438 am.set_scheme(Scheme::Azblob)
439 .set_root(&root)
440 .set_name(container)
441 .set_native_capability(Capability {
442 stat: true,
443 stat_with_if_match: true,
444 stat_with_if_none_match: true,
445 stat_has_cache_control: true,
446 stat_has_content_length: true,
447 stat_has_content_type: true,
448 stat_has_content_encoding: true,
449 stat_has_content_range: true,
450 stat_has_etag: true,
451 stat_has_content_md5: true,
452 stat_has_last_modified: true,
453 stat_has_content_disposition: true,
454
455 read: true,
456
457 read_with_if_match: true,
458 read_with_if_none_match: true,
459 read_with_override_content_disposition: true,
460 read_with_if_modified_since: true,
461 read_with_if_unmodified_since: true,
462
463 write: true,
464 write_can_append: true,
465 write_can_empty: true,
466 write_can_multi: true,
467 write_with_cache_control: true,
468 write_with_content_type: true,
469 write_with_if_not_exists: true,
470 write_with_if_none_match: true,
471 write_with_user_metadata: true,
472
473 delete: true,
474 delete_max_size: Some(AZBLOB_BATCH_LIMIT),
475
476 copy: true,
477
478 list: true,
479 list_with_recursive: true,
480 list_has_etag: true,
481 list_has_content_length: true,
482 list_has_content_md5: true,
483 list_has_content_type: true,
484 list_has_last_modified: true,
485
486 presign: self.config.sas_token.is_some(),
487 presign_stat: self.config.sas_token.is_some(),
488 presign_read: self.config.sas_token.is_some(),
489 presign_write: self.config.sas_token.is_some(),
490
491 shared: true,
492
493 ..Default::default()
494 });
495
496 #[allow(deprecated)]
498 if let Some(client) = self.http_client {
499 am.update_http_client(|_| client);
500 }
501
502 am.into()
503 },
504 root,
505 endpoint,
506 encryption_key,
507 encryption_key_sha256,
508 encryption_algorithm,
509 container: self.config.container.clone(),
510
511 loader: cred_loader,
512 signer,
513 }),
514 })
515 }
516}
517
518fn infer_storage_name_from_endpoint(endpoint: &str) -> Option<String> {
519 let endpoint: &str = endpoint
520 .strip_prefix("http://")
521 .or_else(|| endpoint.strip_prefix("https://"))
522 .unwrap_or(endpoint);
523
524 let mut parts = endpoint.splitn(2, '.');
525 let storage_name = parts.next();
526 let endpoint_suffix = parts
527 .next()
528 .unwrap_or_default()
529 .trim_end_matches('/')
530 .to_lowercase();
531
532 if KNOWN_AZBLOB_ENDPOINT_SUFFIX.contains(&endpoint_suffix.as_str()) {
533 storage_name.map(|s| s.to_string())
534 } else {
535 None
536 }
537}
538
/// Backend for Azure Blob Storage; a cheaply cloneable handle around the
/// shared `AzblobCore`.
#[derive(Debug, Clone)]
pub struct AzblobBackend {
    // Shared core that the `Access` methods delegate to.
    core: Arc<AzblobCore>,
}
544
545impl Access for AzblobBackend {
546 type Reader = HttpBody;
547 type Writer = AzblobWriters;
548 type Lister = oio::PageLister<AzblobLister>;
549 type Deleter = oio::BatchDeleter<AzblobDeleter>;
550 type BlockingReader = ();
551 type BlockingWriter = ();
552 type BlockingLister = ();
553 type BlockingDeleter = ();
554
555 fn info(&self) -> Arc<AccessorInfo> {
556 self.core.info.clone()
557 }
558
559 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
560 let resp = self.core.azblob_get_blob_properties(path, &args).await?;
561
562 let status = resp.status();
563
564 match status {
565 StatusCode::OK => {
566 let headers = resp.headers();
567 let mut meta = parse_into_metadata(path, headers)?;
568 if let Some(version_id) = parse_header_to_str(headers, X_MS_VERSION_ID)? {
569 meta.set_version(version_id);
570 }
571
572 let user_meta = parse_prefixed_headers(headers, X_MS_META_PREFIX);
573 if !user_meta.is_empty() {
574 meta.with_user_metadata(user_meta);
575 }
576
577 Ok(RpStat::new(meta))
578 }
579 _ => Err(parse_error(resp)),
580 }
581 }
582
583 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
584 let resp = self.core.azblob_get_blob(path, args.range(), &args).await?;
585
586 let status = resp.status();
587 match status {
588 StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
589 _ => {
590 let (part, mut body) = resp.into_parts();
591 let buf = body.to_buffer().await?;
592 Err(parse_error(Response::from_parts(part, buf)))
593 }
594 }
595 }
596
597 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
598 let w = AzblobWriter::new(self.core.clone(), args.clone(), path.to_string());
599 let w = if args.append() {
600 AzblobWriters::Two(oio::AppendWriter::new(w))
601 } else {
602 AzblobWriters::One(oio::BlockWriter::new(
603 self.core.info.clone(),
604 w,
605 args.concurrent(),
606 ))
607 };
608
609 Ok((RpWrite::default(), w))
610 }
611
612 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
613 Ok((
614 RpDelete::default(),
615 oio::BatchDeleter::new(AzblobDeleter::new(self.core.clone())),
616 ))
617 }
618
619 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
620 let l = AzblobLister::new(
621 self.core.clone(),
622 path.to_string(),
623 args.recursive(),
624 args.limit(),
625 );
626
627 Ok((RpList::default(), oio::PageLister::new(l)))
628 }
629
630 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
631 let resp = self.core.azblob_copy_blob(from, to).await?;
632
633 let status = resp.status();
634
635 match status {
636 StatusCode::ACCEPTED => Ok(RpCopy::default()),
637 _ => Err(parse_error(resp)),
638 }
639 }
640
641 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
642 let req = match args.operation() {
643 PresignOperation::Stat(v) => self.core.azblob_head_blob_request(path, v),
644 PresignOperation::Read(v) => {
645 self.core
646 .azblob_get_blob_request(path, BytesRange::default(), v)
647 }
648 PresignOperation::Write(_) => {
649 self.core
650 .azblob_put_blob_request(path, None, &OpWrite::default(), Buffer::new())
651 }
652 PresignOperation::Delete(_) => Err(Error::new(
653 ErrorKind::Unsupported,
654 "operation is not supported",
655 )),
656 };
657
658 let mut req = req?;
659
660 self.core.sign_query(&mut req).await?;
661
662 let (parts, _) = req.into_parts();
663
664 Ok(RpPresign::new(PresignedRequest::new(
665 parts.method,
666 parts.uri,
667 parts.headers,
668 )))
669 }
670}
671
#[cfg(test)]
mod tests {
    use super::infer_storage_name_from_endpoint;
    use super::AzblobBuilder;

    // The account name is the first host label of a known Azure endpoint.
    #[test]
    fn test_infer_storage_name_from_endpoint() {
        let endpoint = "https://account.blob.core.windows.net";
        let storage_name = infer_storage_name_from_endpoint(endpoint);
        assert_eq!(storage_name, Some("account".to_string()));
    }

    // A trailing slash must not prevent the suffix from being recognized.
    #[test]
    fn test_infer_storage_name_from_endpoint_with_trailing_slash() {
        let endpoint = "https://account.blob.core.windows.net/";
        let storage_name = infer_storage_name_from_endpoint(endpoint);
        assert_eq!(storage_name, Some("account".to_string()));
    }

    // Account-key connection strings: once with an explicit `BlobEndpoint`,
    // once with an endpoint assembled from `EndpointSuffix`.
    #[test]
    fn test_builder_from_connection_string() {
        let builder = AzblobBuilder::from_connection_string(
            r#"
DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;
AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;
BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;
QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;
TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;
        "#,
        )
        .expect("from connection string must succeed");

        assert_eq!(
            builder.config.endpoint.unwrap(),
            "http://127.0.0.1:10000/devstoreaccount1"
        );
        assert_eq!(builder.config.account_name.unwrap(), "devstoreaccount1");
        assert_eq!(builder.config.account_key.unwrap(), "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==");

        let builder = AzblobBuilder::from_connection_string(
            r#"
DefaultEndpointsProtocol=https;
AccountName=storagesample;
AccountKey=account-key;
EndpointSuffix=core.chinacloudapi.cn;
        "#,
        )
        .expect("from connection string must succeed");

        assert_eq!(
            builder.config.endpoint.unwrap(),
            "https://storagesample.blob.core.chinacloudapi.cn"
        );
        assert_eq!(builder.config.account_name.unwrap(), "storagesample");
        assert_eq!(builder.config.account_key.unwrap(), "account-key")
    }

    // A SAS-only connection string sets the token and leaves the account
    // credentials unset.
    #[test]
    fn test_sas_from_connection_string() {
        let builder = AzblobBuilder::from_connection_string(
            // The signature below is sample data, not a usable credential.
            r#"
BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;
QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;
TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;
SharedAccessSignature=sv=2021-01-01&ss=b&srt=c&sp=rwdlaciytfx&se=2022-01-01T11:00:14Z&st=2022-01-02T03:00:14Z&spr=https&sig=KEllk4N8f7rJfLjQCmikL2fRVt%2B%2Bl73UBkbgH%2FK3VGE%3D
        "#,
        )
        .expect("from connection string must succeed");

        assert_eq!(
            builder.config.endpoint.unwrap(),
            "http://127.0.0.1:10000/devstoreaccount1"
        );
        assert_eq!(builder.config.sas_token.unwrap(), "sv=2021-01-01&ss=b&srt=c&sp=rwdlaciytfx&se=2022-01-01T11:00:14Z&st=2022-01-02T03:00:14Z&spr=https&sig=KEllk4N8f7rJfLjQCmikL2fRVt%2B%2Bl73UBkbgH%2FK3VGE%3D");
        assert_eq!(builder.config.account_name, None);
        assert_eq!(builder.config.account_key, None);
    }

    // When both a SAS and account credentials are present, the SAS wins and
    // the account name/key are not copied into the config.
    #[test]
    pub fn test_sas_preferred() {
        let builder = AzblobBuilder::from_connection_string(
            r#"
BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;
AccountName=storagesample;
AccountKey=account-key;
SharedAccessSignature=sv=2021-01-01&ss=b&srt=c&sp=rwdlaciytfx&se=2022-01-01T11:00:14Z&st=2022-01-02T03:00:14Z&spr=https&sig=KEllk4N8f7rJfLjQCmikL2fRVt%2B%2Bl73UBkbgH%2FK3VGE%3D
        "#,
        )
        .expect("from connection string must succeed");

        // SAS should be preferred over shared key
        assert_eq!(builder.config.sas_token.unwrap(), "sv=2021-01-01&ss=b&srt=c&sp=rwdlaciytfx&se=2022-01-01T11:00:14Z&st=2022-01-02T03:00:14Z&spr=https&sig=KEllk4N8f7rJfLjQCmikL2fRVt%2B%2Bl73UBkbgH%2FK3VGE%3D");
        assert_eq!(builder.config.account_name, None);
        assert_eq!(builder.config.account_key, None);
    }
}