opendal/services/s3/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::fmt::Write;
22use std::str::FromStr;
23use std::sync::atomic::AtomicBool;
24use std::sync::Arc;
25
26use base64::prelude::BASE64_STANDARD;
27use base64::Engine;
28use constants::X_AMZ_META_PREFIX;
29use http::Response;
30use http::StatusCode;
31use log::debug;
32use log::warn;
33use md5::Digest;
34use md5::Md5;
35use reqsign::AwsAssumeRoleLoader;
36use reqsign::AwsConfig;
37use reqsign::AwsCredentialLoad;
38use reqsign::AwsDefaultLoader;
39use reqsign::AwsV4Signer;
40use reqwest::Url;
41use std::sync::LazyLock;
42
43use super::core::*;
44use super::delete::S3Deleter;
45use super::error::parse_error;
46use super::lister::{S3Lister, S3Listers, S3ObjectVersionsLister};
47use super::writer::S3Writer;
48use super::writer::S3Writers;
49use crate::raw::oio::PageLister;
50use crate::raw::*;
51use crate::services::S3Config;
52use crate::*;
53use constants::X_AMZ_VERSION_ID;
54
/// Maps a well-known global endpoint to a template of its regional form, so that
/// a correct regional endpoint can be built when the user gives a global one.
///
/// The `{region}` placeholder in each template is substituted with the signing region.
static ENDPOINT_TEMPLATES: LazyLock<HashMap<&'static str, &'static str>> = LazyLock::new(|| {
    HashMap::from([
        // AWS S3 Service.
        (
            "https://s3.amazonaws.com",
            "https://s3.{region}.amazonaws.com",
        ),
    ])
});
65
/// Fallback for `delete_max_size` (maximum delete operations) when the user does not configure it.
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1_000;
67
68impl Configurator for S3Config {
69    type Builder = S3Builder;
70
71    #[allow(deprecated)]
72    fn into_builder(self) -> Self::Builder {
73        S3Builder {
74            config: self,
75            customized_credential_load: None,
76
77            http_client: None,
78        }
79    }
80}
81
/// Aws S3 and compatible services (including minio, digitalocean space, Tencent Cloud Object Storage(COS) and so on) support.
/// For more information about s3-compatible services, refer to [Compatible Services](#compatible-services).
#[doc = include_str!("docs.md")]
#[doc = include_str!("compatible_services.md")]
#[derive(Default)]
pub struct S3Builder {
    // All user-facing settings; populated by the builder methods and consumed by `build`.
    config: S3Config,

    // Optional user-provided credential loader. When set, `build` uses it instead of the
    // default loader, unless `role_arn` is set, in which case the assume-role loader wins.
    customized_credential_load: Option<Box<dyn AwsCredentialLoad>>,

