opendal/services/azblob/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use base64::prelude::BASE64_STANDARD;
23use base64::Engine;
24use http::Response;
25use http::StatusCode;
26use log::debug;
27use reqsign::AzureStorageConfig;
28use reqsign::AzureStorageLoader;
29use reqsign::AzureStorageSigner;
30use sha2::Digest;
31use sha2::Sha256;
32
33use super::core::constants::X_MS_META_PREFIX;
34use super::core::constants::X_MS_VERSION_ID;
35use super::core::AzblobCore;
36use super::delete::AzblobDeleter;
37use super::error::parse_error;
38use super::lister::AzblobLister;
39use super::writer::AzblobWriter;
40use super::writer::AzblobWriters;
41use crate::raw::*;
42use crate::services::AzblobConfig;
43use crate::*;
44
45const AZBLOB_BATCH_LIMIT: usize = 256;
46
47impl From<AzureStorageConfig> for AzblobConfig {
48    fn from(value: AzureStorageConfig) -> Self {
49        Self {
50            endpoint: value.endpoint,
51            account_name: value.account_name,
52            account_key: value.account_key,
53            sas_token: value.sas_token,
54            ..Default::default()
55        }
56    }
57}
58
59impl Configurator for AzblobConfig {
60    type Builder = AzblobBuilder;
61
62    #[allow(deprecated)]
63    fn into_builder(self) -> Self::Builder {
64        AzblobBuilder {
65            config: self,
66
67            http_client: None,
68        }
69    }
70}
71
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
pub struct AzblobBuilder {
    /// Service configuration, filled in by the setter methods and consumed by `build`.
    config: AzblobConfig,

