1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use base64::prelude::BASE64_STANDARD;
24use base64::Engine;
25use http::Response;
26use http::StatusCode;
27use log::debug;
28use reqsign::AzureStorageConfig;
29use reqsign::AzureStorageLoader;
30use reqsign::AzureStorageSigner;
31use sha2::Digest;
32use sha2::Sha256;
33
34use super::core::constants::X_MS_META_PREFIX;
35use super::core::AzblobCore;
36use super::delete::AzblobDeleter;
37use super::error::parse_error;
38use super::lister::AzblobLister;
39use super::writer::AzblobWriter;
40use super::writer::AzblobWriters;
41use crate::raw::*;
42use crate::services::AzblobConfig;
43use crate::*;
44
/// Endpoint host suffixes from which the storage account name can be inferred.
///
/// `infer_storage_name_from_endpoint` compares the part of the endpoint host
/// after the first `.` (lowercased) against these entries. On a match, the
/// first host label is used as the account name.
const KNOWN_AZBLOB_ENDPOINT_SUFFIX: &[&str] = &[
    "blob.core.windows.net",
    "blob.core.usgovcloudapi.net",
    "blob.core.chinacloudapi.cn",
];
54
/// Upper bound on paths per batch delete; advertised as `delete_max_size` in `build`.
// NOTE(review): presumably mirrors Azure Blob Batch's per-request sub-request limit — confirm.
const AZBLOB_BATCH_LIMIT: usize = 256;
56
57impl Configurator for AzblobConfig {
58 type Builder = AzblobBuilder;
59
60 #[allow(deprecated)]
61 fn into_builder(self) -> Self::Builder {
62 AzblobBuilder {
63 config: self,
64
65 http_client: None,
66 }
67 }
68}
69
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
pub struct AzblobBuilder {
    /// Configuration accumulated by the setter methods and consumed by `build`.
    config: AzblobConfig,

