opendal/services/s3/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Write;
21use std::str::FromStr;
22use std::sync::Arc;
23use std::sync::LazyLock;
24
25use base64::Engine;
26use base64::prelude::BASE64_STANDARD;
27use constants::X_AMZ_META_PREFIX;
28use constants::X_AMZ_VERSION_ID;
29use http::Response;
30use http::StatusCode;
31use log::debug;
32use log::warn;
33use md5::Digest;
34use md5::Md5;
35use reqsign_aws_v4::AssumeRoleCredentialProvider;
36use reqsign_aws_v4::Credential;
37use reqsign_aws_v4::DefaultCredentialProvider;
38use reqsign_aws_v4::RequestSigner as AwsV4Signer;
39use reqsign_aws_v4::StaticCredentialProvider;
40use reqsign_core::Context;
41use reqsign_core::OsEnv;
42use reqsign_core::ProvideCredentialChain;
43use reqsign_core::Signer;
44use reqsign_file_read_tokio::TokioFileRead;
45use reqsign_http_send_reqwest::ReqwestHttpSend;
46use reqwest::Url;
47
48use super::S3_SCHEME;
49use super::config::S3Config;
50use super::core::*;
51use super::deleter::S3Deleter;
52use super::error::parse_error;
53use super::lister::S3ListerV1;
54use super::lister::S3ListerV2;
55use super::lister::S3Listers;
56use super::lister::S3ObjectVersionsLister;
57use super::writer::S3Writer;
58use super::writer::S3Writers;
59use crate::raw::*;
60use crate::*;
61
/// Lookup table from a known global endpoint to its regional template.
///
/// The `{region}` placeholder in the template is substituted with the actual
/// region when the endpoint is built, so users may pass a global endpoint.
static ENDPOINT_TEMPLATES: LazyLock<HashMap<&'static str, &'static str>> = LazyLock::new(|| {
    HashMap::from([
        // AWS S3 Service.
        (
            "https://s3.amazonaws.com",
            "https://s3.{region}.amazonaws.com",
        ),
    ])
});
72
/// Fallback used for `delete_max_size` when the config leaves it unset.
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
74
/// Aws S3 and compatible services (including minio, digitalocean space, Tencent Cloud Object Storage(COS) and so on) support.
/// For more information about s3-compatible services, refer to [Compatible Services](#compatible-services).
#[doc = include_str!("docs.md")]
#[doc = include_str!("compatible_services.md")]
#[derive(Default)]
pub struct S3Builder {
    /// Service configuration populated by the builder's setter methods.
    pub(super) config: S3Config,

