1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use base64::prelude::BASE64_STANDARD;
23use base64::Engine;
24use http::Response;
25use http::StatusCode;
26use log::debug;
27use reqsign::AzureStorageConfig;
28use reqsign::AzureStorageLoader;
29use reqsign::AzureStorageSigner;
30use sha2::Digest;
31use sha2::Sha256;
32
33use super::core::constants::X_MS_META_PREFIX;
34use super::core::constants::X_MS_VERSION_ID;
35use super::core::AzblobCore;
36use super::delete::AzblobDeleter;
37use super::error::parse_error;
38use super::lister::AzblobLister;
39use super::writer::AzblobWriter;
40use super::writer::AzblobWriters;
41use crate::raw::*;
42use crate::services::AzblobConfig;
43use crate::*;
44
45const AZBLOB_BATCH_LIMIT: usize = 256;
46
47impl From<AzureStorageConfig> for AzblobConfig {
48 fn from(value: AzureStorageConfig) -> Self {
49 Self {
50 endpoint: value.endpoint,
51 account_name: value.account_name,
52 account_key: value.account_key,
53 sas_token: value.sas_token,
54 ..Default::default()
55 }
56 }
57}
58
59impl Configurator for AzblobConfig {
60 type Builder = AzblobBuilder;
61
62 #[allow(deprecated)]
63 fn into_builder(self) -> Self::Builder {
64 AzblobBuilder {
65 config: self,
66
67 http_client: None,
68 }
69 }
70}
71
72#[doc = include_str!("docs.md")]
73#[derive(Default, Clone)]
74pub struct AzblobBuilder {
75 config: AzblobConfig,
76
77 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
78 http_client: Option<HttpClient>,
79}
80
81impl Debug for AzblobBuilder {
82 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
83 let mut ds = f.debug_struct("AzblobBuilder");
84
85 ds.field("config", &self.config);
86
87 ds.finish()
88 }
89}
90
91impl AzblobBuilder {
92 pub fn root(mut self, root: &str) -> Self {
96 self.config.root = if root.is_empty() {
97 None
98 } else {
99 Some(root.to_string())
100 };
101
102 self
103 }
104
105 pub fn container(mut self, container: &str) -> Self {
107 self.config.container = container.to_string();
108
109 self
110 }
111
112 pub fn endpoint(mut self, endpoint: &str) -> Self {
119 if !endpoint.is_empty() {
120 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
122 }
123
124 self
125 }
126
127 pub fn account_name(mut self, account_name: &str) -> Self {
132 if !account_name.is_empty() {
133 self.config.account_name = Some(account_name.to_string());
134 }
135
136 self
137 }
138
139 pub fn account_key(mut self, account_key: &str) -> Self {
144 if !account_key.is_empty() {
145 self.config.account_key = Some(account_key.to_string());
146 }
147
148 self
149 }
150
151 pub fn encryption_key(mut self, v: &str) -> Self {
164 if !v.is_empty() {
165 self.config.encryption_key = Some(v.to_string());
166 }
167
168 self
169 }
170
171 pub fn encryption_key_sha256(mut self, v: &str) -> Self {
184 if !v.is_empty() {
185 self.config.encryption_key_sha256 = Some(v.to_string());
186 }
187
188 self
189 }
190
191 pub fn encryption_algorithm(mut self, v: &str) -> Self {
204 if !v.is_empty() {
205 self.config.encryption_algorithm = Some(v.to_string());
206 }
207
208 self
209 }
210
211 pub fn server_side_encryption_with_customer_key(mut self, key: &[u8]) -> Self {
225 self.config.encryption_algorithm = Some("AES256".to_string());
227 self.config.encryption_key = Some(BASE64_STANDARD.encode(key));
228 self.config.encryption_key_sha256 =
229 Some(BASE64_STANDARD.encode(Sha256::digest(key).as_slice()));
230 self
231 }
232
233 pub fn sas_token(mut self, sas_token: &str) -> Self {
241 if !sas_token.is_empty() {
242 self.config.sas_token = Some(sas_token.to_string());
243 }
244
245 self
246 }
247
248 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
255 #[allow(deprecated)]
256 pub fn http_client(mut self, client: HttpClient) -> Self {
257 self.http_client = Some(client);
258 self
259 }
260
261 pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
263 self.config.batch_max_operations = Some(batch_max_operations);
264
265 self
266 }
267
268 pub fn from_connection_string(conn: &str) -> Result<Self> {
296 let config =
297 raw::azure_config_from_connection_string(conn, raw::AzureStorageService::Blob)?;
298
299 Ok(AzblobConfig::from(config).into_builder())
300 }
301}
302
303impl Builder for AzblobBuilder {
304 const SCHEME: Scheme = Scheme::Azblob;
305 type Config = AzblobConfig;
306
307 fn build(self) -> Result<impl Access> {
308 debug!("backend build started: {:?}", &self);
309
310 let root = normalize_root(&self.config.root.unwrap_or_default());
311 debug!("backend use root {}", root);
312
313 let container = match self.config.container.is_empty() {
315 false => Ok(&self.config.container),
316 true => Err(Error::new(ErrorKind::ConfigInvalid, "container is empty")
317 .with_operation("Builder::build")
318 .with_context("service", Scheme::Azblob)),
319 }?;
320 debug!("backend use container {}", &container);
321
322 let endpoint = match &self.config.endpoint {
323 Some(endpoint) => Ok(endpoint.clone()),
324 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
325 .with_operation("Builder::build")
326 .with_context("service", Scheme::Azblob)),
327 }?;
328 debug!("backend use endpoint {}", &container);
329
330 let mut config_loader = AzureStorageConfig::default().from_env();
331
332 if let Some(v) = self
333 .config
334 .account_name
335 .clone()
336 .or_else(|| raw::azure_account_name_from_endpoint(endpoint.as_str()))
337 {
338 config_loader.account_name = Some(v);
339 }
340
341 if let Some(v) = self.config.account_key.clone() {
342 config_loader.account_key = Some(v);
343 }
344
345 if let Some(v) = self.config.sas_token.clone() {
346 config_loader.sas_token = Some(v);
347 }
348
349 let encryption_key =
350 match &self.config.encryption_key {
351 None => None,
352 Some(v) => Some(build_header_value(v).map_err(|err| {
353 err.with_context("key", "server_side_encryption_customer_key")
354 })?),
355 };
356
357 let encryption_key_sha256 = match &self.config.encryption_key_sha256 {
358 None => None,
359 Some(v) => Some(build_header_value(v).map_err(|err| {
360 err.with_context("key", "server_side_encryption_customer_key_sha256")
361 })?),
362 };
363
364 let encryption_algorithm = match &self.config.encryption_algorithm {
365 None => None,
366 Some(v) => {
367 if v == "AES256" {
368 Some(build_header_value(v).map_err(|err| {
369 err.with_context("key", "server_side_encryption_customer_algorithm")
370 })?)
371 } else {
372 return Err(Error::new(
373 ErrorKind::ConfigInvalid,
374 "encryption_algorithm value must be AES256",
375 ));
376 }
377 }
378 };
379
380 let cred_loader = AzureStorageLoader::new(config_loader);
381
382 let signer = AzureStorageSigner::new();
383
384 Ok(AzblobBackend {
385 core: Arc::new(AzblobCore {
386 info: {
387 let am = AccessorInfo::default();
388 am.set_scheme(Scheme::Azblob)
389 .set_root(&root)
390 .set_name(container)
391 .set_native_capability(Capability {
392 stat: true,
393 stat_with_if_match: true,
394 stat_with_if_none_match: true,
395 stat_has_cache_control: true,
396 stat_has_content_length: true,
397 stat_has_content_type: true,
398 stat_has_content_encoding: true,
399 stat_has_content_range: true,
400 stat_has_etag: true,
401 stat_has_content_md5: true,
402 stat_has_last_modified: true,
403 stat_has_content_disposition: true,
404
405 read: true,
406
407 read_with_if_match: true,
408 read_with_if_none_match: true,
409 read_with_override_content_disposition: true,
410 read_with_if_modified_since: true,
411 read_with_if_unmodified_since: true,
412
413 write: true,
414 write_can_append: true,
415 write_can_empty: true,
416 write_can_multi: true,
417 write_with_cache_control: true,
418 write_with_content_type: true,
419 write_with_if_not_exists: true,
420 write_with_if_none_match: true,
421 write_with_user_metadata: true,
422
423 delete: true,
424 delete_max_size: Some(AZBLOB_BATCH_LIMIT),
425
426 copy: true,
427
428 list: true,
429 list_with_recursive: true,
430 list_has_etag: true,
431 list_has_content_length: true,
432 list_has_content_md5: true,
433 list_has_content_type: true,
434 list_has_last_modified: true,
435
436 presign: self.config.sas_token.is_some(),
437 presign_stat: self.config.sas_token.is_some(),
438 presign_read: self.config.sas_token.is_some(),
439 presign_write: self.config.sas_token.is_some(),
440
441 shared: true,
442
443 ..Default::default()
444 });
445
446 #[allow(deprecated)]
448 if let Some(client) = self.http_client {
449 am.update_http_client(|_| client);
450 }
451
452 am.into()
453 },
454 root,
455 endpoint,
456 encryption_key,
457 encryption_key_sha256,
458 encryption_algorithm,
459 container: self.config.container.clone(),
460
461 loader: cred_loader,
462 signer,
463 }),
464 })
465 }
466}
467
468#[derive(Debug, Clone)]
470pub struct AzblobBackend {
471 core: Arc<AzblobCore>,
472}
473
474impl Access for AzblobBackend {
475 type Reader = HttpBody;
476 type Writer = AzblobWriters;
477 type Lister = oio::PageLister<AzblobLister>;
478 type Deleter = oio::BatchDeleter<AzblobDeleter>;
479
480 fn info(&self) -> Arc<AccessorInfo> {
481 self.core.info.clone()
482 }
483
484 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
485 let resp = self.core.azblob_get_blob_properties(path, &args).await?;
486
487 let status = resp.status();
488
489 match status {
490 StatusCode::OK => {
491 let headers = resp.headers();
492 let mut meta = parse_into_metadata(path, headers)?;
493 if let Some(version_id) = parse_header_to_str(headers, X_MS_VERSION_ID)? {
494 meta.set_version(version_id);
495 }
496
497 let user_meta = parse_prefixed_headers(headers, X_MS_META_PREFIX);
498 if !user_meta.is_empty() {
499 meta = meta.with_user_metadata(user_meta);
500 }
501
502 Ok(RpStat::new(meta))
503 }
504 _ => Err(parse_error(resp)),
505 }
506 }
507
508 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
509 let resp = self.core.azblob_get_blob(path, args.range(), &args).await?;
510
511 let status = resp.status();
512 match status {
513 StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
514 _ => {
515 let (part, mut body) = resp.into_parts();
516 let buf = body.to_buffer().await?;
517 Err(parse_error(Response::from_parts(part, buf)))
518 }
519 }
520 }
521
522 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
523 let w = AzblobWriter::new(self.core.clone(), args.clone(), path.to_string());
524 let w = if args.append() {
525 AzblobWriters::Two(oio::AppendWriter::new(w))
526 } else {
527 AzblobWriters::One(oio::BlockWriter::new(
528 self.core.info.clone(),
529 w,
530 args.concurrent(),
531 ))
532 };
533
534 Ok((RpWrite::default(), w))
535 }
536
537 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
538 Ok((
539 RpDelete::default(),
540 oio::BatchDeleter::new(AzblobDeleter::new(self.core.clone())),
541 ))
542 }
543
544 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
545 let l = AzblobLister::new(
546 self.core.clone(),
547 path.to_string(),
548 args.recursive(),
549 args.limit(),
550 );
551
552 Ok((RpList::default(), oio::PageLister::new(l)))
553 }
554
555 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
556 let resp = self.core.azblob_copy_blob(from, to).await?;
557
558 let status = resp.status();
559
560 match status {
561 StatusCode::ACCEPTED => Ok(RpCopy::default()),
562 _ => Err(parse_error(resp)),
563 }
564 }
565
566 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
567 let req = match args.operation() {
568 PresignOperation::Stat(v) => self.core.azblob_head_blob_request(path, v),
569 PresignOperation::Read(v) => {
570 self.core
571 .azblob_get_blob_request(path, BytesRange::default(), v)
572 }
573 PresignOperation::Write(_) => {
574 self.core
575 .azblob_put_blob_request(path, None, &OpWrite::default(), Buffer::new())
576 }
577 PresignOperation::Delete(_) => Err(Error::new(
578 ErrorKind::Unsupported,
579 "operation is not supported",
580 )),
581 };
582
583 let mut req = req?;
584
585 self.core.sign_query(&mut req).await?;
586
587 let (parts, _) = req.into_parts();
588
589 Ok(RpPresign::new(PresignedRequest::new(
590 parts.method,
591 parts.uri,
592 parts.headers,
593 )))
594 }
595}