    /// HTTP client that `build` installs on the accessor info when present.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
78
79impl Debug for AzblobBuilder {
80 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
81 let mut ds = f.debug_struct("AzblobBuilder");
82
83 ds.field("config", &self.config);
84
85 ds.finish()
86 }
87}
88
89impl AzblobBuilder {
90 pub fn root(mut self, root: &str) -> Self {
94 self.config.root = if root.is_empty() {
95 None
96 } else {
97 Some(root.to_string())
98 };
99
100 self
101 }
102
103 pub fn container(mut self, container: &str) -> Self {
105 self.config.container = container.to_string();
106
107 self
108 }
109
110 pub fn endpoint(mut self, endpoint: &str) -> Self {
117 if !endpoint.is_empty() {
118 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
120 }
121
122 self
123 }
124
125 pub fn account_name(mut self, account_name: &str) -> Self {
130 if !account_name.is_empty() {
131 self.config.account_name = Some(account_name.to_string());
132 }
133
134 self
135 }
136
137 pub fn account_key(mut self, account_key: &str) -> Self {
142 if !account_key.is_empty() {
143 self.config.account_key = Some(account_key.to_string());
144 }
145
146 self
147 }
148
149 pub fn encryption_key(mut self, v: &str) -> Self {
162 if !v.is_empty() {
163 self.config.encryption_key = Some(v.to_string());
164 }
165
166 self
167 }
168
169 pub fn encryption_key_sha256(mut self, v: &str) -> Self {
182 if !v.is_empty() {
183 self.config.encryption_key_sha256 = Some(v.to_string());
184 }
185
186 self
187 }
188
189 pub fn encryption_algorithm(mut self, v: &str) -> Self {
202 if !v.is_empty() {
203 self.config.encryption_algorithm = Some(v.to_string());
204 }
205
206 self
207 }
208
209 pub fn server_side_encryption_with_customer_key(mut self, key: &[u8]) -> Self {
223 self.config.encryption_algorithm = Some("AES256".to_string());
225 self.config.encryption_key = Some(BASE64_STANDARD.encode(key));
226 self.config.encryption_key_sha256 =
227 Some(BASE64_STANDARD.encode(Sha256::digest(key).as_slice()));
228 self
229 }
230
231 pub fn sas_token(mut self, sas_token: &str) -> Self {
239 if !sas_token.is_empty() {
240 self.config.sas_token = Some(sas_token.to_string());
241 }
242
243 self
244 }
245
246 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
253 #[allow(deprecated)]
254 pub fn http_client(mut self, client: HttpClient) -> Self {
255 self.http_client = Some(client);
256 self
257 }
258
259 pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
261 self.config.batch_max_operations = Some(batch_max_operations);
262
263 self
264 }
265
266 pub fn from_connection_string(conn: &str) -> Result<Self> {
294 let conn = conn.trim().replace('\n', "");
295
296 let mut conn_map: HashMap<_, _> = HashMap::default();
297 for v in conn.split(';') {
298 let entry: Vec<_> = v.splitn(2, '=').collect();
299 if entry.len() != 2 {
300 continue;
302 }
303 conn_map.insert(entry[0], entry[1]);
304 }
305
306 let mut builder = AzblobBuilder::default();
307
308 if let Some(sas_token) = conn_map.get("SharedAccessSignature") {
309 builder = builder.sas_token(sas_token);
310 } else {
311 let account_name = conn_map.get("AccountName").ok_or_else(|| {
312 Error::new(
313 ErrorKind::ConfigInvalid,
314 "connection string must have AccountName",
315 )
316 .with_operation("Builder::from_connection_string")
317 })?;
318 builder = builder.account_name(account_name);
319 let account_key = conn_map.get("AccountKey").ok_or_else(|| {
320 Error::new(
321 ErrorKind::ConfigInvalid,
322 "connection string must have AccountKey",
323 )
324 .with_operation("Builder::from_connection_string")
325 })?;
326 builder = builder.account_key(account_key);
327 }
328
329 if let Some(v) = conn_map.get("BlobEndpoint") {
330 builder = builder.endpoint(v);
331 } else if let Some(v) = conn_map.get("EndpointSuffix") {
332 let protocol = conn_map.get("DefaultEndpointsProtocol").unwrap_or(&"https");
333 let account_name = builder
334 .config
335 .account_name
336 .as_ref()
337 .ok_or_else(|| {
338 Error::new(
339 ErrorKind::ConfigInvalid,
340 "connection string must have AccountName",
341 )
342 .with_operation("Builder::from_connection_string")
343 })?
344 .clone();
345 builder = builder.endpoint(&format!("{protocol}://{account_name}.blob.{v}"));
346 }
347
348 Ok(builder)
349 }
350}
351
352impl Builder for AzblobBuilder {
353 const SCHEME: Scheme = Scheme::Azblob;
354 type Config = AzblobConfig;
355
356 fn build(self) -> Result<impl Access> {
357 debug!("backend build started: {:?}", &self);
358
359 let root = normalize_root(&self.config.root.unwrap_or_default());
360 debug!("backend use root {}", root);
361
362 let container = match self.config.container.is_empty() {
364 false => Ok(&self.config.container),
365 true => Err(Error::new(ErrorKind::ConfigInvalid, "container is empty")
366 .with_operation("Builder::build")
367 .with_context("service", Scheme::Azblob)),
368 }?;
369 debug!("backend use container {}", &container);
370
371 let endpoint = match &self.config.endpoint {
372 Some(endpoint) => Ok(endpoint.clone()),
373 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
374 .with_operation("Builder::build")
375 .with_context("service", Scheme::Azblob)),
376 }?;
377 debug!("backend use endpoint {}", &container);
378
379 let mut config_loader = AzureStorageConfig::default().from_env();
380
381 if let Some(v) = self
382 .config
383 .account_name
384 .clone()
385 .or_else(|| infer_storage_name_from_endpoint(endpoint.as_str()))
386 {
387 config_loader.account_name = Some(v);
388 }
389
390 if let Some(v) = self.config.account_key.clone() {
391 config_loader.account_key = Some(v);
392 }
393
394 if let Some(v) = self.config.sas_token.clone() {
395 config_loader.sas_token = Some(v);
396 }
397
398 let encryption_key =
399 match &self.config.encryption_key {
400 None => None,
401 Some(v) => Some(build_header_value(v).map_err(|err| {
402 err.with_context("key", "server_side_encryption_customer_key")
403 })?),
404 };
405
406 let encryption_key_sha256 = match &self.config.encryption_key_sha256 {
407 None => None,
408 Some(v) => Some(build_header_value(v).map_err(|err| {
409 err.with_context("key", "server_side_encryption_customer_key_sha256")
410 })?),
411 };
412
413 let encryption_algorithm = match &self.config.encryption_algorithm {
414 None => None,
415 Some(v) => {
416 if v == "AES256" {
417 Some(build_header_value(v).map_err(|err| {
418 err.with_context("key", "server_side_encryption_customer_algorithm")
419 })?)
420 } else {
421 return Err(Error::new(
422 ErrorKind::ConfigInvalid,
423 "encryption_algorithm value must be AES256",
424 ));
425 }
426 }
427 };
428
429 let cred_loader = AzureStorageLoader::new(config_loader);
430
431 let signer = AzureStorageSigner::new();
432
433 Ok(AzblobBackend {
434 core: Arc::new(AzblobCore {
435 info: {
436 let am = AccessorInfo::default();
437 am.set_scheme(Scheme::Azblob)
438 .set_root(&root)
439 .set_name(container)
440 .set_native_capability(Capability {
441 stat: true,
442 stat_with_if_match: true,
443 stat_with_if_none_match: true,
444 stat_has_cache_control: true,
445 stat_has_content_length: true,
446 stat_has_content_type: true,
447 stat_has_content_encoding: true,
448 stat_has_content_range: true,
449 stat_has_etag: true,
450 stat_has_content_md5: true,
451 stat_has_last_modified: true,
452 stat_has_content_disposition: true,
453
454 read: true,
455
456 read_with_if_match: true,
457 read_with_if_none_match: true,
458 read_with_override_content_disposition: true,
459 read_with_if_modified_since: true,
460 read_with_if_unmodified_since: true,
461
462 write: true,
463 write_can_append: true,
464 write_can_empty: true,
465 write_can_multi: true,
466 write_with_cache_control: true,
467 write_with_content_type: true,
468 write_with_if_not_exists: true,
469 write_with_if_none_match: true,
470 write_with_user_metadata: true,
471
472 delete: true,
473 delete_max_size: Some(AZBLOB_BATCH_LIMIT),
474
475 copy: true,
476
477 list: true,
478 list_with_recursive: true,
479 list_has_etag: true,
480 list_has_content_length: true,
481 list_has_content_md5: true,
482 list_has_content_type: true,
483 list_has_last_modified: true,
484
485 presign: self.config.sas_token.is_some(),
486 presign_stat: self.config.sas_token.is_some(),
487 presign_read: self.config.sas_token.is_some(),
488 presign_write: self.config.sas_token.is_some(),
489
490 shared: true,
491
492 ..Default::default()
493 });
494
495 #[allow(deprecated)]
497 if let Some(client) = self.http_client {
498 am.update_http_client(|_| client);
499 }
500
501 am.into()
502 },
503 root,
504 endpoint,
505 encryption_key,
506 encryption_key_sha256,
507 encryption_algorithm,
508 container: self.config.container.clone(),
509
510 loader: cred_loader,
511 signer,
512 }),
513 })
514 }
515}
516
517fn infer_storage_name_from_endpoint(endpoint: &str) -> Option<String> {
518 let endpoint: &str = endpoint
519 .strip_prefix("http://")
520 .or_else(|| endpoint.strip_prefix("https://"))
521 .unwrap_or(endpoint);
522
523 let mut parts = endpoint.splitn(2, '.');
524 let storage_name = parts.next();
525 let endpoint_suffix = parts
526 .next()
527 .unwrap_or_default()
528 .trim_end_matches('/')
529 .to_lowercase();
530
531 if KNOWN_AZBLOB_ENDPOINT_SUFFIX
532 .iter()
533 .any(|s| *s == endpoint_suffix.as_str())
534 {
535 storage_name.map(|s| s.to_string())
536 } else {
537 None
538 }
539}
540
/// Azure Blob Storage accessor produced by `AzblobBuilder::build`.
///
/// Every operation delegates to the shared `AzblobCore`.
#[derive(Debug, Clone)]
pub struct AzblobBackend {
    /// Shared state: accessor info, endpoint, container, encryption headers and credentials.
    core: Arc<AzblobCore>,
}
546
547impl Access for AzblobBackend {
548 type Reader = HttpBody;
549 type Writer = AzblobWriters;
550 type Lister = oio::PageLister<AzblobLister>;
551 type Deleter = oio::BatchDeleter<AzblobDeleter>;
552 type BlockingReader = ();
553 type BlockingWriter = ();
554 type BlockingLister = ();
555 type BlockingDeleter = ();
556
557 fn info(&self) -> Arc<AccessorInfo> {
558 self.core.info.clone()
559 }
560
561 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
562 let resp = self.core.azblob_get_blob_properties(path, &args).await?;
563
564 let status = resp.status();
565
566 match status {
567 StatusCode::OK => {
568 let headers = resp.headers();
569 let mut meta = parse_into_metadata(path, headers)?;
570
571 let user_meta = parse_prefixed_headers(headers, X_MS_META_PREFIX);
572 if !user_meta.is_empty() {
573 meta.with_user_metadata(user_meta);
574 }
575
576 Ok(RpStat::new(meta))
577 }
578 _ => Err(parse_error(resp)),
579 }
580 }
581
582 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
583 let resp = self.core.azblob_get_blob(path, args.range(), &args).await?;
584
585 let status = resp.status();
586 match status {
587 StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
588 _ => {
589 let (part, mut body) = resp.into_parts();
590 let buf = body.to_buffer().await?;
591 Err(parse_error(Response::from_parts(part, buf)))
592 }
593 }
594 }
595
596 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
597 let w = AzblobWriter::new(self.core.clone(), args.clone(), path.to_string());
598 let w = if args.append() {
599 AzblobWriters::Two(oio::AppendWriter::new(w))
600 } else {
601 AzblobWriters::One(oio::BlockWriter::new(
602 self.core.info.clone(),
603 w,
604 args.concurrent(),
605 ))
606 };
607
608 Ok((RpWrite::default(), w))
609 }
610
611 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
612 Ok((
613 RpDelete::default(),
614 oio::BatchDeleter::new(AzblobDeleter::new(self.core.clone())),
615 ))
616 }
617
618 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
619 let l = AzblobLister::new(
620 self.core.clone(),
621 path.to_string(),
622 args.recursive(),
623 args.limit(),
624 );
625
626 Ok((RpList::default(), oio::PageLister::new(l)))
627 }
628
629 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
630 let resp = self.core.azblob_copy_blob(from, to).await?;
631
632 let status = resp.status();
633
634 match status {
635 StatusCode::ACCEPTED => Ok(RpCopy::default()),
636 _ => Err(parse_error(resp)),
637 }
638 }
639
640 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
641 let req = match args.operation() {
642 PresignOperation::Stat(v) => self.core.azblob_head_blob_request(path, v),
643 PresignOperation::Read(v) => {
644 self.core
645 .azblob_get_blob_request(path, BytesRange::default(), v)
646 }
647 PresignOperation::Write(_) => {
648 self.core
649 .azblob_put_blob_request(path, None, &OpWrite::default(), Buffer::new())
650 }
651 PresignOperation::Delete(_) => Err(Error::new(
652 ErrorKind::Unsupported,
653 "operation is not supported",
654 )),
655 };
656
657 let mut req = req?;
658
659 self.core.sign_query(&mut req).await?;
660
661 let (parts, _) = req.into_parts();
662
663 Ok(RpPresign::new(PresignedRequest::new(
664 parts.method,
665 parts.uri,
666 parts.headers,
667 )))
668 }
669}
670
#[cfg(test)]
mod tests {
    use super::infer_storage_name_from_endpoint;
    use super::AzblobBuilder;

