1use std::fmt::Debug;
19use std::sync::Arc;
20
21use base64::Engine;
22use base64::prelude::BASE64_STANDARD;
23use http::Response;
24use http::StatusCode;
25use log::debug;
26use reqsign::AzureStorageConfig;
27use reqsign::AzureStorageLoader;
28use reqsign::AzureStorageSigner;
29use sha2::Digest;
30use sha2::Sha256;
31
32use super::AZBLOB_SCHEME;
33use super::config::AzblobConfig;
34use super::core::AzblobCore;
35use super::core::constants::X_MS_META_PREFIX;
36use super::core::constants::X_MS_VERSION_ID;
37use super::delete::AzblobDeleter;
38use super::error::parse_error;
39use super::lister::AzblobLister;
40use super::writer::AzblobWriter;
41use super::writer::AzblobWriters;
42use crate::raw::*;
43use crate::*;
44
45const AZBLOB_BATCH_LIMIT: usize = 256;
46
47impl From<AzureStorageConfig> for AzblobConfig {
48 fn from(value: AzureStorageConfig) -> Self {
49 Self {
50 endpoint: value.endpoint,
51 account_name: value.account_name,
52 account_key: value.account_key,
53 sas_token: value.sas_token,
54 ..Default::default()
55 }
56 }
57}
58
59#[doc = include_str!("docs.md")]
60#[derive(Default)]
61pub struct AzblobBuilder {
62 pub(super) config: AzblobConfig,
63
64 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
65 pub(super) http_client: Option<HttpClient>,
66}
67
68impl Debug for AzblobBuilder {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 f.debug_struct("AzblobBuilder")
71 .field("config", &self.config)
72 .finish_non_exhaustive()
73 }
74}
75
76impl AzblobBuilder {
77 pub fn root(mut self, root: &str) -> Self {
81 self.config.root = if root.is_empty() {
82 None
83 } else {
84 Some(root.to_string())
85 };
86
87 self
88 }
89
90 pub fn container(mut self, container: &str) -> Self {
92 self.config.container = container.to_string();
93
94 self
95 }
96
97 pub fn endpoint(mut self, endpoint: &str) -> Self {
104 if !endpoint.is_empty() {
105 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
107 }
108
109 self
110 }
111
112 pub fn account_name(mut self, account_name: &str) -> Self {
117 if !account_name.is_empty() {
118 self.config.account_name = Some(account_name.to_string());
119 }
120
121 self
122 }
123
124 pub fn account_key(mut self, account_key: &str) -> Self {
129 if !account_key.is_empty() {
130 self.config.account_key = Some(account_key.to_string());
131 }
132
133 self
134 }
135
136 pub fn encryption_key(mut self, v: &str) -> Self {
149 if !v.is_empty() {
150 self.config.encryption_key = Some(v.to_string());
151 }
152
153 self
154 }
155
156 pub fn encryption_key_sha256(mut self, v: &str) -> Self {
169 if !v.is_empty() {
170 self.config.encryption_key_sha256 = Some(v.to_string());
171 }
172
173 self
174 }
175
176 pub fn encryption_algorithm(mut self, v: &str) -> Self {
189 if !v.is_empty() {
190 self.config.encryption_algorithm = Some(v.to_string());
191 }
192
193 self
194 }
195
196 pub fn server_side_encryption_with_customer_key(mut self, key: &[u8]) -> Self {
210 self.config.encryption_algorithm = Some("AES256".to_string());
212 self.config.encryption_key = Some(BASE64_STANDARD.encode(key));
213 self.config.encryption_key_sha256 =
214 Some(BASE64_STANDARD.encode(Sha256::digest(key).as_slice()));
215 self
216 }
217
218 pub fn sas_token(mut self, sas_token: &str) -> Self {
226 if !sas_token.is_empty() {
227 self.config.sas_token = Some(sas_token.to_string());
228 }
229
230 self
231 }
232
233 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
240 #[allow(deprecated)]
241 pub fn http_client(mut self, client: HttpClient) -> Self {
242 self.http_client = Some(client);
243 self
244 }
245
246 pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
248 self.config.batch_max_operations = Some(batch_max_operations);
249
250 self
251 }
252
253 pub fn from_connection_string(conn: &str) -> Result<Self> {
281 let config =
282 raw::azure_config_from_connection_string(conn, raw::AzureStorageService::Blob)?;
283
284 Ok(AzblobConfig::from(config).into_builder())
285 }
286}
287
288impl Builder for AzblobBuilder {
289 type Config = AzblobConfig;
290
291 fn build(self) -> Result<impl Access> {
292 debug!("backend build started: {:?}", &self);
293
294 let root = normalize_root(&self.config.root.unwrap_or_default());
295 debug!("backend use root {root}");
296
297 let container = match self.config.container.is_empty() {
299 false => Ok(&self.config.container),
300 true => Err(Error::new(ErrorKind::ConfigInvalid, "container is empty")
301 .with_operation("Builder::build")
302 .with_context("service", Scheme::Azblob)),
303 }?;
304 debug!("backend use container {}", &container);
305
306 let endpoint = match &self.config.endpoint {
307 Some(endpoint) => Ok(endpoint.clone()),
308 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
309 .with_operation("Builder::build")
310 .with_context("service", Scheme::Azblob)),
311 }?;
312 debug!("backend use endpoint {}", &container);
313
314 #[cfg(target_arch = "wasm32")]
315 let mut config_loader = AzureStorageConfig::default();
316 #[cfg(not(target_arch = "wasm32"))]
317 let mut config_loader = AzureStorageConfig::default().from_env();
318
319 if let Some(v) = self
320 .config
321 .account_name
322 .clone()
323 .or_else(|| raw::azure_account_name_from_endpoint(endpoint.as_str()))
324 {
325 config_loader.account_name = Some(v);
326 }
327
328 if let Some(v) = self.config.account_key.clone() {
329 if let Err(e) = BASE64_STANDARD.decode(&v) {
331 return Err(Error::new(
332 ErrorKind::ConfigInvalid,
333 format!("invalid account_key: cannot decode as base64: {e}"),
334 )
335 .with_operation("Builder::build")
336 .with_context("service", Scheme::Azblob)
337 .with_context("key", "account_key"));
338 }
339 config_loader.account_key = Some(v);
340 }
341
342 if let Some(v) = self.config.sas_token.clone() {
343 config_loader.sas_token = Some(v);
344 }
345
346 let encryption_key =
347 match &self.config.encryption_key {
348 None => None,
349 Some(v) => Some(build_header_value(v).map_err(|err| {
350 err.with_context("key", "server_side_encryption_customer_key")
351 })?),
352 };
353
354 let encryption_key_sha256 = match &self.config.encryption_key_sha256 {
355 None => None,
356 Some(v) => Some(build_header_value(v).map_err(|err| {
357 err.with_context("key", "server_side_encryption_customer_key_sha256")
358 })?),
359 };
360
361 let encryption_algorithm = match &self.config.encryption_algorithm {
362 None => None,
363 Some(v) => {
364 if v == "AES256" {
365 Some(build_header_value(v).map_err(|err| {
366 err.with_context("key", "server_side_encryption_customer_algorithm")
367 })?)
368 } else {
369 return Err(Error::new(
370 ErrorKind::ConfigInvalid,
371 "encryption_algorithm value must be AES256",
372 ));
373 }
374 }
375 };
376
377 let cred_loader = AzureStorageLoader::new(config_loader);
378
379 let signer = AzureStorageSigner::new();
380
381 Ok(AzblobBackend {
382 core: Arc::new(AzblobCore {
383 info: {
384 let am = AccessorInfo::default();
385 am.set_scheme(AZBLOB_SCHEME)
386 .set_root(&root)
387 .set_name(container)
388 .set_native_capability(Capability {
389 stat: true,
390 stat_with_if_match: true,
391 stat_with_if_none_match: true,
392
393 read: true,
394
395 read_with_if_match: true,
396 read_with_if_none_match: true,
397 read_with_override_content_disposition: true,
398 read_with_if_modified_since: true,
399 read_with_if_unmodified_since: true,
400
401 write: true,
402 write_can_append: true,
403 write_can_empty: true,
404 write_can_multi: true,
405 write_with_cache_control: true,
406 write_with_content_type: true,
407 write_with_if_not_exists: true,
408 write_with_if_none_match: true,
409 write_with_user_metadata: true,
410
411 delete: true,
412 delete_max_size: Some(AZBLOB_BATCH_LIMIT),
413
414 copy: true,
415 copy_with_if_not_exists: true,
416
417 list: true,
418 list_with_recursive: true,
419
420 presign: self.config.sas_token.is_some(),
421 presign_stat: self.config.sas_token.is_some(),
422 presign_read: self.config.sas_token.is_some(),
423 presign_write: self.config.sas_token.is_some(),
424
425 shared: true,
426
427 ..Default::default()
428 });
429
430 #[allow(deprecated)]
432 if let Some(client) = self.http_client {
433 am.update_http_client(|_| client);
434 }
435
436 am.into()
437 },
438 root,
439 endpoint,
440 encryption_key,
441 encryption_key_sha256,
442 encryption_algorithm,
443 container: self.config.container.clone(),
444
445 loader: cred_loader,
446 signer,
447 }),
448 })
449 }
450}
451
452#[derive(Debug, Clone)]
454pub struct AzblobBackend {
455 core: Arc<AzblobCore>,
456}
457
458impl Access for AzblobBackend {
459 type Reader = HttpBody;
460 type Writer = AzblobWriters;
461 type Lister = oio::PageLister<AzblobLister>;
462 type Deleter = oio::BatchDeleter<AzblobDeleter>;
463
464 fn info(&self) -> Arc<AccessorInfo> {
465 self.core.info.clone()
466 }
467
468 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
469 let resp = self.core.azblob_get_blob_properties(path, &args).await?;
470
471 let status = resp.status();
472
473 match status {
474 StatusCode::OK => {
475 let headers = resp.headers();
476 let mut meta = parse_into_metadata(path, headers)?;
477 if let Some(version_id) = parse_header_to_str(headers, X_MS_VERSION_ID)? {
478 meta.set_version(version_id);
479 }
480
481 let user_meta = parse_prefixed_headers(headers, X_MS_META_PREFIX);
482 if !user_meta.is_empty() {
483 meta = meta.with_user_metadata(user_meta);
484 }
485
486 Ok(RpStat::new(meta))
487 }
488 _ => Err(parse_error(resp)),
489 }
490 }
491
492 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
493 let resp = self.core.azblob_get_blob(path, args.range(), &args).await?;
494
495 let status = resp.status();
496 match status {
497 StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
498 _ => {
499 let (part, mut body) = resp.into_parts();
500 let buf = body.to_buffer().await?;
501 Err(parse_error(Response::from_parts(part, buf)))
502 }
503 }
504 }
505
506 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
507 let w = AzblobWriter::new(self.core.clone(), args.clone(), path.to_string());
508 let w = if args.append() {
509 AzblobWriters::Two(oio::AppendWriter::new(w))
510 } else {
511 AzblobWriters::One(oio::BlockWriter::new(
512 self.core.info.clone(),
513 w,
514 args.concurrent(),
515 ))
516 };
517
518 Ok((RpWrite::default(), w))
519 }
520
521 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
522 Ok((
523 RpDelete::default(),
524 oio::BatchDeleter::new(AzblobDeleter::new(self.core.clone())),
525 ))
526 }
527
528 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
529 let l = AzblobLister::new(
530 self.core.clone(),
531 path.to_string(),
532 args.recursive(),
533 args.limit(),
534 );
535
536 Ok((RpList::default(), oio::PageLister::new(l)))
537 }
538
539 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
540 let resp = self.core.azblob_copy_blob(from, to, args).await?;
541
542 let status = resp.status();
543
544 match status {
545 StatusCode::ACCEPTED => Ok(RpCopy::default()),
546 _ => Err(parse_error(resp)),
547 }
548 }
549
550 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
551 let req = match args.operation() {
552 PresignOperation::Stat(v) => self.core.azblob_head_blob_request(path, v),
553 PresignOperation::Read(v) => {
554 self.core
555 .azblob_get_blob_request(path, BytesRange::default(), v)
556 }
557 PresignOperation::Write(_) => {
558 self.core
559 .azblob_put_blob_request(path, None, &OpWrite::default(), Buffer::new())
560 }
561 PresignOperation::Delete(_) => Err(Error::new(
562 ErrorKind::Unsupported,
563 "operation is not supported",
564 )),
565 };
566
567 let mut req = req?;
568
569 self.core.sign_query(&mut req).await?;
570
571 let (parts, _) = req.into_parts();
572
573 Ok(RpPresign::new(PresignedRequest::new(
574 parts.method,
575 parts.uri,
576 parts.headers,
577 )))
578 }
579}