    /// HTTP client supplied through the deprecated `http_client` setter.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    pub(super) http_client: Option<HttpClient>,
    /// Custom credential provider chain; when set, `build` uses it in place of
    /// the chain it would otherwise assemble from the config.
    pub(super) credential_providers: Option<ProvideCredentialChain<Credential>>,
}
87
88impl Debug for S3Builder {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        f.debug_struct("S3Builder")
91            .field("config", &self.config)
92            .finish_non_exhaustive()
93    }
94}
95
96impl S3Builder {
97    /// Set root of this backend.
98    ///
99    /// All operations will happen under this root.
100    pub fn root(mut self, root: &str) -> Self {
101        self.config.root = if root.is_empty() {
102            None
103        } else {
104            Some(root.to_string())
105        };
106
107        self
108    }
109
110    /// Set bucket name of this backend.
111    pub fn bucket(mut self, bucket: &str) -> Self {
112        self.config.bucket = bucket.to_string();
113
114        self
115    }
116
117    /// Set endpoint of this backend.
118    ///
119    /// Endpoint must be full uri, e.g.
120    ///
121    /// - AWS S3: `https://s3.amazonaws.com` or `https://s3.{region}.amazonaws.com`
122    /// - Cloudflare R2: `https://<ACCOUNT_ID>.r2.cloudflarestorage.com`
123    /// - Aliyun OSS: `https://{region}.aliyuncs.com`
124    /// - Tencent COS: `https://cos.{region}.myqcloud.com`
125    /// - Minio: `http://127.0.0.1:9000`
126    ///
127    /// If user inputs endpoint without scheme like "s3.amazonaws.com", we
128    /// will prepend "https://" before it.
129    pub fn endpoint(mut self, endpoint: &str) -> Self {
130        if !endpoint.is_empty() {
131            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
132            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
133        }
134
135        self
136    }
137
138    /// Region represent the signing region of this endpoint. This is required
139    /// if you are using the default AWS S3 endpoint.
140    ///
141    /// If using a custom endpoint,
142    /// - If region is set, we will take user's input first.
143    /// - If not, we will try to load it from environment.
144    pub fn region(mut self, region: &str) -> Self {
145        if !region.is_empty() {
146            self.config.region = Some(region.to_string())
147        }
148
149        self
150    }
151
152    /// Set access_key_id of this backend.
153    ///
154    /// - If access_key_id is set, we will take user's input first.
155    /// - If not, we will try to load it from environment.
156    pub fn access_key_id(mut self, v: &str) -> Self {
157        if !v.is_empty() {
158            self.config.access_key_id = Some(v.to_string())
159        }
160
161        self
162    }
163
164    /// Set secret_access_key of this backend.
165    ///
166    /// - If secret_access_key is set, we will take user's input first.
167    /// - If not, we will try to load it from environment.
168    pub fn secret_access_key(mut self, v: &str) -> Self {
169        if !v.is_empty() {
170            self.config.secret_access_key = Some(v.to_string())
171        }
172
173        self
174    }
175
176    /// Set role_arn for this backend.
177    ///
178    /// If `role_arn` is set, we will use already known config as source
179    /// credential to assume role with `role_arn`.
180    pub fn role_arn(mut self, v: &str) -> Self {
181        if !v.is_empty() {
182            self.config.role_arn = Some(v.to_string())
183        }
184
185        self
186    }
187
188    /// Set external_id for this backend.
189    pub fn external_id(mut self, v: &str) -> Self {
190        if !v.is_empty() {
191            self.config.external_id = Some(v.to_string())
192        }
193
194        self
195    }
196
197    /// Set role_session_name for this backend.
198    pub fn role_session_name(mut self, v: &str) -> Self {
199        if !v.is_empty() {
200            self.config.role_session_name = Some(v.to_string())
201        }
202
203        self
204    }
205
206    /// Set default storage_class for this backend.
207    ///
208    /// Available values:
209    /// - `DEEP_ARCHIVE`
210    /// - `GLACIER`
211    /// - `GLACIER_IR`
212    /// - `INTELLIGENT_TIERING`
213    /// - `ONEZONE_IA`
214    /// - `OUTPOSTS`
215    /// - `REDUCED_REDUNDANCY`
216    /// - `STANDARD`
217    /// - `STANDARD_IA`
218    pub fn default_storage_class(mut self, v: &str) -> Self {
219        if !v.is_empty() {
220            self.config.default_storage_class = Some(v.to_string())
221        }
222
223        self
224    }
225
226    /// Set server_side_encryption for this backend.
227    ///
228    /// Available values: `AES256`, `aws:kms`.
229    ///
230    /// # Note
231    ///
232    /// This function is the low-level setting for SSE related features.
233    ///
234    /// SSE related options should be set carefully to make them works.
235    /// Please use `server_side_encryption_with_*` helpers if even possible.
236    pub fn server_side_encryption(mut self, v: &str) -> Self {
237        if !v.is_empty() {
238            self.config.server_side_encryption = Some(v.to_string())
239        }
240
241        self
242    }
243
244    /// Set server_side_encryption_aws_kms_key_id for this backend
245    ///
246    /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id`
247    ///   is not set, S3 will use aws managed kms key to encrypt data.
248    /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id`
249    ///   is a valid kms key id, S3 will use the provided kms key to encrypt data.
250    /// - If the `server_side_encryption_aws_kms_key_id` is invalid or not found, an error will be
251    ///   returned.
252    /// - If `server_side_encryption` is not `aws:kms`, setting `server_side_encryption_aws_kms_key_id` is a noop.
253    ///
254    /// # Note
255    ///
256    /// This function is the low-level setting for SSE related features.
257    ///
258    /// SSE related options should be set carefully to make them works.
259    /// Please use `server_side_encryption_with_*` helpers if even possible.
260    pub fn server_side_encryption_aws_kms_key_id(mut self, v: &str) -> Self {
261        if !v.is_empty() {
262            self.config.server_side_encryption_aws_kms_key_id = Some(v.to_string())
263        }
264
265        self
266    }
267
268    /// Set server_side_encryption_customer_algorithm for this backend.
269    ///
270    /// Available values: `AES256`.
271    ///
272    /// # Note
273    ///
274    /// This function is the low-level setting for SSE related features.
275    ///
276    /// SSE related options should be set carefully to make them works.
277    /// Please use `server_side_encryption_with_*` helpers if even possible.
278    pub fn server_side_encryption_customer_algorithm(mut self, v: &str) -> Self {
279        if !v.is_empty() {
280            self.config.server_side_encryption_customer_algorithm = Some(v.to_string())
281        }
282
283        self
284    }
285
286    /// Set server_side_encryption_customer_key for this backend.
287    ///
288    /// # Args
289    ///
290    /// `v`: base64 encoded key that matches algorithm specified in
291    /// `server_side_encryption_customer_algorithm`.
292    ///
293    /// # Note
294    ///
295    /// This function is the low-level setting for SSE related features.
296    ///
297    /// SSE related options should be set carefully to make them works.
298    /// Please use `server_side_encryption_with_*` helpers if even possible.
299    pub fn server_side_encryption_customer_key(mut self, v: &str) -> Self {
300        if !v.is_empty() {
301            self.config.server_side_encryption_customer_key = Some(v.to_string())
302        }
303
304        self
305    }
306
307    /// Set server_side_encryption_customer_key_md5 for this backend.
308    ///
309    /// # Args
310    ///
311    /// `v`: MD5 digest of key specified in `server_side_encryption_customer_key`.
312    ///
313    /// # Note
314    ///
315    /// This function is the low-level setting for SSE related features.
316    ///
317    /// SSE related options should be set carefully to make them works.
318    /// Please use `server_side_encryption_with_*` helpers if even possible.
319    pub fn server_side_encryption_customer_key_md5(mut self, v: &str) -> Self {
320        if !v.is_empty() {
321            self.config.server_side_encryption_customer_key_md5 = Some(v.to_string())
322        }
323
324        self
325    }
326
327    /// Enable server side encryption with aws managed kms key
328    ///
329    /// As known as: SSE-KMS
330    ///
331    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
332    pub fn server_side_encryption_with_aws_managed_kms_key(mut self) -> Self {
333        self.config.server_side_encryption = Some("aws:kms".to_string());
334        self
335    }
336
337    /// Enable server side encryption with customer managed kms key
338    ///
339    /// As known as: SSE-KMS
340    ///
341    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
342    pub fn server_side_encryption_with_customer_managed_kms_key(
343        mut self,
344        aws_kms_key_id: &str,
345    ) -> Self {
346        self.config.server_side_encryption = Some("aws:kms".to_string());
347        self.config.server_side_encryption_aws_kms_key_id = Some(aws_kms_key_id.to_string());
348        self
349    }
350
351    /// Enable server side encryption with s3 managed key
352    ///
353    /// As known as: SSE-S3
354    ///
355    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
356    pub fn server_side_encryption_with_s3_key(mut self) -> Self {
357        self.config.server_side_encryption = Some("AES256".to_string());
358        self
359    }
360
361    /// Enable server side encryption with customer key.
362    ///
363    /// As known as: SSE-C
364    ///
365    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
366    pub fn server_side_encryption_with_customer_key(mut self, algorithm: &str, key: &[u8]) -> Self {
367        self.config.server_side_encryption_customer_algorithm = Some(algorithm.to_string());
368        self.config.server_side_encryption_customer_key = Some(BASE64_STANDARD.encode(key));
369        let key_md5 = Md5::digest(key);
370        self.config.server_side_encryption_customer_key_md5 = Some(BASE64_STANDARD.encode(key_md5));
371        self
372    }
373
374    /// Set temporary credential used in AWS S3 connections
375    ///
376    /// # Warning
377    ///
378    /// session token's lifetime is short and requires users to refresh in time.
379    pub fn session_token(mut self, token: &str) -> Self {
380        if !token.is_empty() {
381            self.config.session_token = Some(token.to_string());
382        }
383        self
384    }
385
386    /// Disable config load so that opendal will not load config from
387    /// environment.
388    ///
389    /// For examples:
390    ///
391    /// - envs like `AWS_ACCESS_KEY_ID`
392    /// - files like `~/.aws/config`
393    pub fn disable_config_load(mut self) -> Self {
394        self.config.disable_config_load = true;
395        self
396    }
397
398    /// Disable list objects v2 so that opendal will not use the older
399    /// List Objects V1 to list objects.
400    ///
401    /// By default, OpenDAL uses List Objects V2 to list objects. However,
402    /// some legacy services do not yet support V2.
403    pub fn disable_list_objects_v2(mut self) -> Self {
404        self.config.disable_list_objects_v2 = true;
405        self
406    }
407
408    /// Enable request payer so that OpenDAL will send requests with `x-amz-request-payer` header.
409    ///
410    /// With this option the client accepts to pay for the request and data transfer costs.
411    pub fn enable_request_payer(mut self) -> Self {
412        self.config.enable_request_payer = true;
413        self
414    }
415
416    /// Disable load credential from ec2 metadata.
417    ///
418    /// This option is used to disable the default behavior of opendal
419    /// to load credential from ec2 metadata, a.k.a, IMDSv2
420    pub fn disable_ec2_metadata(mut self) -> Self {
421        self.config.disable_ec2_metadata = true;
422        self
423    }
424
425    /// Allow anonymous will allow opendal to send request without signing
426    /// when credential is not loaded.
427    pub fn allow_anonymous(mut self) -> Self {
428        self.config.allow_anonymous = true;
429        self
430    }
431
432    /// Enable virtual host style so that opendal will send API requests
433    /// in virtual host style instead of path style.
434    ///
435    /// - By default, opendal will send API to `https://s3.us-east-1.amazonaws.com/bucket_name`
436    /// - Enabled, opendal will send API to `https://bucket_name.s3.us-east-1.amazonaws.com`
437    pub fn enable_virtual_host_style(mut self) -> Self {
438        self.config.enable_virtual_host_style = true;
439        self
440    }
441
442    /// Disable stat with override so that opendal will not send stat request with override queries.
443    ///
444    /// For example, R2 doesn't support stat with `response_content_type` query.
445    pub fn disable_stat_with_override(mut self) -> Self {
446        self.config.disable_stat_with_override = true;
447        self
448    }
449
450    /// Specify the http client that used by this service.
451    ///
452    /// # Notes
453    ///
454    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
455    /// during minor updates.
456    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
457    #[allow(deprecated)]
458    pub fn http_client(mut self, client: HttpClient) -> Self {
459        self.http_client = Some(client);
460        self
461    }
462
463    /// Set bucket versioning status for this backend
464    pub fn enable_versioning(mut self, enabled: bool) -> Self {
465        self.config.enable_versioning = enabled;
466
467        self
468    }
469
470    /// Replace the credential providers with a custom chain.
471    pub fn credential_provider_chain(mut self, chain: ProvideCredentialChain<Credential>) -> Self {
472        self.credential_providers = Some(chain);
473        self
474    }
475
476    /// Check if `bucket` is valid.
477    /// `bucket` must be not empty and if `enable_virtual_host_style` is true
478    /// it could not contain dot (.) character.
479    fn is_bucket_valid(config: &S3Config) -> bool {
480        if config.bucket.is_empty() {
481            return false;
482        }
483        // If enable virtual host style, `bucket` will reside in domain part,
484        // for example `https://bucket_name.s3.us-east-1.amazonaws.com`,
485        // so `bucket` with dot can't be recognized correctly for this format.
486        if config.enable_virtual_host_style && config.bucket.contains('.') {
487            return false;
488        }
489        true
490    }
491
492    /// Build endpoint with given region.
493    fn build_endpoint(config: &S3Config, region: &str) -> String {
494        let bucket = {
495            debug_assert!(Self::is_bucket_valid(config), "bucket must be valid");
496
497            config.bucket.as_str()
498        };
499
500        let mut endpoint = match &config.endpoint {
501            Some(endpoint) => {
502                if endpoint.starts_with("http") {
503                    endpoint.to_string()
504                } else {
505                    // Prefix https if endpoint doesn't start with scheme.
506                    format!("https://{endpoint}")
507                }
508            }
509            None => "https://s3.amazonaws.com".to_string(),
510        };
511
512        // If endpoint contains bucket name, we should trim them.
513        endpoint = endpoint.replace(&format!("//{bucket}."), "//");
514
515        // Omit default ports if specified.
516        if let Ok(url) = Url::from_str(&endpoint) {
517            // Remove the trailing `/` of root path.
518            endpoint = url.to_string().trim_end_matches('/').to_string();
519        }
520
521        // Update with endpoint templates.
522        endpoint = if let Some(template) = ENDPOINT_TEMPLATES.get(endpoint.as_str()) {
523            template.replace("{region}", region)
524        } else {
525            // If we don't know where about this endpoint, just leave
526            // them as it.
527            endpoint.to_string()
528        };
529
530        // Apply virtual host style.
531        if config.enable_virtual_host_style {
532            endpoint = endpoint.replace("//", &format!("//{bucket}."))
533        } else {
534            write!(endpoint, "/{bucket}").expect("write into string must succeed");
535        };
536
537        endpoint
538    }
539
540    /// Set maximum batch operations of this backend.
541    #[deprecated(
542        since = "0.52.0",
543        note = "Please use `delete_max_size` instead of `batch_max_operations`"
544    )]
545    pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
546        self.config.delete_max_size = Some(batch_max_operations);
547
548        self
549    }
550
551    /// Set maximum delete operations of this backend.
552    pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
553        self.config.delete_max_size = Some(delete_max_size);
554
555        self
556    }
557
558    /// Set checksum algorithm of this backend.
559    /// This is necessary when writing to AWS S3 Buckets with Object Lock enabled for example.
560    ///
561    /// Available options:
562    /// - "crc32c"
563    pub fn checksum_algorithm(mut self, checksum_algorithm: &str) -> Self {
564        self.config.checksum_algorithm = Some(checksum_algorithm.to_string());
565
566        self
567    }
568
569    /// Disable write with if match so that opendal will not send write request with if match headers.
570    pub fn disable_write_with_if_match(mut self) -> Self {
571        self.config.disable_write_with_if_match = true;
572        self
573    }
574
575    /// Enable write with append so that opendal will send write request with append headers.
576    pub fn enable_write_with_append(mut self) -> Self {
577        self.config.enable_write_with_append = true;
578        self
579    }
580
581    /// Detect region of S3 bucket.
582    ///
583    /// # Args
584    ///
585    /// - endpoint: the endpoint of S3 service
586    /// - bucket: the bucket of S3 service
587    ///
588    /// # Return
589    ///
590    /// - `Some(region)` means we detect the region successfully
591    /// - `None` means we can't detect the region or meeting errors.
592    ///
593    /// # Notes
594    ///
595    /// We will try to detect region by the following methods.
596    ///
597    /// - Match endpoint with given rules to get region
598    ///   - Cloudflare R2
599    ///   - AWS S3
600    ///   - Aliyun OSS
601    /// - Send a `HEAD` request to endpoint with bucket name to get `x-amz-bucket-region`.
602    ///
603    /// # Examples
604    ///
605    /// ```no_run
606    /// use opendal::services::S3;
607    ///
608    /// # async fn example() {
609    /// let region: Option<String> = S3::detect_region("https://s3.amazonaws.com", "example").await;
610    /// # }
611    /// ```
612    ///
613    /// # Reference
614    ///
615    /// - [Amazon S3 HeadBucket API](https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/API/API_HeadBucket.html)
616    pub async fn detect_region(endpoint: &str, bucket: &str) -> Option<String> {
617        // Remove the possible trailing `/` in endpoint.
618        let endpoint = endpoint.trim_end_matches('/');
619
620        // Make sure the endpoint contains the scheme.
621        let mut endpoint = if endpoint.starts_with("http") {
622            endpoint.to_string()
623        } else {
624            // Prefix https if endpoint doesn't start with scheme.
625            format!("https://{endpoint}")
626        };
627
628        // Remove bucket name from endpoint.
629        endpoint = endpoint.replace(&format!("//{bucket}."), "//");
630        let url = format!("{endpoint}/{bucket}");
631
632        debug!("detect region with url: {url}");
633
634        // Try to detect region by endpoint.
635
636        // If this bucket is R2, we can return auto directly.
637        //
638        // Reference: <https://developers.cloudflare.com/r2/api/s3/api/>
639        if endpoint.ends_with("r2.cloudflarestorage.com") {
640            return Some("auto".to_string());
641        }
642
643        // If this bucket is AWS, we can try to match the endpoint.
644        if let Some(v) = endpoint.strip_prefix("https://s3.") {
645            if let Some(region) = v.strip_suffix(".amazonaws.com") {
646                return Some(region.to_string());
647            }
648        }
649
650        // If this bucket is OSS, we can try to match the endpoint.
651        //
652        // - `oss-ap-southeast-1.aliyuncs.com` => `oss-ap-southeast-1`
653        // - `oss-cn-hangzhou-internal.aliyuncs.com` => `oss-cn-hangzhou`
654        if let Some(v) = endpoint.strip_prefix("https://") {
655            if let Some(region) = v.strip_suffix(".aliyuncs.com") {
656                return Some(region.to_string());
657            }
658
659            if let Some(region) = v.strip_suffix("-internal.aliyuncs.com") {
660                return Some(region.to_string());
661            }
662        }
663
664        // Try to detect region by HeadBucket.
665        let req = http::Request::head(&url).body(Buffer::new()).ok()?;
666
667        let client = HttpClient::new().ok()?;
668        let res = client
669            .send(req)
670            .await
671            .map_err(|err| warn!("detect region failed for: {err:?}"))
672            .ok()?;
673
674        debug!(
675            "auto detect region got response: status {:?}, header: {:?}",
676            res.status(),
677            res.headers()
678        );
679
680        // Get region from response header no matter status code.
681        if let Some(header) = res.headers().get("x-amz-bucket-region") {
682            if let Ok(regin) = header.to_str() {
683                return Some(regin.to_string());
684            }
685        }
686
687        // Status code is 403 or 200 means we already visit the correct
688        // region, we can use the default region directly.
689        if res.status() == StatusCode::FORBIDDEN || res.status() == StatusCode::OK {
690            return Some("us-east-1".to_string());
691        }
692
693        None
694    }
695}
696
697impl Builder for S3Builder {
698    type Config = S3Config;
699
700    fn build(self) -> Result<impl Access> {
701        debug!("backend build started: {:?}", &self);
702
703        #[allow(deprecated)]
704        let S3Builder {
705            mut config,
706            http_client,
707            credential_providers,
708        } = self;
709
710        let root = normalize_root(&config.root.clone().unwrap_or_default());
711        debug!("backend use root {}", &root);
712
713        // Handle bucket name.
714        let bucket = if Self::is_bucket_valid(&config) {
715            Ok(&config.bucket)
716        } else {
717            Err(
718                Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
719                    .with_context("service", S3_SCHEME),
720            )
721        }?;
722        debug!("backend use bucket {}", &bucket);
723
724        let default_storage_class = match &config.default_storage_class {
725            None => None,
726            Some(v) => Some(
727                build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?,
728            ),
729        };
730
731        let server_side_encryption = match &config.server_side_encryption {
732            None => None,
733            Some(v) => Some(
734                build_header_value(v)
735                    .map_err(|err| err.with_context("key", "server_side_encryption"))?,
736            ),
737        };
738
739        let server_side_encryption_aws_kms_key_id =
740            match &config.server_side_encryption_aws_kms_key_id {
741                None => None,
742                Some(v) => Some(build_header_value(v).map_err(|err| {
743                    err.with_context("key", "server_side_encryption_aws_kms_key_id")
744                })?),
745            };
746
747        let server_side_encryption_customer_algorithm =
748            match &config.server_side_encryption_customer_algorithm {
749                None => None,
750                Some(v) => Some(build_header_value(v).map_err(|err| {
751                    err.with_context("key", "server_side_encryption_customer_algorithm")
752                })?),
753            };
754
755        let server_side_encryption_customer_key =
756            match &config.server_side_encryption_customer_key {
757                None => None,
758                Some(v) => Some(build_header_value(v).map_err(|err| {
759                    err.with_context("key", "server_side_encryption_customer_key")
760                })?),
761            };
762
763        let server_side_encryption_customer_key_md5 =
764            match &config.server_side_encryption_customer_key_md5 {
765                None => None,
766                Some(v) => Some(build_header_value(v).map_err(|err| {
767                    err.with_context("key", "server_side_encryption_customer_key_md5")
768                })?),
769            };
770
771        let checksum_algorithm = match config.checksum_algorithm.as_deref() {
772            Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
773            Some("md5") => Some(ChecksumAlgorithm::Md5),
774            None => None,
775            v => {
776                return Err(Error::new(
777                    ErrorKind::ConfigInvalid,
778                    format!("{v:?} is not a supported checksum_algorithm."),
779                ));
780            }
781        };
782
783        // Determine the region
784        let region = if let Some(ref v) = config.region {
785            v.to_string()
786        } else {
787            std::env::var("AWS_REGION")
788                .or_else(|_| std::env::var("AWS_DEFAULT_REGION"))
789                .map_err(|_| {
790                    Error::new(
791                        ErrorKind::ConfigInvalid,
792                        "region is missing. Please find it by S3::detect_region() or set them in env.",
793                    )
794                    .with_operation("Builder::build")
795                    .with_context("service", S3_SCHEME)
796                })?
797        };
798        debug!("backend use region: {region}");
799
800        if config.endpoint.is_none() && !config.disable_config_load {
801            let endpoint_from_env = std::env::var("AWS_ENDPOINT_URL")
802                .or_else(|_| std::env::var("AWS_ENDPOINT"))
803                .or_else(|_| std::env::var("AWS_S3_ENDPOINT"))
804                .ok();
805            if let Some(endpoint) = endpoint_from_env {
806                let normalized = endpoint.trim_end_matches('/').to_string();
807                config.endpoint = Some(normalized);
808            }
809        }
810
811        // Building endpoint.
812        let endpoint = Self::build_endpoint(&config, &region);
813        debug!("backend use endpoint: {endpoint}");
814
815        // Create the context for reqsign-core
816        let ctx = Context::new()
817            .with_file_read(TokioFileRead)
818            .with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone()))
819            .with_env(OsEnv);
820
821        let mut provider = {
822            let mut builder = DefaultCredentialProvider::builder();
823
824            if config.disable_config_load {
825                builder = builder.disable_env(true).disable_profile(true);
826            }
827
828            if config.disable_ec2_metadata {
829                builder = builder.disable_imds(true);
830            }
831
832            ProvideCredentialChain::new().push(builder.build())
833        };
834
835        // Insert static key if user provided.
836        if let (Some(ak), Some(sk)) = (&config.access_key_id, &config.secret_access_key) {
837            let static_provider = if let Some(token) = config.session_token.as_deref() {
838                StaticCredentialProvider::new(ak, sk).with_session_token(token)
839            } else {
840                StaticCredentialProvider::new(ak, sk)
841            };
842            provider = provider.push_front(static_provider);
843        }
844
845        // Insert assume role provider if user provided.
846        if let Some(role_arn) = &config.role_arn {
847            let sts_ctx = ctx.clone();
848            let sts_request_signer = AwsV4Signer::new("sts", &region);
849            let sts_signer = Signer::new(sts_ctx, provider, sts_request_signer);
850            let mut assume_role_provider =
851                AssumeRoleCredentialProvider::new(role_arn.clone(), sts_signer)
852                    .with_region(region.clone())
853                    .with_regional_sts_endpoint();
854
855            if let Some(external_id) = &config.external_id {
856                assume_role_provider = assume_role_provider.with_external_id(external_id.clone());
857            }
858            if let Some(role_session_name) = &config.role_session_name {
859                assume_role_provider =
860                    assume_role_provider.with_role_session_name(role_session_name.clone());
861            }
862            provider = ProvideCredentialChain::new().push(assume_role_provider);
863        }
864
865        // Replace provider if user provide their own.
866        let provider = if let Some(credential_providers) = credential_providers {
867            credential_providers
868        } else {
869            provider
870        };
871
872        // Create request signer for S3
873        let request_signer = AwsV4Signer::new("s3", &region);
874
875        // Create the signer
876        let signer = Signer::new(ctx, provider, request_signer);
877
878        let delete_max_size = config
879            .delete_max_size
880            .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
881
882        Ok(S3Backend {
883            core: Arc::new(S3Core {
884                info: {
885                    let am = AccessorInfo::default();
886                    am.set_scheme(S3_SCHEME)
887                        .set_root(&root)
888                        .set_name(bucket)
889                        .set_native_capability(Capability {
890                            stat: true,
891                            stat_with_if_match: true,
892                            stat_with_if_none_match: true,
893                            stat_with_if_modified_since: true,
894                            stat_with_if_unmodified_since: true,
895                            stat_with_override_cache_control: !config.disable_stat_with_override,
896                            stat_with_override_content_disposition: !config
897                                .disable_stat_with_override,
898                            stat_with_override_content_type: !config.disable_stat_with_override,
899                            stat_with_version: config.enable_versioning,
900
901                            read: true,
902                            read_with_if_match: true,
903                            read_with_if_none_match: true,
904                            read_with_if_modified_since: true,
905                            read_with_if_unmodified_since: true,
906                            read_with_override_cache_control: true,
907                            read_with_override_content_disposition: true,
908                            read_with_override_content_type: true,
909                            read_with_version: config.enable_versioning,
910
911                            write: true,
912                            write_can_empty: true,
913                            write_can_multi: true,
914                            write_can_append: config.enable_write_with_append,
915
916                            write_with_cache_control: true,
917                            write_with_content_type: true,
918                            write_with_content_encoding: true,
919                            write_with_if_match: !config.disable_write_with_if_match,
920                            write_with_if_not_exists: true,
921                            write_with_user_metadata: true,
922
923                            // The min multipart size of S3 is 5 MiB.
924                            //
925                            // ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
926                            write_multi_min_size: Some(5 * 1024 * 1024),
927                            // The max multipart size of S3 is 5 GiB.
928                            //
929                            // ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
930                            write_multi_max_size: if cfg!(target_pointer_width = "64") {
931                                Some(5 * 1024 * 1024 * 1024)
932                            } else {
933                                Some(usize::MAX)
934                            },
935
936                            delete: true,
937                            delete_max_size: Some(delete_max_size),
938                            delete_with_version: config.enable_versioning,
939
940                            copy: true,
941
942                            list: true,
943                            list_with_limit: true,
944                            list_with_start_after: true,
945                            list_with_recursive: true,
946                            list_with_versions: config.enable_versioning,
947                            list_with_deleted: config.enable_versioning,
948
949                            presign: true,
950                            presign_stat: true,
951                            presign_read: true,
952                            presign_write: true,
953
954                            shared: true,
955
956                            ..Default::default()
957                        });
958
959                    // allow deprecated api here for compatibility
960                    #[allow(deprecated)]
961                    if let Some(client) = http_client {
962                        am.update_http_client(|_| client);
963                    }
964
965                    am.into()
966                },
967                bucket: bucket.to_string(),
968                endpoint,
969                root,
970                server_side_encryption,
971                server_side_encryption_aws_kms_key_id,
972                server_side_encryption_customer_algorithm,
973                server_side_encryption_customer_key,
974                server_side_encryption_customer_key_md5,
975                default_storage_class,
976                allow_anonymous: config.allow_anonymous,
977                disable_list_objects_v2: config.disable_list_objects_v2,
978                enable_request_payer: config.enable_request_payer,
979                signer,
980                checksum_algorithm,
981            }),
982        })
983    }
984}
985
986/// Backend for s3 services.
987#[derive(Debug, Clone)]
988pub struct S3Backend {
989    core: Arc<S3Core>,
990}
991
992impl Access for S3Backend {
993    type Reader = HttpBody;
994    type Writer = S3Writers;
995    type Lister = S3Listers;
996    type Deleter = oio::BatchDeleter<S3Deleter>;
997
998    fn info(&self) -> Arc<AccessorInfo> {
999        self.core.info.clone()
1000    }
1001
1002    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
1003        let resp = self.core.s3_head_object(path, args).await?;
1004
1005        let status = resp.status();
1006
1007        match status {
1008            StatusCode::OK => {
1009                let headers = resp.headers();
1010                let mut meta = parse_into_metadata(path, headers)?;
1011
1012                let user_meta = parse_prefixed_headers(headers, X_AMZ_META_PREFIX);
1013                if !user_meta.is_empty() {
1014                    meta = meta.with_user_metadata(user_meta);
1015                }
1016
1017                if let Some(v) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? {
1018                    meta.set_version(v);
1019                }
1020
1021                Ok(RpStat::new(meta))
1022            }
1023            _ => Err(parse_error(resp)),
1024        }
1025    }
1026
1027    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
1028        let resp = self.core.s3_get_object(path, args.range(), &args).await?;
1029
1030        let status = resp.status();
1031        match status {
1032            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
1033                Ok((RpRead::default(), resp.into_body()))
1034            }
1035            _ => {
1036                let (part, mut body) = resp.into_parts();
1037                let buf = body.to_buffer().await?;
1038                Err(parse_error(Response::from_parts(part, buf)))
1039            }
1040        }
1041    }
1042
1043    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
1044        let writer = S3Writer::new(self.core.clone(), path, args.clone());
1045
1046        let w = if args.append() {
1047            S3Writers::Two(oio::AppendWriter::new(writer))
1048        } else {
1049            S3Writers::One(oio::MultipartWriter::new(
1050                self.core.info.clone(),
1051                writer,
1052                args.concurrent(),
1053            ))
1054        };
1055
1056        Ok((RpWrite::default(), w))
1057    }
1058
1059    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
1060        Ok((
1061            RpDelete::default(),
1062            oio::BatchDeleter::new(S3Deleter::new(self.core.clone())),
1063        ))
1064    }
1065
1066    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
1067        let l = if args.versions() || args.deleted() {
1068            ThreeWays::Three(oio::PageLister::new(S3ObjectVersionsLister::new(
1069                self.core.clone(),
1070                path,
1071                args,
1072            )))
1073        } else if self.core.disable_list_objects_v2 {
1074            ThreeWays::One(oio::PageLister::new(S3ListerV1::new(
1075                self.core.clone(),
1076                path,
1077                args,
1078            )))
1079        } else {
1080            ThreeWays::Two(oio::PageLister::new(S3ListerV2::new(
1081                self.core.clone(),
1082                path,
1083                args,
1084            )))
1085        };
1086
1087        Ok((RpList::default(), l))
1088    }
1089
1090    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
1091        let resp = self.core.s3_copy_object(from, to).await?;
1092
1093        let status = resp.status();
1094
1095        match status {
1096            StatusCode::OK => Ok(RpCopy::default()),
1097            _ => Err(parse_error(resp)),
1098        }
1099    }
1100
1101    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
1102        let (expire, op) = args.into_parts();
1103        // We will not send this request out, just for signing.
1104        let req = match op {
1105            PresignOperation::Stat(v) => self.core.s3_head_object_request(path, v),
1106            PresignOperation::Read(v) => {
1107                self.core
1108                    .s3_get_object_request(path, BytesRange::default(), &v)
1109            }
1110            PresignOperation::Write(_) => {
1111                self.core
1112                    .s3_put_object_request(path, None, &OpWrite::default(), Buffer::new())
1113            }
1114            PresignOperation::Delete(_) => Err(Error::new(
1115                ErrorKind::Unsupported,
1116                "operation is not supported",
1117            )),
1118        };
1119        let req = req?;
1120
1121        let req = self.core.sign_query(req, expire).await?;
1122
1123        // We don't need this request anymore, consume it directly.
1124        let (parts, _) = req.into_parts();
1125
1126        Ok(RpPresign::new(PresignedRequest::new(
1127            parts.method,
1128            parts.uri,
1129            parts.headers,
1130        )))
1131    }
1132}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `S3Builder::is_bucket_valid` against a table of bucket names,
    /// with and without virtual host style enabled.
    #[test]
    fn test_is_valid_bucket() {
        // Columns: bucket name, enable virtual host style, expected validity.
        let bucket_cases = [
            ("", false, false),
            ("test", false, true),
            ("test.xyz", false, true),
            ("", true, false),
            ("test", true, true),
            ("test.xyz", true, false),
        ];

        for (bucket, enable_virtual_host_style, expected) in bucket_cases {
            let builder = S3Builder::default().bucket(bucket);
            let builder = if enable_virtual_host_style {
                builder.enable_virtual_host_style()
            } else {
                builder
            };

            assert_eq!(S3Builder::is_bucket_valid(&builder.config), expected);
        }
    }

    /// Checks that every endpoint spelling resolves to the same URL, first in
    /// path style and then in virtual host style.
    #[test]
    fn test_build_endpoint() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let endpoint_cases = [
            Some("s3.amazonaws.com"),
            Some("https://s3.amazonaws.com"),
            Some("https://s3.us-east-2.amazonaws.com"),
            None,
        ];

        // Columns: enable virtual host style, expected endpoint.
        let style_cases = [
            (false, "https://s3.us-east-2.amazonaws.com/test"),
            (true, "https://test.s3.us-east-2.amazonaws.com"),
        ];

        for (virtual_host_style, expected) in style_cases {
            for endpoint in &endpoint_cases {
                let mut builder = S3Builder::default().bucket("test");
                if virtual_host_style {
                    builder = builder.enable_virtual_host_style();
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }

                let built = S3Builder::build_endpoint(&builder.config, "us-east-2");
                assert_eq!(built, expected);
            }
        }
    }

    /// Checks `S3Builder::detect_region` against a table of endpoints.
    #[tokio::test]
    async fn test_detect_region() {
        // Columns: case name, endpoint, bucket, expected region.
        let cases = [
            (
                "aws s3 without region in endpoint",
                "https://s3.amazonaws.com",
                "example",
                Some("us-east-1"),
            ),
            (
                "aws s3 with region in endpoint",
                "https://s3.us-east-1.amazonaws.com",
                "example",
                Some("us-east-1"),
            ),
            (
                "oss with public endpoint",
                "https://oss-ap-southeast-1.aliyuncs.com",
                "example",
                Some("oss-ap-southeast-1"),
            ),
            (
                "oss with internal endpoint",
                "https://oss-cn-hangzhou-internal.aliyuncs.com",
                "example",
                Some("oss-cn-hangzhou-internal"),
            ),
            (
                "r2",
                "https://abc.xxxxx.r2.cloudflarestorage.com",
                "example",
                Some("auto"),
            ),
            (
                "invalid service",
                "https://opendal.apache.org",
                "example",
                None,
            ),
        ];

        for (name, endpoint, bucket, expected) in cases {
            let detected = S3Builder::detect_region(endpoint, bucket).await;
            assert_eq!(detected.as_deref(), expected, "{name}");
        }
    }
}