    // The account name is the first host label of a known public endpoint.
    #[test]
    fn test_infer_storage_name_from_endpoint() {
        let endpoint = "https://account.blob.core.windows.net";
        let storage_name = infer_storage_name_from_endpoint(endpoint);
        assert_eq!(storage_name, Some("account".to_string()));
    }

    // A trailing `/` must not stop the endpoint suffix from matching.
    #[test]
    fn test_infer_storage_name_from_endpoint_with_trailing_slash() {
        let endpoint = "https://account.blob.core.windows.net/";
        let storage_name = infer_storage_name_from_endpoint(endpoint);
        assert_eq!(storage_name, Some("account".to_string()));
    }

    // Account-key connection strings: first with an explicit `BlobEndpoint`
    // (Azurite-style), then with the endpoint assembled from `EndpointSuffix`
    // and `DefaultEndpointsProtocol`.
    #[test]
    fn test_builder_from_connection_string() {
        let builder = AzblobBuilder::from_connection_string(
            r#"
DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;
AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;
BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;
QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;
TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;
        "#,
        )
        .expect("from connection string must succeed");

        assert_eq!(
            builder.config.endpoint.unwrap(),
            "http://127.0.0.1:10000/devstoreaccount1"
        );
        assert_eq!(builder.config.account_name.unwrap(), "devstoreaccount1");
        assert_eq!(builder.config.account_key.unwrap(), "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==");

        let builder = AzblobBuilder::from_connection_string(
            r#"
DefaultEndpointsProtocol=https;
AccountName=storagesample;
AccountKey=account-key;
EndpointSuffix=core.chinacloudapi.cn;
        "#,
        )
        .expect("from connection string must succeed");

        assert_eq!(
            builder.config.endpoint.unwrap(),
            "https://storagesample.blob.core.chinacloudapi.cn"
        );
        assert_eq!(builder.config.account_name.unwrap(), "storagesample");
        assert_eq!(builder.config.account_key.unwrap(), "account-key")
    }

