opendal/services/azblob/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use base64::Engine;
22use base64::prelude::BASE64_STANDARD;
23use http::Response;
24use http::StatusCode;
25use log::debug;
26use reqsign::AzureStorageConfig;
27use reqsign::AzureStorageLoader;
28use reqsign::AzureStorageSigner;
29use sha2::Digest;
30use sha2::Sha256;
31
32use super::AZBLOB_SCHEME;
33use super::config::AzblobConfig;
34use super::core::AzblobCore;
35use super::core::constants::X_MS_META_PREFIX;
36use super::core::constants::X_MS_VERSION_ID;
37use super::delete::AzblobDeleter;
38use super::error::parse_error;
39use super::lister::AzblobLister;
40use super::writer::AzblobWriter;
41use super::writer::AzblobWriters;
42use crate::raw::*;
43use crate::*;
44
/// Maximum number of paths the backend places into a single batch delete;
/// advertised to callers as `delete_max_size` in the native capability.
///
/// NOTE(review): presumably mirrors the Azure Blob Batch per-request
/// subrequest limit — confirm against the Azure REST documentation.
const AZBLOB_BATCH_LIMIT: usize = 256;
46
47impl From<AzureStorageConfig> for AzblobConfig {
48    fn from(value: AzureStorageConfig) -> Self {
49        Self {
50            endpoint: value.endpoint,
51            account_name: value.account_name,
52            account_key: value.account_key,
53            sas_token: value.sas_token,
54            ..Default::default()
55        }
56    }
57}
58
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct AzblobBuilder {
    /// Service configuration accumulated by the builder methods and
    /// consumed by `Builder::build`.
    pub(super) config: AzblobConfig,

    /// HTTP client supplied through the deprecated `http_client` builder
    /// method; when present, `build` installs it on the accessor info.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    pub(super) http_client: Option<HttpClient>,
}
67
68impl Debug for AzblobBuilder {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        f.debug_struct("AzblobBuilder")
71            .field("config", &self.config)
72            .finish_non_exhaustive()
73    }
74}
75
76impl AzblobBuilder {
77    /// Set root of this backend.
78    ///
79    /// All operations will happen under this root.
80    pub fn root(mut self, root: &str) -> Self {
81        self.config.root = if root.is_empty() {
82            None
83        } else {
84            Some(root.to_string())
85        };
86
87        self
88    }
89
90    /// Set container name of this backend.
91    pub fn container(mut self, container: &str) -> Self {
92        self.config.container = container.to_string();
93
94        self
95    }
96
97    /// Set endpoint of this backend
98    ///
99    /// Endpoint must be full uri, e.g.
100    ///
101    /// - Azblob: `https://accountname.blob.core.windows.net`
102    /// - Azurite: `http://127.0.0.1:10000/devstoreaccount1`
103    pub fn endpoint(mut self, endpoint: &str) -> Self {
104        if !endpoint.is_empty() {
105            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
106            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
107        }
108
109        self
110    }
111
112    /// Set account_name of this backend.
113    ///
114    /// - If account_name is set, we will take user's input first.
115    /// - If not, we will try to load it from environment.
116    pub fn account_name(mut self, account_name: &str) -> Self {
117        if !account_name.is_empty() {
118            self.config.account_name = Some(account_name.to_string());
119        }
120
121        self
122    }
123
124    /// Set account_key of this backend.
125    ///
126    /// - If account_key is set, we will take user's input first.
127    /// - If not, we will try to load it from environment.
128    pub fn account_key(mut self, account_key: &str) -> Self {
129        if !account_key.is_empty() {
130            self.config.account_key = Some(account_key.to_string());
131        }
132
133        self
134    }
135
136    /// Set encryption_key of this backend.
137    ///
138    /// # Args
139    ///
140    /// `v`: Base64-encoded key that matches algorithm specified in `encryption_algorithm`.
141    ///
142    /// # Note
143    ///
144    /// This function is the low-level setting for SSE related features.
145    ///
146    /// SSE related options should be set carefully to make them works.
147    /// Please use `server_side_encryption_with_*` helpers if even possible.
148    pub fn encryption_key(mut self, v: &str) -> Self {
149        if !v.is_empty() {
150            self.config.encryption_key = Some(v.to_string());
151        }
152
153        self
154    }
155
156    /// Set encryption_key_sha256 of this backend.
157    ///
158    /// # Args
159    ///
160    /// `v`: Base64-encoded SHA256 digest of the key specified in encryption_key.
161    ///
162    /// # Note
163    ///
164    /// This function is the low-level setting for SSE related features.
165    ///
166    /// SSE related options should be set carefully to make them works.
167    /// Please use `server_side_encryption_with_*` helpers if even possible.
168    pub fn encryption_key_sha256(mut self, v: &str) -> Self {
169        if !v.is_empty() {
170            self.config.encryption_key_sha256 = Some(v.to_string());
171        }
172
173        self
174    }
175
176    /// Set encryption_algorithm of this backend.
177    ///
178    /// # Args
179    ///
180    /// `v`: server-side encryption algorithm. (Available values: `AES256`)
181    ///
182    /// # Note
183    ///
184    /// This function is the low-level setting for SSE related features.
185    ///
186    /// SSE related options should be set carefully to make them works.
187    /// Please use `server_side_encryption_with_*` helpers if even possible.
188    pub fn encryption_algorithm(mut self, v: &str) -> Self {
189        if !v.is_empty() {
190            self.config.encryption_algorithm = Some(v.to_string());
191        }
192
193        self
194    }
195
196    /// Enable server side encryption with customer key.
197    ///
198    /// As known as: CPK
199    ///
200    /// # Args
201    ///
202    /// `key`: Base64-encoded SHA256 digest of the key specified in encryption_key.
203    ///
204    /// # Note
205    ///
206    /// Function that helps the user to set the server-side customer-provided encryption key, the key's SHA256, and the algorithm.
207    /// See [Server-side encryption with customer-provided keys (CPK)](https://learn.microsoft.com/en-us/azure/storage/blobs/encryption-customer-provided-keys)
208    /// for more info.
209    pub fn server_side_encryption_with_customer_key(mut self, key: &[u8]) -> Self {
210        // Only AES256 is supported for now
211        self.config.encryption_algorithm = Some("AES256".to_string());
212        self.config.encryption_key = Some(BASE64_STANDARD.encode(key));
213        self.config.encryption_key_sha256 =
214            Some(BASE64_STANDARD.encode(Sha256::digest(key).as_slice()));
215        self
216    }
217
218    /// Set sas_token of this backend.
219    ///
220    /// - If sas_token is set, we will take user's input first.
221    /// - If not, we will try to load it from environment.
222    ///
223    /// See [Grant limited access to Azure Storage resources using shared access signatures (SAS)](https://learn.microsoft.com/en-us/azure/storage/common/storage-sas-overview)
224    /// for more info.
225    pub fn sas_token(mut self, sas_token: &str) -> Self {
226        if !sas_token.is_empty() {
227            self.config.sas_token = Some(sas_token.to_string());
228        }
229
230        self
231    }
232
233    /// Specify the http client that used by this service.
234    ///
235    /// # Notes
236    ///
237    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
238    /// during minor updates.
239    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
240    #[allow(deprecated)]
241    pub fn http_client(mut self, client: HttpClient) -> Self {
242        self.http_client = Some(client);
243        self
244    }
245
246    /// Set maximum batch operations of this backend.
247    pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
248        self.config.batch_max_operations = Some(batch_max_operations);
249
250        self
251    }
252
253    /// from_connection_string will make a builder from connection string
254    ///
255    /// connection string looks like:
256    ///
257    /// ```txt
258    /// DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;
259    /// AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;
260    /// BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;
261    /// QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;
262    /// TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;
263    /// ```
264    ///
265    /// Or
266    ///
267    /// ```txt
268    /// DefaultEndpointsProtocol=https;
269    /// AccountName=storagesample;
270    /// AccountKey=<account-key>;
271    /// EndpointSuffix=core.chinacloudapi.cn;
272    /// ```
273    ///
274    /// For reference: [Configure Azure Storage connection strings](https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string)
275    ///
276    /// # Note
277    ///
278    /// Connection strings can only configure the endpoint, account name and
279    /// authentication information. Users still need to configure container name.
280    pub fn from_connection_string(conn: &str) -> Result<Self> {
281        let config =
282            raw::azure_config_from_connection_string(conn, raw::AzureStorageService::Blob)?;
283
284        Ok(AzblobConfig::from(config).into_builder())
285    }
286}
287
288impl Builder for AzblobBuilder {
289    type Config = AzblobConfig;
290
291    fn build(self) -> Result<impl Access> {
292        debug!("backend build started: {:?}", &self);
293
294        let root = normalize_root(&self.config.root.unwrap_or_default());
295        debug!("backend use root {root}");
296
297        // Handle endpoint, region and container name.
298        let container = match self.config.container.is_empty() {
299            false => Ok(&self.config.container),
300            true => Err(Error::new(ErrorKind::ConfigInvalid, "container is empty")
301                .with_operation("Builder::build")
302                .with_context("service", Scheme::Azblob)),
303        }?;
304        debug!("backend use container {}", &container);
305
306        let endpoint = match &self.config.endpoint {
307            Some(endpoint) => Ok(endpoint.clone()),
308            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
309                .with_operation("Builder::build")
310                .with_context("service", Scheme::Azblob)),
311        }?;
312        debug!("backend use endpoint {}", &container);
313
314        #[cfg(target_arch = "wasm32")]
315        let mut config_loader = AzureStorageConfig::default();
316        #[cfg(not(target_arch = "wasm32"))]
317        let mut config_loader = AzureStorageConfig::default().from_env();
318
319        if let Some(v) = self
320            .config
321            .account_name
322            .clone()
323            .or_else(|| raw::azure_account_name_from_endpoint(endpoint.as_str()))
324        {
325            config_loader.account_name = Some(v);
326        }
327
328        if let Some(v) = self.config.account_key.clone() {
329            // Validate that account_key can be decoded as base64
330            if let Err(e) = BASE64_STANDARD.decode(&v) {
331                return Err(Error::new(
332                    ErrorKind::ConfigInvalid,
333                    format!("invalid account_key: cannot decode as base64: {e}"),
334                )
335                .with_operation("Builder::build")
336                .with_context("service", Scheme::Azblob)
337                .with_context("key", "account_key"));
338            }
339            config_loader.account_key = Some(v);
340        }
341
342        if let Some(v) = self.config.sas_token.clone() {
343            config_loader.sas_token = Some(v);
344        }
345
346        let encryption_key =
347            match &self.config.encryption_key {
348                None => None,
349                Some(v) => Some(build_header_value(v).map_err(|err| {
350                    err.with_context("key", "server_side_encryption_customer_key")
351                })?),
352            };
353
354        let encryption_key_sha256 = match &self.config.encryption_key_sha256 {
355            None => None,
356            Some(v) => Some(build_header_value(v).map_err(|err| {
357                err.with_context("key", "server_side_encryption_customer_key_sha256")
358            })?),
359        };
360
361        let encryption_algorithm = match &self.config.encryption_algorithm {
362            None => None,
363            Some(v) => {
364                if v == "AES256" {
365                    Some(build_header_value(v).map_err(|err| {
366                        err.with_context("key", "server_side_encryption_customer_algorithm")
367                    })?)
368                } else {
369                    return Err(Error::new(
370                        ErrorKind::ConfigInvalid,
371                        "encryption_algorithm value must be AES256",
372                    ));
373                }
374            }
375        };
376
377        let cred_loader = AzureStorageLoader::new(config_loader);
378
379        let signer = AzureStorageSigner::new();
380
381        Ok(AzblobBackend {
382            core: Arc::new(AzblobCore {
383                info: {
384                    let am = AccessorInfo::default();
385                    am.set_scheme(AZBLOB_SCHEME)
386                        .set_root(&root)
387                        .set_name(container)
388                        .set_native_capability(Capability {
389                            stat: true,
390                            stat_with_if_match: true,
391                            stat_with_if_none_match: true,
392
393                            read: true,
394
395                            read_with_if_match: true,
396                            read_with_if_none_match: true,
397                            read_with_override_content_disposition: true,
398                            read_with_if_modified_since: true,
399                            read_with_if_unmodified_since: true,
400
401                            write: true,
402                            write_can_append: true,
403                            write_can_empty: true,
404                            write_can_multi: true,
405                            write_with_cache_control: true,
406                            write_with_content_type: true,
407                            write_with_if_not_exists: true,
408                            write_with_if_none_match: true,
409                            write_with_user_metadata: true,
410
411                            delete: true,
412                            delete_max_size: Some(AZBLOB_BATCH_LIMIT),
413
414                            copy: true,
415                            copy_with_if_not_exists: true,
416
417                            list: true,
418                            list_with_recursive: true,
419
420                            presign: self.config.sas_token.is_some(),
421                            presign_stat: self.config.sas_token.is_some(),
422                            presign_read: self.config.sas_token.is_some(),
423                            presign_write: self.config.sas_token.is_some(),
424
425                            shared: true,
426
427                            ..Default::default()
428                        });
429
430                    // allow deprecated api here for compatibility
431                    #[allow(deprecated)]
432                    if let Some(client) = self.http_client {
433                        am.update_http_client(|_| client);
434                    }
435
436                    am.into()
437                },
438                root,
439                endpoint,
440                encryption_key,
441                encryption_key_sha256,
442                encryption_algorithm,
443                container: self.config.container.clone(),
444
445                loader: cred_loader,
446                signer,
447            }),
448        })
449    }
450}
451
/// Backend for azblob services.
///
/// Cloning is cheap: the only field is a shared `Arc<AzblobCore>`.
#[derive(Debug, Clone)]
pub struct AzblobBackend {
    /// Shared core holding the accessor info, root, endpoint, container,
    /// optional encryption headers, and the credential loader and signer.
    core: Arc<AzblobCore>,
}
457
458impl Access for AzblobBackend {
459    type Reader = HttpBody;
460    type Writer = AzblobWriters;
461    type Lister = oio::PageLister<AzblobLister>;
462    type Deleter = oio::BatchDeleter<AzblobDeleter>;
463
464    fn info(&self) -> Arc<AccessorInfo> {
465        self.core.info.clone()
466    }
467
468    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
469        let resp = self.core.azblob_get_blob_properties(path, &args).await?;
470
471        let status = resp.status();
472
473        match status {
474            StatusCode::OK => {
475                let headers = resp.headers();
476                let mut meta = parse_into_metadata(path, headers)?;
477                if let Some(version_id) = parse_header_to_str(headers, X_MS_VERSION_ID)? {
478                    meta.set_version(version_id);
479                }
480
481                let user_meta = parse_prefixed_headers(headers, X_MS_META_PREFIX);
482                if !user_meta.is_empty() {
483                    meta = meta.with_user_metadata(user_meta);
484                }
485
486                Ok(RpStat::new(meta))
487            }
488            _ => Err(parse_error(resp)),
489        }
490    }
491
492    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
493        let resp = self.core.azblob_get_blob(path, args.range(), &args).await?;
494
495        let status = resp.status();
496        match status {
497            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
498            _ => {
499                let (part, mut body) = resp.into_parts();
500                let buf = body.to_buffer().await?;
501                Err(parse_error(Response::from_parts(part, buf)))
502            }
503        }
504    }
505
506    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
507        let w = AzblobWriter::new(self.core.clone(), args.clone(), path.to_string());
508        let w = if args.append() {
509            AzblobWriters::Two(oio::AppendWriter::new(w))
510        } else {
511            AzblobWriters::One(oio::BlockWriter::new(
512                self.core.info.clone(),
513                w,
514                args.concurrent(),
515            ))
516        };
517
518        Ok((RpWrite::default(), w))
519    }
520
521    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
522        Ok((
523            RpDelete::default(),
524            oio::BatchDeleter::new(AzblobDeleter::new(self.core.clone())),
525        ))
526    }
527
528    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
529        let l = AzblobLister::new(
530            self.core.clone(),
531            path.to_string(),
532            args.recursive(),
533            args.limit(),
534        );
535
536        Ok((RpList::default(), oio::PageLister::new(l)))
537    }
538
539    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
540        let resp = self.core.azblob_copy_blob(from, to, args).await?;
541
542        let status = resp.status();
543
544        match status {
545            StatusCode::ACCEPTED => Ok(RpCopy::default()),
546            _ => Err(parse_error(resp)),
547        }
548    }
549
550    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
551        let req = match args.operation() {
552            PresignOperation::Stat(v) => self.core.azblob_head_blob_request(path, v),
553            PresignOperation::Read(v) => {
554                self.core
555                    .azblob_get_blob_request(path, BytesRange::default(), v)
556            }
557            PresignOperation::Write(_) => {
558                self.core
559                    .azblob_put_blob_request(path, None, &OpWrite::default(), Buffer::new())
560            }
561            PresignOperation::Delete(_) => Err(Error::new(
562                ErrorKind::Unsupported,
563                "operation is not supported",
564            )),
565        };
566
567        let mut req = req?;
568
569        self.core.sign_query(&mut req).await?;
570
571        let (parts, _) = req.into_parts();
572
573        Ok(RpPresign::new(PresignedRequest::new(
574            parts.method,
575            parts.uri,
576            parts.headers,
577        )))
578    }
579}