    // Kept only for backward compatibility; see the deprecation note.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
95
96impl Debug for S3Builder {
97    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
98        let mut d = f.debug_struct("S3Builder");
99
100        d.field("config", &self.config);
101        d.finish_non_exhaustive()
102    }
103}
104
105impl S3Builder {
106    /// Set root of this backend.
107    ///
108    /// All operations will happen under this root.
109    pub fn root(mut self, root: &str) -> Self {
110        self.config.root = if root.is_empty() {
111            None
112        } else {
113            Some(root.to_string())
114        };
115
116        self
117    }
118
119    /// Set bucket name of this backend.
120    pub fn bucket(mut self, bucket: &str) -> Self {
121        self.config.bucket = bucket.to_string();
122
123        self
124    }
125
126    /// Set endpoint of this backend.
127    ///
128    /// Endpoint must be full uri, e.g.
129    ///
130    /// - AWS S3: `https://s3.amazonaws.com` or `https://s3.{region}.amazonaws.com`
131    /// - Cloudflare R2: `https://<ACCOUNT_ID>.r2.cloudflarestorage.com`
132    /// - Aliyun OSS: `https://{region}.aliyuncs.com`
133    /// - Tencent COS: `https://cos.{region}.myqcloud.com`
134    /// - Minio: `http://127.0.0.1:9000`
135    ///
136    /// If user inputs endpoint without scheme like "s3.amazonaws.com", we
137    /// will prepend "https://" before it.
138    pub fn endpoint(mut self, endpoint: &str) -> Self {
139        if !endpoint.is_empty() {
140            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
141            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
142        }
143
144        self
145    }
146
147    /// Region represent the signing region of this endpoint. This is required
148    /// if you are using the default AWS S3 endpoint.
149    ///
150    /// If using a custom endpoint,
151    /// - If region is set, we will take user's input first.
152    /// - If not, we will try to load it from environment.
153    pub fn region(mut self, region: &str) -> Self {
154        if !region.is_empty() {
155            self.config.region = Some(region.to_string())
156        }
157
158        self
159    }
160
161    /// Set access_key_id of this backend.
162    ///
163    /// - If access_key_id is set, we will take user's input first.
164    /// - If not, we will try to load it from environment.
165    pub fn access_key_id(mut self, v: &str) -> Self {
166        if !v.is_empty() {
167            self.config.access_key_id = Some(v.to_string())
168        }
169
170        self
171    }
172
173    /// Set secret_access_key of this backend.
174    ///
175    /// - If secret_access_key is set, we will take user's input first.
176    /// - If not, we will try to load it from environment.
177    pub fn secret_access_key(mut self, v: &str) -> Self {
178        if !v.is_empty() {
179            self.config.secret_access_key = Some(v.to_string())
180        }
181
182        self
183    }
184
185    /// Set role_arn for this backend.
186    ///
187    /// If `role_arn` is set, we will use already known config as source
188    /// credential to assume role with `role_arn`.
189    pub fn role_arn(mut self, v: &str) -> Self {
190        if !v.is_empty() {
191            self.config.role_arn = Some(v.to_string())
192        }
193
194        self
195    }
196
197    /// Set external_id for this backend.
198    pub fn external_id(mut self, v: &str) -> Self {
199        if !v.is_empty() {
200            self.config.external_id = Some(v.to_string())
201        }
202
203        self
204    }
205
206    /// Set role_session_name for this backend.
207    pub fn role_session_name(mut self, v: &str) -> Self {
208        if !v.is_empty() {
209            self.config.role_session_name = Some(v.to_string())
210        }
211
212        self
213    }
214
215    /// Set default storage_class for this backend.
216    ///
217    /// Available values:
218    /// - `DEEP_ARCHIVE`
219    /// - `GLACIER`
220    /// - `GLACIER_IR`
221    /// - `INTELLIGENT_TIERING`
222    /// - `ONEZONE_IA`
223    /// - `OUTPOSTS`
224    /// - `REDUCED_REDUNDANCY`
225    /// - `STANDARD`
226    /// - `STANDARD_IA`
227    pub fn default_storage_class(mut self, v: &str) -> Self {
228        if !v.is_empty() {
229            self.config.default_storage_class = Some(v.to_string())
230        }
231
232        self
233    }
234
235    /// Set server_side_encryption for this backend.
236    ///
237    /// Available values: `AES256`, `aws:kms`.
238    ///
239    /// # Note
240    ///
241    /// This function is the low-level setting for SSE related features.
242    ///
243    /// SSE related options should be set carefully to make them works.
244    /// Please use `server_side_encryption_with_*` helpers if even possible.
245    pub fn server_side_encryption(mut self, v: &str) -> Self {
246        if !v.is_empty() {
247            self.config.server_side_encryption = Some(v.to_string())
248        }
249
250        self
251    }
252
253    /// Set server_side_encryption_aws_kms_key_id for this backend
254    ///
255    /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id`
256    ///   is not set, S3 will use aws managed kms key to encrypt data.
257    /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id`
258    ///   is a valid kms key id, S3 will use the provided kms key to encrypt data.
259    /// - If the `server_side_encryption_aws_kms_key_id` is invalid or not found, an error will be
260    ///   returned.
261    /// - If `server_side_encryption` is not `aws:kms`, setting `server_side_encryption_aws_kms_key_id` is a noop.
262    ///
263    /// # Note
264    ///
265    /// This function is the low-level setting for SSE related features.
266    ///
267    /// SSE related options should be set carefully to make them works.
268    /// Please use `server_side_encryption_with_*` helpers if even possible.
269    pub fn server_side_encryption_aws_kms_key_id(mut self, v: &str) -> Self {
270        if !v.is_empty() {
271            self.config.server_side_encryption_aws_kms_key_id = Some(v.to_string())
272        }
273
274        self
275    }
276
277    /// Set server_side_encryption_customer_algorithm for this backend.
278    ///
279    /// Available values: `AES256`.
280    ///
281    /// # Note
282    ///
283    /// This function is the low-level setting for SSE related features.
284    ///
285    /// SSE related options should be set carefully to make them works.
286    /// Please use `server_side_encryption_with_*` helpers if even possible.
287    pub fn server_side_encryption_customer_algorithm(mut self, v: &str) -> Self {
288        if !v.is_empty() {
289            self.config.server_side_encryption_customer_algorithm = Some(v.to_string())
290        }
291
292        self
293    }
294
295    /// Set server_side_encryption_customer_key for this backend.
296    ///
297    /// # Args
298    ///
299    /// `v`: base64 encoded key that matches algorithm specified in
300    /// `server_side_encryption_customer_algorithm`.
301    ///
302    /// # Note
303    ///
304    /// This function is the low-level setting for SSE related features.
305    ///
306    /// SSE related options should be set carefully to make them works.
307    /// Please use `server_side_encryption_with_*` helpers if even possible.
308    pub fn server_side_encryption_customer_key(mut self, v: &str) -> Self {
309        if !v.is_empty() {
310            self.config.server_side_encryption_customer_key = Some(v.to_string())
311        }
312
313        self
314    }
315
316    /// Set server_side_encryption_customer_key_md5 for this backend.
317    ///
318    /// # Args
319    ///
320    /// `v`: MD5 digest of key specified in `server_side_encryption_customer_key`.
321    ///
322    /// # Note
323    ///
324    /// This function is the low-level setting for SSE related features.
325    ///
326    /// SSE related options should be set carefully to make them works.
327    /// Please use `server_side_encryption_with_*` helpers if even possible.
328    pub fn server_side_encryption_customer_key_md5(mut self, v: &str) -> Self {
329        if !v.is_empty() {
330            self.config.server_side_encryption_customer_key_md5 = Some(v.to_string())
331        }
332
333        self
334    }
335
336    /// Enable server side encryption with aws managed kms key
337    ///
338    /// As known as: SSE-KMS
339    ///
340    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
341    pub fn server_side_encryption_with_aws_managed_kms_key(mut self) -> Self {
342        self.config.server_side_encryption = Some("aws:kms".to_string());
343        self
344    }
345
346    /// Enable server side encryption with customer managed kms key
347    ///
348    /// As known as: SSE-KMS
349    ///
350    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
351    pub fn server_side_encryption_with_customer_managed_kms_key(
352        mut self,
353        aws_kms_key_id: &str,
354    ) -> Self {
355        self.config.server_side_encryption = Some("aws:kms".to_string());
356        self.config.server_side_encryption_aws_kms_key_id = Some(aws_kms_key_id.to_string());
357        self
358    }
359
360    /// Enable server side encryption with s3 managed key
361    ///
362    /// As known as: SSE-S3
363    ///
364    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
365    pub fn server_side_encryption_with_s3_key(mut self) -> Self {
366        self.config.server_side_encryption = Some("AES256".to_string());
367        self
368    }
369
370    /// Enable server side encryption with customer key.
371    ///
372    /// As known as: SSE-C
373    ///
374    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
375    pub fn server_side_encryption_with_customer_key(mut self, algorithm: &str, key: &[u8]) -> Self {
376        self.config.server_side_encryption_customer_algorithm = Some(algorithm.to_string());
377        self.config.server_side_encryption_customer_key = Some(BASE64_STANDARD.encode(key));
378        self.config.server_side_encryption_customer_key_md5 =
379            Some(BASE64_STANDARD.encode(Md5::digest(key).as_slice()));
380        self
381    }
382
383    /// Set temporary credential used in AWS S3 connections
384    ///
385    /// # Warning
386    ///
387    /// session token's lifetime is short and requires users to refresh in time.
388    pub fn session_token(mut self, token: &str) -> Self {
389        if !token.is_empty() {
390            self.config.session_token = Some(token.to_string());
391        }
392        self
393    }
394
395    /// Set temporary credential used in AWS S3 connections
396    #[deprecated(note = "Please use `session_token` instead")]
397    pub fn security_token(self, token: &str) -> Self {
398        self.session_token(token)
399    }
400
401    /// Disable config load so that opendal will not load config from
402    /// environment.
403    ///
404    /// For examples:
405    ///
406    /// - envs like `AWS_ACCESS_KEY_ID`
407    /// - files like `~/.aws/config`
408    pub fn disable_config_load(mut self) -> Self {
409        self.config.disable_config_load = true;
410        self
411    }
412
413    /// Disable load credential from ec2 metadata.
414    ///
415    /// This option is used to disable the default behavior of opendal
416    /// to load credential from ec2 metadata, a.k.a, IMDSv2
417    pub fn disable_ec2_metadata(mut self) -> Self {
418        self.config.disable_ec2_metadata = true;
419        self
420    }
421
422    /// Allow anonymous will allow opendal to send request without signing
423    /// when credential is not loaded.
424    pub fn allow_anonymous(mut self) -> Self {
425        self.config.allow_anonymous = true;
426        self
427    }
428
429    /// Enable virtual host style so that opendal will send API requests
430    /// in virtual host style instead of path style.
431    ///
432    /// - By default, opendal will send API to `https://s3.us-east-1.amazonaws.com/bucket_name`
433    /// - Enabled, opendal will send API to `https://bucket_name.s3.us-east-1.amazonaws.com`
434    pub fn enable_virtual_host_style(mut self) -> Self {
435        self.config.enable_virtual_host_style = true;
436        self
437    }
438
439    /// Disable stat with override so that opendal will not send stat request with override queries.
440    ///
441    /// For example, R2 doesn't support stat with `response_content_type` query.
442    pub fn disable_stat_with_override(mut self) -> Self {
443        self.config.disable_stat_with_override = true;
444        self
445    }
446
447    /// Adding a customized credential load for service.
448    ///
449    /// If customized_credential_load has been set, we will ignore all other
450    /// credential load methods.
451    pub fn customized_credential_load(mut self, cred: Box<dyn AwsCredentialLoad>) -> Self {
452        self.customized_credential_load = Some(cred);
453        self
454    }
455
456    /// Specify the http client that used by this service.
457    ///
458    /// # Notes
459    ///
460    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
461    /// during minor updates.
462    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
463    #[allow(deprecated)]
464    pub fn http_client(mut self, client: HttpClient) -> Self {
465        self.http_client = Some(client);
466        self
467    }
468
469    /// Set bucket versioning status for this backend
470    pub fn enable_versioning(mut self, enabled: bool) -> Self {
471        self.config.enable_versioning = enabled;
472
473        self
474    }
475
476    /// Check if `bucket` is valid
477    /// `bucket` must be not empty and if `enable_virtual_host_style` is true
478    /// it couldn't contain dot(.) character
479    fn is_bucket_valid(&self) -> bool {
480        if self.config.bucket.is_empty() {
481            return false;
482        }
483        // If enable virtual host style, `bucket` will reside in domain part,
484        // for example `https://bucket_name.s3.us-east-1.amazonaws.com`,
485        // so `bucket` with dot can't be recognized correctly for this format.
486        if self.config.enable_virtual_host_style && self.config.bucket.contains('.') {
487            return false;
488        }
489        true
490    }
491
492    /// Build endpoint with given region.
493    fn build_endpoint(&self, region: &str) -> String {
494        let bucket = {
495            debug_assert!(self.is_bucket_valid(), "bucket must be valid");
496
497            self.config.bucket.as_str()
498        };
499
500        let mut endpoint = match &self.config.endpoint {
501            Some(endpoint) => {
502                if endpoint.starts_with("http") {
503                    endpoint.to_string()
504                } else {
505                    // Prefix https if endpoint doesn't start with scheme.
506                    format!("https://{endpoint}")
507                }
508            }
509            None => "https://s3.amazonaws.com".to_string(),
510        };
511
512        // If endpoint contains bucket name, we should trim them.
513        endpoint = endpoint.replace(&format!("//{bucket}."), "//");
514
515        // Omit default ports if specified.
516        if let Ok(url) = Url::from_str(&endpoint) {
517            // Remove the trailing `/` of root path.
518            endpoint = url.to_string().trim_end_matches('/').to_string();
519        }
520
521        // Update with endpoint templates.
522        endpoint = if let Some(template) = ENDPOINT_TEMPLATES.get(endpoint.as_str()) {
523            template.replace("{region}", region)
524        } else {
525            // If we don't know where about this endpoint, just leave
526            // them as it.
527            endpoint.to_string()
528        };
529
530        // Apply virtual host style.
531        if self.config.enable_virtual_host_style {
532            endpoint = endpoint.replace("//", &format!("//{bucket}."))
533        } else {
534            write!(endpoint, "/{bucket}").expect("write into string must succeed");
535        };
536
537        endpoint
538    }
539
540    /// Set maximum batch operations of this backend.
541    #[deprecated(
542        since = "0.52.0",
543        note = "Please use `delete_max_size` instead of `batch_max_operations`"
544    )]
545    pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
546        self.config.delete_max_size = Some(batch_max_operations);
547
548        self
549    }
550
551    /// Set maximum delete operations of this backend.
552    pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
553        self.config.delete_max_size = Some(delete_max_size);
554
555        self
556    }
557
558    /// Set checksum algorithm of this backend.
559    /// This is necessary when writing to AWS S3 Buckets with Object Lock enabled for example.
560    ///
561    /// Available options:
562    /// - "crc32c"
563    pub fn checksum_algorithm(mut self, checksum_algorithm: &str) -> Self {
564        self.config.checksum_algorithm = Some(checksum_algorithm.to_string());
565
566        self
567    }
568
569    /// Disable write with if match so that opendal will not send write request with if match headers.
570    pub fn disable_write_with_if_match(mut self) -> Self {
571        self.config.disable_write_with_if_match = true;
572        self
573    }
574
575    /// Enable write with append so that opendal will send write request with append headers.
576    pub fn enable_write_with_append(mut self) -> Self {
577        self.config.enable_write_with_append = true;
578        self
579    }
580
581    /// Detect region of S3 bucket.
582    ///
583    /// # Args
584    ///
585    /// - endpoint: the endpoint of S3 service
586    /// - bucket: the bucket of S3 service
587    ///
588    /// # Return
589    ///
590    /// - `Some(region)` means we detect the region successfully
591    /// - `None` means we can't detect the region or meeting errors.
592    ///
593    /// # Notes
594    ///
595    /// We will try to detect region by the following methods.
596    ///
597    /// - Match endpoint with given rules to get region
598    ///   - Cloudflare R2
599    ///   - AWS S3
600    ///   - Aliyun OSS
601    /// - Send a `HEAD` request to endpoint with bucket name to get `x-amz-bucket-region`.
602    ///
603    /// # Examples
604    ///
605    /// ```no_run
606    /// use opendal::services::S3;
607    ///
608    /// # async fn example() {
609    /// let region: Option<String> = S3::detect_region("https://s3.amazonaws.com", "example").await;
610    /// # }
611    /// ```
612    ///
613    /// # Reference
614    ///
615    /// - [Amazon S3 HeadBucket API](https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/API/API_HeadBucket.html)
616    pub async fn detect_region(endpoint: &str, bucket: &str) -> Option<String> {
617        // Remove the possible trailing `/` in endpoint.
618        let endpoint = endpoint.trim_end_matches('/');
619
620        // Make sure the endpoint contains the scheme.
621        let mut endpoint = if endpoint.starts_with("http") {
622            endpoint.to_string()
623        } else {
624            // Prefix https if endpoint doesn't start with scheme.
625            format!("https://{}", endpoint)
626        };
627
628        // Remove bucket name from endpoint.
629        endpoint = endpoint.replace(&format!("//{bucket}."), "//");
630        let url = format!("{endpoint}/{bucket}");
631
632        debug!("detect region with url: {url}");
633
634        // Try to detect region by endpoint.
635
636        // If this bucket is R2, we can return auto directly.
637        //
638        // Reference: <https://developers.cloudflare.com/r2/api/s3/api/>
639        if endpoint.ends_with("r2.cloudflarestorage.com") {
640            return Some("auto".to_string());
641        }
642
643        // If this bucket is AWS, we can try to match the endpoint.
644        if let Some(v) = endpoint.strip_prefix("https://s3.") {
645            if let Some(region) = v.strip_suffix(".amazonaws.com") {
646                return Some(region.to_string());
647            }
648        }
649
650        // If this bucket is OSS, we can try to match the endpoint.
651        //
652        // - `oss-ap-southeast-1.aliyuncs.com` => `oss-ap-southeast-1`
653        // - `oss-cn-hangzhou-internal.aliyuncs.com` => `oss-cn-hangzhou`
654        if let Some(v) = endpoint.strip_prefix("https://") {
655            if let Some(region) = v.strip_suffix(".aliyuncs.com") {
656                return Some(region.to_string());
657            }
658
659            if let Some(region) = v.strip_suffix("-internal.aliyuncs.com") {
660                return Some(region.to_string());
661            }
662        }
663
664        // Try to detect region by HeadBucket.
665        let req = http::Request::head(&url).body(Buffer::new()).ok()?;
666
667        let client = HttpClient::new().ok()?;
668        let res = client
669            .send(req)
670            .await
671            .map_err(|err| warn!("detect region failed for: {err:?}"))
672            .ok()?;
673
674        debug!(
675            "auto detect region got response: status {:?}, header: {:?}",
676            res.status(),
677            res.headers()
678        );
679
680        // Get region from response header no matter status code.
681        if let Some(header) = res.headers().get("x-amz-bucket-region") {
682            if let Ok(regin) = header.to_str() {
683                return Some(regin.to_string());
684            }
685        }
686
687        // Status code is 403 or 200 means we already visit the correct
688        // region, we can use the default region directly.
689        if res.status() == StatusCode::FORBIDDEN || res.status() == StatusCode::OK {
690            return Some("us-east-1".to_string());
691        }
692
693        None
694    }
695}
696
697impl Builder for S3Builder {
698    const SCHEME: Scheme = Scheme::S3;
699    type Config = S3Config;
700
701    fn build(mut self) -> Result<impl Access> {
702        debug!("backend build started: {:?}", &self);
703
704        let root = normalize_root(&self.config.root.clone().unwrap_or_default());
705        debug!("backend use root {}", &root);
706
707        // Handle bucket name.
708        let bucket = if self.is_bucket_valid() {
709            Ok(&self.config.bucket)
710        } else {
711            Err(
712                Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
713                    .with_context("service", Scheme::S3),
714            )
715        }?;
716        debug!("backend use bucket {}", &bucket);
717
718        let default_storage_class = match &self.config.default_storage_class {
719            None => None,
720            Some(v) => Some(
721                build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?,
722            ),
723        };
724
725        let server_side_encryption = match &self.config.server_side_encryption {
726            None => None,
727            Some(v) => Some(
728                build_header_value(v)
729                    .map_err(|err| err.with_context("key", "server_side_encryption"))?,
730            ),
731        };
732
733        let server_side_encryption_aws_kms_key_id =
734            match &self.config.server_side_encryption_aws_kms_key_id {
735                None => None,
736                Some(v) => Some(build_header_value(v).map_err(|err| {
737                    err.with_context("key", "server_side_encryption_aws_kms_key_id")
738                })?),
739            };
740
741        let server_side_encryption_customer_algorithm =
742            match &self.config.server_side_encryption_customer_algorithm {
743                None => None,
744                Some(v) => Some(build_header_value(v).map_err(|err| {
745                    err.with_context("key", "server_side_encryption_customer_algorithm")
746                })?),
747            };
748
749        let server_side_encryption_customer_key =
750            match &self.config.server_side_encryption_customer_key {
751                None => None,
752                Some(v) => Some(build_header_value(v).map_err(|err| {
753                    err.with_context("key", "server_side_encryption_customer_key")
754                })?),
755            };
756
757        let server_side_encryption_customer_key_md5 =
758            match &self.config.server_side_encryption_customer_key_md5 {
759                None => None,
760                Some(v) => Some(build_header_value(v).map_err(|err| {
761                    err.with_context("key", "server_side_encryption_customer_key_md5")
762                })?),
763            };
764
765        let checksum_algorithm = match self.config.checksum_algorithm.as_deref() {
766            Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
767            None => None,
768            v => {
769                return Err(Error::new(
770                    ErrorKind::ConfigInvalid,
771                    format!("{:?} is not a supported checksum_algorithm.", v),
772                ))
773            }
774        };
775
776        // This is our current config.
777        let mut cfg = AwsConfig::default();
778        if !self.config.disable_config_load {
779            #[cfg(not(target_arch = "wasm32"))]
780            {
781                cfg = cfg.from_profile();
782                cfg = cfg.from_env();
783            }
784        }
785
786        if let Some(ref v) = self.config.region {
787            cfg.region = Some(v.to_string());
788        }
789
790        if cfg.region.is_none() {
791            return Err(Error::new(
792                ErrorKind::ConfigInvalid,
793                "region is missing. Please find it by S3::detect_region() or set them in env.",
794            )
795            .with_operation("Builder::build")
796            .with_context("service", Scheme::S3));
797        }
798
799        let region = cfg.region.to_owned().unwrap();
800        debug!("backend use region: {region}");
801
802        // Retain the user's endpoint if it exists; otherwise, try loading it from the environment.
803        self.config.endpoint = self.config.endpoint.or_else(|| cfg.endpoint_url.clone());
804
805        // Building endpoint.
806        let endpoint = self.build_endpoint(&region);
807        debug!("backend use endpoint: {endpoint}");
808
809        // Setting all value from user input if available.
810        if let Some(v) = self.config.access_key_id {
811            cfg.access_key_id = Some(v)
812        }
813        if let Some(v) = self.config.secret_access_key {
814            cfg.secret_access_key = Some(v)
815        }
816        if let Some(v) = self.config.session_token {
817            cfg.session_token = Some(v)
818        }
819
820        let mut loader: Option<Box<dyn AwsCredentialLoad>> = None;
821        // If customized_credential_load is set, we will use it.
822        if let Some(v) = self.customized_credential_load {
823            loader = Some(v);
824        }
825
826        // If role_arn is set, we must use AssumeRoleLoad.
827        if let Some(role_arn) = self.config.role_arn {
828            // use current env as source credential loader.
829            let default_loader =
830                AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg.clone());
831
832            // Build the config for assume role.
833            let mut assume_role_cfg = AwsConfig {
834                region: Some(region.clone()),
835                role_arn: Some(role_arn),
836                external_id: self.config.external_id.clone(),
837                sts_regional_endpoints: "regional".to_string(),
838                ..Default::default()
839            };
840
841            // override default role_session_name if set
842            if let Some(name) = self.config.role_session_name {
843                assume_role_cfg.role_session_name = name;
844            }
845
846            let assume_role_loader = AwsAssumeRoleLoader::new(
847                GLOBAL_REQWEST_CLIENT.clone().clone(),
848                assume_role_cfg,
849                Box::new(default_loader),
850            )
851            .map_err(|err| {
852                Error::new(
853                    ErrorKind::ConfigInvalid,
854                    "The assume_role_loader is misconfigured",
855                )
856                .with_context("service", Scheme::S3)
857                .set_source(err)
858            })?;
859            loader = Some(Box::new(assume_role_loader));
860        }
861        // If loader is not set, we will use default loader.
862        let loader = match loader {
863            Some(v) => v,
864            None => {
865                let mut default_loader =
866                    AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
867                if self.config.disable_ec2_metadata {
868                    default_loader = default_loader.with_disable_ec2_metadata();
869                }
870
871                Box::new(default_loader)
872            }
873        };
874
875        let signer = AwsV4Signer::new("s3", &region);
876
877        let delete_max_size = self
878            .config
879            .delete_max_size
880            .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
881
882        Ok(S3Backend {
883            core: Arc::new(S3Core {
884                info: {
885                    let am = AccessorInfo::default();
886                    am.set_scheme(Scheme::S3)
887                        .set_root(&root)
888                        .set_name(bucket)
889                        .set_native_capability(Capability {
890                            stat: true,
891                            stat_has_content_encoding: true,
892                            stat_with_if_match: true,
893                            stat_with_if_none_match: true,
894                            stat_with_if_modified_since: true,
895                            stat_with_if_unmodified_since: true,
896                            stat_with_override_cache_control: !self
897                                .config
898                                .disable_stat_with_override,
899                            stat_with_override_content_disposition: !self
900                                .config
901                                .disable_stat_with_override,
902                            stat_with_override_content_type: !self
903                                .config
904                                .disable_stat_with_override,
905                            stat_with_version: self.config.enable_versioning,
906                            stat_has_cache_control: true,
907                            stat_has_content_length: true,
908                            stat_has_content_type: true,
909                            stat_has_content_range: true,
910                            stat_has_etag: true,
911                            stat_has_content_md5: true,
912                            stat_has_last_modified: true,
913                            stat_has_content_disposition: true,
914                            stat_has_user_metadata: true,
915                            stat_has_version: true,
916
917                            read: true,
918                            read_with_if_match: true,
919                            read_with_if_none_match: true,
920                            read_with_if_modified_since: true,
921                            read_with_if_unmodified_since: true,
922                            read_with_override_cache_control: true,
923                            read_with_override_content_disposition: true,
924                            read_with_override_content_type: true,
925                            read_with_version: self.config.enable_versioning,
926
927                            write: true,
928                            write_can_empty: true,
929                            write_can_multi: true,
930                            write_can_append: self.config.enable_write_with_append,
931
932                            write_with_cache_control: true,
933                            write_with_content_type: true,
934                            write_with_content_encoding: true,
935                            write_with_if_match: !self.config.disable_write_with_if_match,
936                            write_with_if_not_exists: true,
937                            write_with_user_metadata: true,
938
939                            // The min multipart size of S3 is 5 MiB.
940                            //
941                            // ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
942                            write_multi_min_size: Some(5 * 1024 * 1024),
943                            // The max multipart size of S3 is 5 GiB.
944                            //
945                            // ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
946                            write_multi_max_size: if cfg!(target_pointer_width = "64") {
947                                Some(5 * 1024 * 1024 * 1024)
948                            } else {
949                                Some(usize::MAX)
950                            },
951
952                            delete: true,
953                            delete_max_size: Some(delete_max_size),
954                            delete_with_version: self.config.enable_versioning,
955
956                            copy: true,
957
958                            list: true,
959                            list_with_limit: true,
960                            list_with_start_after: true,
961                            list_with_recursive: true,
962                            list_with_versions: self.config.enable_versioning,
963                            list_with_deleted: self.config.enable_versioning,
964                            list_has_etag: true,
965                            list_has_content_md5: true,
966                            list_has_content_length: true,
967                            list_has_last_modified: true,
968
969                            presign: true,
970                            presign_stat: true,
971                            presign_read: true,
972                            presign_write: true,
973
974                            shared: true,
975
976                            ..Default::default()
977                        });
978
979                    // allow deprecated api here for compatibility
980                    #[allow(deprecated)]
981                    if let Some(client) = self.http_client {
982                        am.update_http_client(|_| client);
983                    }
984
985                    am.into()
986                },
987                bucket: bucket.to_string(),
988                endpoint,
989                root,
990                server_side_encryption,
991                server_side_encryption_aws_kms_key_id,
992                server_side_encryption_customer_algorithm,
993                server_side_encryption_customer_key,
994                server_side_encryption_customer_key_md5,
995                default_storage_class,
996                allow_anonymous: self.config.allow_anonymous,
997                signer,
998                loader,
999                credential_loaded: AtomicBool::new(false),
1000                checksum_algorithm,
1001            }),
1002        })
1003    }
1004}
1005
/// Backend for s3 services.
///
/// A thin handle around shared `S3Core` state. Cloning an `S3Backend` only
/// clones the inner `Arc`, so every clone uses the same bucket, endpoint,
/// signer, credential loader and accessor info.
#[derive(Debug, Clone)]
pub struct S3Backend {
    /// Shared state used by every operation (endpoint, signer, credential
    /// loader, SSE settings and accessor info).
    core: Arc<S3Core>,
}
1011
1012impl Access for S3Backend {
1013    type Reader = HttpBody;
1014    type Writer = S3Writers;
1015    type Lister = S3Listers;
1016    type Deleter = oio::BatchDeleter<S3Deleter>;
1017    type BlockingReader = ();
1018    type BlockingWriter = ();
1019    type BlockingLister = ();
1020    type BlockingDeleter = ();
1021
1022    fn info(&self) -> Arc<AccessorInfo> {
1023        self.core.info.clone()
1024    }
1025
1026    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
1027        let resp = self.core.s3_head_object(path, args).await?;
1028
1029        let status = resp.status();
1030
1031        match status {
1032            StatusCode::OK => {
1033                let headers = resp.headers();
1034                let mut meta = parse_into_metadata(path, headers)?;
1035
1036                let user_meta = parse_prefixed_headers(headers, X_AMZ_META_PREFIX);
1037                if !user_meta.is_empty() {
1038                    meta.with_user_metadata(user_meta);
1039                }
1040
1041                if let Some(v) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? {
1042                    meta.set_version(v);
1043                }
1044
1045                Ok(RpStat::new(meta))
1046            }
1047            _ => Err(parse_error(resp)),
1048        }
1049    }
1050
1051    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
1052        let resp = self.core.s3_get_object(path, args.range(), &args).await?;
1053
1054        let status = resp.status();
1055        match status {
1056            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
1057                Ok((RpRead::default(), resp.into_body()))
1058            }
1059            _ => {
1060                let (part, mut body) = resp.into_parts();
1061                let buf = body.to_buffer().await?;
1062                Err(parse_error(Response::from_parts(part, buf)))
1063            }
1064        }
1065    }
1066
1067    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
1068        let writer = S3Writer::new(self.core.clone(), path, args.clone());
1069
1070        let w = if args.append() {
1071            S3Writers::Two(oio::AppendWriter::new(writer))
1072        } else {
1073            S3Writers::One(oio::MultipartWriter::new(
1074                self.core.info.clone(),
1075                writer,
1076                args.concurrent(),
1077            ))
1078        };
1079
1080        Ok((RpWrite::default(), w))
1081    }
1082
1083    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
1084        Ok((
1085            RpDelete::default(),
1086            oio::BatchDeleter::new(S3Deleter::new(self.core.clone())),
1087        ))
1088    }
1089
1090    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
1091        let l = if args.versions() || args.deleted() {
1092            TwoWays::Two(PageLister::new(S3ObjectVersionsLister::new(
1093                self.core.clone(),
1094                path,
1095                args,
1096            )))
1097        } else {
1098            TwoWays::One(PageLister::new(S3Lister::new(
1099                self.core.clone(),
1100                path,
1101                args,
1102            )))
1103        };
1104
1105        Ok((RpList::default(), l))
1106    }
1107
1108    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
1109        let resp = self.core.s3_copy_object(from, to).await?;
1110
1111        let status = resp.status();
1112
1113        match status {
1114            StatusCode::OK => Ok(RpCopy::default()),
1115            _ => Err(parse_error(resp)),
1116        }
1117    }
1118
1119    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
1120        let (expire, op) = args.into_parts();
1121        // We will not send this request out, just for signing.
1122        let req = match op {
1123            PresignOperation::Stat(v) => self.core.s3_head_object_request(path, v),
1124            PresignOperation::Read(v) => {
1125                self.core
1126                    .s3_get_object_request(path, BytesRange::default(), &v)
1127            }
1128            PresignOperation::Write(_) => {
1129                self.core
1130                    .s3_put_object_request(path, None, &OpWrite::default(), Buffer::new())
1131            }
1132            PresignOperation::Delete(_) => Err(Error::new(
1133                ErrorKind::Unsupported,
1134                "operation is not supported",
1135            )),
1136        };
1137        let mut req = req?;
1138
1139        self.core.sign_query(&mut req, expire).await?;
1140
1141        // We don't need this request anymore, consume it directly.
1142        let (parts, _) = req.into_parts();
1143
1144        Ok(RpPresign::new(PresignedRequest::new(
1145            parts.method,
1146            parts.uri,
1147            parts.headers,
1148        )))
1149    }
1150}
1151
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks bucket-name validation with and without virtual-host style.
    #[test]
    fn test_is_valid_bucket() {
        // (bucket, enable_virtual_host_style, expected validity)
        let bucket_cases = [
            ("", false, false),
            ("test", false, true),
            ("test.xyz", false, true),
            ("", true, false),
            ("test", true, true),
            // A dotted bucket name is valid for path style but rejected
            // once virtual-host style is enabled.
            ("test.xyz", true, false),
        ];

        for (bucket, enable_virtual_host_style, expected) in bucket_cases {
            let mut b = S3Builder::default().bucket(bucket);
            if enable_virtual_host_style {
                b = b.enable_virtual_host_style();
            }
            assert_eq!(
                b.is_bucket_valid(),
                expected,
                "bucket: {bucket:?}, enable_virtual_host_style: {enable_virtual_host_style}"
            );
        }
    }

    /// Every endpoint form (bare host, global, regional, unset) must resolve
    /// to the same `us-east-2` endpoint, in both path and virtual-host style.
    #[test]
    fn test_build_endpoint() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let endpoint_cases = [
            Some("s3.amazonaws.com"),
            Some("https://s3.amazonaws.com"),
            Some("https://s3.us-east-2.amazonaws.com"),
            None,
        ];

        for case in endpoint_cases {
            let mut b = S3Builder::default().bucket("test");
            if let Some(endpoint) = case {
                b = b.endpoint(endpoint);
            }

            let actual = b.build_endpoint("us-east-2");
            assert_eq!(
                actual, "https://s3.us-east-2.amazonaws.com/test",
                "path style, endpoint: {case:?}"
            );
        }

        for case in endpoint_cases {
            let mut b = S3Builder::default()
                .bucket("test")
                .enable_virtual_host_style();
            if let Some(endpoint) = case {
                b = b.endpoint(endpoint);
            }

            let actual = b.build_endpoint("us-east-2");
            assert_eq!(
                actual, "https://test.s3.us-east-2.amazonaws.com",
                "virtual host style, endpoint: {case:?}"
            );
        }
    }

    /// Checks region detection for AWS, OSS, R2 and an unknown service.
    ///
    /// NOTE(review): `detect_region` is async and some cases may need network
    /// access to resolve the region — confirm before running offline.
    #[tokio::test]
    async fn test_detect_region() {
        // (case name, endpoint, bucket, expected region)
        let cases = [
            (
                "aws s3 without region in endpoint",
                "https://s3.amazonaws.com",
                "example",
                Some("us-east-1"),
            ),
            (
                "aws s3 with region in endpoint",
                "https://s3.us-east-1.amazonaws.com",
                "example",
                Some("us-east-1"),
            ),
            (
                "oss with public endpoint",
                "https://oss-ap-southeast-1.aliyuncs.com",
                "example",
                Some("oss-ap-southeast-1"),
            ),
            (
                "oss with internal endpoint",
                "https://oss-cn-hangzhou-internal.aliyuncs.com",
                "example",
                Some("oss-cn-hangzhou-internal"),
            ),
            (
                "r2",
                "https://abc.xxxxx.r2.cloudflarestorage.com",
                "example",
                Some("auto"),
            ),
            (
                "invalid service",
                "https://opendal.apache.org",
                "example",
                None,
            ),
        ];

        for (name, endpoint, bucket, expected) in cases {
            let region = S3Builder::detect_region(endpoint, bucket).await;
            assert_eq!(region.as_deref(), expected, "{name}");
        }
    }
}