    // A SAS-only connection string: the token is stored and no account
    // name/key is set on the builder.
    #[test]
    fn test_sas_from_connection_string() {
        let builder = AzblobBuilder::from_connection_string(
            // Sample SAS token; the signature is not expected to be valid.
            r#"
BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;
QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;
TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;
SharedAccessSignature=sv=2021-01-01&ss=b&srt=c&sp=rwdlaciytfx&se=2022-01-01T11:00:14Z&st=2022-01-02T03:00:14Z&spr=https&sig=KEllk4N8f7rJfLjQCmikL2fRVt%2B%2Bl73UBkbgH%2FK3VGE%3D
        "#,
        )
        .expect("from connection string must succeed");

        assert_eq!(
            builder.config.endpoint.unwrap(),
            "http://127.0.0.1:10000/devstoreaccount1"
        );
        assert_eq!(builder.config.sas_token.unwrap(), "sv=2021-01-01&ss=b&srt=c&sp=rwdlaciytfx&se=2022-01-01T11:00:14Z&st=2022-01-02T03:00:14Z&spr=https&sig=KEllk4N8f7rJfLjQCmikL2fRVt%2B%2Bl73UBkbgH%2FK3VGE%3D");
        assert_eq!(builder.config.account_name, None);
        assert_eq!(builder.config.account_key, None);
    }

    // When both a SAS token and an account key are given, the SAS token wins
    // and the account name/key are left unset.
    #[test]
    pub fn test_sas_preferred() {
        let builder = AzblobBuilder::from_connection_string(
            r#"
BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;
AccountName=storagesample;
AccountKey=account-key;
SharedAccessSignature=sv=2021-01-01&ss=b&srt=c&sp=rwdlaciytfx&se=2022-01-01T11:00:14Z&st=2022-01-02T03:00:14Z&spr=https&sig=KEllk4N8f7rJfLjQCmikL2fRVt%2B%2Bl73UBkbgH%2FK3VGE%3D
        "#,
        )
        .expect("from connection string must succeed");

        assert_eq!(builder.config.sas_token.unwrap(), "sv=2021-01-01&ss=b&srt=c&sp=rwdlaciytfx&se=2022-01-01T11:00:14Z&st=2022-01-02T03:00:14Z&spr=https&sig=KEllk4N8f7rJfLjQCmikL2fRVt%2B%2Bl73UBkbgH%2FK3VGE%3D");
        assert_eq!(builder.config.account_name, None);
        assert_eq!(builder.config.account_key, None);
    }
}