    /// Custom HTTP client; when present it is installed into the accessor info
    /// during `build`. Not included in this builder's `Debug` output.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
80
81impl Debug for AzblobBuilder {
82    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
83        let mut ds = f.debug_struct("AzblobBuilder");
84
85        ds.field("config", &self.config);
86
87        ds.finish()
88    }
89}
90
91impl AzblobBuilder {
92    /// Set root of this backend.
93    ///
94    /// All operations will happen under this root.
95    pub fn root(mut self, root: &str) -> Self {
96        self.config.root = if root.is_empty() {
97            None
98        } else {
99            Some(root.to_string())
100        };
101
102        self
103    }
104
105    /// Set container name of this backend.
106    pub fn container(mut self, container: &str) -> Self {
107        self.config.container = container.to_string();
108
109        self
110    }
111
112    /// Set endpoint of this backend
113    ///
114    /// Endpoint must be full uri, e.g.
115    ///
116    /// - Azblob: `https://accountname.blob.core.windows.net`
117    /// - Azurite: `http://127.0.0.1:10000/devstoreaccount1`
118    pub fn endpoint(mut self, endpoint: &str) -> Self {
119        if !endpoint.is_empty() {
120            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
121            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
122        }
123
124        self
125    }
126
127    /// Set account_name of this backend.
128    ///
129    /// - If account_name is set, we will take user's input first.
130    /// - If not, we will try to load it from environment.
131    pub fn account_name(mut self, account_name: &str) -> Self {
132        if !account_name.is_empty() {
133            self.config.account_name = Some(account_name.to_string());
134        }
135
136        self
137    }
138
139    /// Set account_key of this backend.
140    ///
141    /// - If account_key is set, we will take user's input first.
142    /// - If not, we will try to load it from environment.
143    pub fn account_key(mut self, account_key: &str) -> Self {
144        if !account_key.is_empty() {
145            self.config.account_key = Some(account_key.to_string());
146        }
147
148        self
149    }
150
151    /// Set encryption_key of this backend.
152    ///
153    /// # Args
154    ///
155    /// `v`: Base64-encoded key that matches algorithm specified in `encryption_algorithm`.
156    ///
157    /// # Note
158    ///
159    /// This function is the low-level setting for SSE related features.
160    ///
161    /// SSE related options should be set carefully to make them works.
162    /// Please use `server_side_encryption_with_*` helpers if even possible.
163    pub fn encryption_key(mut self, v: &str) -> Self {
164        if !v.is_empty() {
165            self.config.encryption_key = Some(v.to_string());
166        }
167
168        self
169    }
170
171    /// Set encryption_key_sha256 of this backend.
172    ///
173    /// # Args
174    ///
175    /// `v`: Base64-encoded SHA256 digest of the key specified in encryption_key.
176    ///
177    /// # Note
178    ///
179    /// This function is the low-level setting for SSE related features.
180    ///
181    /// SSE related options should be set carefully to make them works.
182    /// Please use `server_side_encryption_with_*` helpers if even possible.
183    pub fn encryption_key_sha256(mut self, v: &str) -> Self {
184        if !v.is_empty() {
185            self.config.encryption_key_sha256 = Some(v.to_string());
186        }
187
188        self
189    }
190
191    /// Set encryption_algorithm of this backend.
192    ///
193    /// # Args
194    ///
195    /// `v`: server-side encryption algorithm. (Available values: `AES256`)
196    ///
197    /// # Note
198    ///
199    /// This function is the low-level setting for SSE related features.
200    ///
201    /// SSE related options should be set carefully to make them works.
202    /// Please use `server_side_encryption_with_*` helpers if even possible.
203    pub fn encryption_algorithm(mut self, v: &str) -> Self {
204        if !v.is_empty() {
205            self.config.encryption_algorithm = Some(v.to_string());
206        }
207
208        self
209    }
210
211    /// Enable server side encryption with customer key.
212    ///
213    /// As known as: CPK
214    ///
215    /// # Args
216    ///
217    /// `key`: Base64-encoded SHA256 digest of the key specified in encryption_key.
218    ///
219    /// # Note
220    ///
221    /// Function that helps the user to set the server-side customer-provided encryption key, the key's SHA256, and the algorithm.
222    /// See [Server-side encryption with customer-provided keys (CPK)](https://learn.microsoft.com/en-us/azure/storage/blobs/encryption-customer-provided-keys)
223    /// for more info.
224    pub fn server_side_encryption_with_customer_key(mut self, key: &[u8]) -> Self {
225        // Only AES256 is supported for now
226        self.config.encryption_algorithm = Some("AES256".to_string());
227        self.config.encryption_key = Some(BASE64_STANDARD.encode(key));
228        self.config.encryption_key_sha256 =
229            Some(BASE64_STANDARD.encode(Sha256::digest(key).as_slice()));
230        self
231    }
232
233    /// Set sas_token of this backend.
234    ///
235    /// - If sas_token is set, we will take user's input first.
236    /// - If not, we will try to load it from environment.
237    ///
238    /// See [Grant limited access to Azure Storage resources using shared access signatures (SAS)](https://learn.microsoft.com/en-us/azure/storage/common/storage-sas-overview)
239    /// for more info.
240    pub fn sas_token(mut self, sas_token: &str) -> Self {
241        if !sas_token.is_empty() {
242            self.config.sas_token = Some(sas_token.to_string());
243        }
244
245        self
246    }
247
248    /// Specify the http client that used by this service.
249    ///
250    /// # Notes
251    ///
252    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
253    /// during minor updates.
254    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
255    #[allow(deprecated)]
256    pub fn http_client(mut self, client: HttpClient) -> Self {
257        self.http_client = Some(client);
258        self
259    }
260
261    /// Set maximum batch operations of this backend.
262    pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
263        self.config.batch_max_operations = Some(batch_max_operations);
264
265        self
266    }
267
268    /// from_connection_string will make a builder from connection string
269    ///
270    /// connection string looks like:
271    ///
272    /// ```txt
273    /// DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;
274    /// AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;
275    /// BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;
276    /// QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;
277    /// TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;
278    /// ```
279    ///
280    /// Or
281    ///
282    /// ```txt
283    /// DefaultEndpointsProtocol=https;
284    /// AccountName=storagesample;
285    /// AccountKey=<account-key>;
286    /// EndpointSuffix=core.chinacloudapi.cn;
287    /// ```
288    ///
289    /// For reference: [Configure Azure Storage connection strings](https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string)
290    ///
291    /// # Note
292    ///
293    /// Connection strings can only configure the endpoint, account name and
294    /// authentication information. Users still need to configure container name.
295    pub fn from_connection_string(conn: &str) -> Result<Self> {
296        let config =
297            raw::azure_config_from_connection_string(conn, raw::AzureStorageService::Blob)?;
298
299        Ok(AzblobConfig::from(config).into_builder())
300    }
301}
302
303impl Builder for AzblobBuilder {
304    const SCHEME: Scheme = Scheme::Azblob;
305    type Config = AzblobConfig;
306
307    fn build(self) -> Result<impl Access> {
308        debug!("backend build started: {:?}", &self);
309
310        let root = normalize_root(&self.config.root.unwrap_or_default());
311        debug!("backend use root {}", root);
312
313        // Handle endpoint, region and container name.
314        let container = match self.config.container.is_empty() {
315            false => Ok(&self.config.container),
316            true => Err(Error::new(ErrorKind::ConfigInvalid, "container is empty")
317                .with_operation("Builder::build")
318                .with_context("service", Scheme::Azblob)),
319        }?;
320        debug!("backend use container {}", &container);
321
322        let endpoint = match &self.config.endpoint {
323            Some(endpoint) => Ok(endpoint.clone()),
324            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
325                .with_operation("Builder::build")
326                .with_context("service", Scheme::Azblob)),
327        }?;
328        debug!("backend use endpoint {}", &container);
329
330        let mut config_loader = AzureStorageConfig::default().from_env();
331
332        if let Some(v) = self
333            .config
334            .account_name
335            .clone()
336            .or_else(|| raw::azure_account_name_from_endpoint(endpoint.as_str()))
337        {
338            config_loader.account_name = Some(v);
339        }
340
341        if let Some(v) = self.config.account_key.clone() {
342            config_loader.account_key = Some(v);
343        }
344
345        if let Some(v) = self.config.sas_token.clone() {
346            config_loader.sas_token = Some(v);
347        }
348
349        let encryption_key =
350            match &self.config.encryption_key {
351                None => None,
352                Some(v) => Some(build_header_value(v).map_err(|err| {
353                    err.with_context("key", "server_side_encryption_customer_key")
354                })?),
355            };
356
357        let encryption_key_sha256 = match &self.config.encryption_key_sha256 {
358            None => None,
359            Some(v) => Some(build_header_value(v).map_err(|err| {
360                err.with_context("key", "server_side_encryption_customer_key_sha256")
361            })?),
362        };
363
364        let encryption_algorithm = match &self.config.encryption_algorithm {
365            None => None,
366            Some(v) => {
367                if v == "AES256" {
368                    Some(build_header_value(v).map_err(|err| {
369                        err.with_context("key", "server_side_encryption_customer_algorithm")
370                    })?)
371                } else {
372                    return Err(Error::new(
373                        ErrorKind::ConfigInvalid,
374                        "encryption_algorithm value must be AES256",
375                    ));
376                }
377            }
378        };
379
380        let cred_loader = AzureStorageLoader::new(config_loader);
381
382        let signer = AzureStorageSigner::new();
383
384        Ok(AzblobBackend {
385            core: Arc::new(AzblobCore {
386                info: {
387                    let am = AccessorInfo::default();
388                    am.set_scheme(Scheme::Azblob)
389                        .set_root(&root)
390                        .set_name(container)
391                        .set_native_capability(Capability {
392                            stat: true,
393                            stat_with_if_match: true,
394                            stat_with_if_none_match: true,
395                            stat_has_cache_control: true,
396                            stat_has_content_length: true,
397                            stat_has_content_type: true,
398                            stat_has_content_encoding: true,
399                            stat_has_content_range: true,
400                            stat_has_etag: true,
401                            stat_has_content_md5: true,
402                            stat_has_last_modified: true,
403                            stat_has_content_disposition: true,
404
405                            read: true,
406
407                            read_with_if_match: true,
408                            read_with_if_none_match: true,
409                            read_with_override_content_disposition: true,
410                            read_with_if_modified_since: true,
411                            read_with_if_unmodified_since: true,
412
413                            write: true,
414                            write_can_append: true,
415                            write_can_empty: true,
416                            write_can_multi: true,
417                            write_with_cache_control: true,
418                            write_with_content_type: true,
419                            write_with_if_not_exists: true,
420                            write_with_if_none_match: true,
421                            write_with_user_metadata: true,
422
423                            delete: true,
424                            delete_max_size: Some(AZBLOB_BATCH_LIMIT),
425
426                            copy: true,
427
428                            list: true,
429                            list_with_recursive: true,
430                            list_has_etag: true,
431                            list_has_content_length: true,
432                            list_has_content_md5: true,
433                            list_has_content_type: true,
434                            list_has_last_modified: true,
435
436                            presign: self.config.sas_token.is_some(),
437                            presign_stat: self.config.sas_token.is_some(),
438                            presign_read: self.config.sas_token.is_some(),
439                            presign_write: self.config.sas_token.is_some(),
440
441                            shared: true,
442
443                            ..Default::default()
444                        });
445
446                    // allow deprecated api here for compatibility
447                    #[allow(deprecated)]
448                    if let Some(client) = self.http_client {
449                        am.update_http_client(|_| client);
450                    }
451
452                    am.into()
453                },
454                root,
455                endpoint,
456                encryption_key,
457                encryption_key_sha256,
458                encryption_algorithm,
459                container: self.config.container.clone(),
460
461                loader: cred_loader,
462                signer,
463            }),
464        })
465    }
466}
467
/// Backend for azblob services.
///
/// Cloning is cheap: all state lives behind a shared [`Arc`].
#[derive(Debug, Clone)]
pub struct AzblobBackend {
    /// Shared service state built by `AzblobBuilder::build`: accessor info,
    /// root, endpoint, container, SSE headers, credential loader and signer.
    core: Arc<AzblobCore>,
}
473
474impl Access for AzblobBackend {
475    type Reader = HttpBody;
476    type Writer = AzblobWriters;
477    type Lister = oio::PageLister<AzblobLister>;
478    type Deleter = oio::BatchDeleter<AzblobDeleter>;
479
480    fn info(&self) -> Arc<AccessorInfo> {
481        self.core.info.clone()
482    }
483
484    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
485        let resp = self.core.azblob_get_blob_properties(path, &args).await?;
486
487        let status = resp.status();
488
489        match status {
490            StatusCode::OK => {
491                let headers = resp.headers();
492                let mut meta = parse_into_metadata(path, headers)?;
493                if let Some(version_id) = parse_header_to_str(headers, X_MS_VERSION_ID)? {
494                    meta.set_version(version_id);
495                }
496
497                let user_meta = parse_prefixed_headers(headers, X_MS_META_PREFIX);
498                if !user_meta.is_empty() {
499                    meta = meta.with_user_metadata(user_meta);
500                }
501
502                Ok(RpStat::new(meta))
503            }
504            _ => Err(parse_error(resp)),
505        }
506    }
507
508    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
509        let resp = self.core.azblob_get_blob(path, args.range(), &args).await?;
510
511        let status = resp.status();
512        match status {
513            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
514            _ => {
515                let (part, mut body) = resp.into_parts();
516                let buf = body.to_buffer().await?;
517                Err(parse_error(Response::from_parts(part, buf)))
518            }
519        }
520    }
521
522    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
523        let w = AzblobWriter::new(self.core.clone(), args.clone(), path.to_string());
524        let w = if args.append() {
525            AzblobWriters::Two(oio::AppendWriter::new(w))
526        } else {
527            AzblobWriters::One(oio::BlockWriter::new(
528                self.core.info.clone(),
529                w,
530                args.concurrent(),
531            ))
532        };
533
534        Ok((RpWrite::default(), w))
535    }
536
537    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
538        Ok((
539            RpDelete::default(),
540            oio::BatchDeleter::new(AzblobDeleter::new(self.core.clone())),
541        ))
542    }
543
544    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
545        let l = AzblobLister::new(
546            self.core.clone(),
547            path.to_string(),
548            args.recursive(),
549            args.limit(),
550        );
551
552        Ok((RpList::default(), oio::PageLister::new(l)))
553    }
554
555    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
556        let resp = self.core.azblob_copy_blob(from, to).await?;
557
558        let status = resp.status();
559
560        match status {
561            StatusCode::ACCEPTED => Ok(RpCopy::default()),
562            _ => Err(parse_error(resp)),
563        }
564    }
565
566    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
567        let req = match args.operation() {
568            PresignOperation::Stat(v) => self.core.azblob_head_blob_request(path, v),
569            PresignOperation::Read(v) => {
570                self.core
571                    .azblob_get_blob_request(path, BytesRange::default(), v)
572            }
573            PresignOperation::Write(_) => {
574                self.core
575                    .azblob_put_blob_request(path, None, &OpWrite::default(), Buffer::new())
576            }
577            PresignOperation::Delete(_) => Err(Error::new(
578                ErrorKind::Unsupported,
579                "operation is not supported",
580            )),
581        };
582
583        let mut req = req?;
584
585        self.core.sign_query(&mut req).await?;
586
587        let (parts, _) = req.into_parts();
588
589        Ok(RpPresign::new(PresignedRequest::new(
590            parts.method,
591            parts.uri,
592            parts.headers,
593        )))
594    }
595}