opendal/services/s3/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::fmt::Write;
22use std::str::FromStr;
23use std::sync::atomic::AtomicBool;
24use std::sync::Arc;
25use std::sync::LazyLock;
26
27use base64::prelude::BASE64_STANDARD;
28use base64::Engine;
29use constants::X_AMZ_META_PREFIX;
30use constants::X_AMZ_VERSION_ID;
31use http::Response;
32use http::StatusCode;
33use log::debug;
34use log::warn;
35use md5::Digest;
36use md5::Md5;
37use reqsign::AwsAssumeRoleLoader;
38use reqsign::AwsConfig;
39use reqsign::AwsCredentialLoad;
40use reqsign::AwsDefaultLoader;
41use reqsign::AwsV4Signer;
42use reqwest::Url;
43
44use super::core::*;
45use super::delete::S3Deleter;
46use super::error::parse_error;
47use super::lister::S3ListerV1;
48use super::lister::S3ListerV2;
49use super::lister::S3Listers;
50use super::lister::S3ObjectVersionsLister;
51use super::writer::S3Writer;
52use super::writer::S3Writers;
53use crate::raw::oio::PageLister;
54use crate::raw::*;
55use crate::services::S3Config;
56use crate::*;
57
/// Maps a global endpoint to its region-aware template, so that a correct
/// regional endpoint can be constructed when the user supplies a global one.
///
/// The `{region}` placeholder in a template is substituted by the caller.
static ENDPOINT_TEMPLATES: LazyLock<HashMap<&'static str, &'static str>> = LazyLock::new(|| {
    HashMap::from([
        // AWS S3 Service.
        (
            "https://s3.amazonaws.com",
            "https://s3.{region}.amazonaws.com",
        ),
    ])
});
68
/// Fallback value for `delete_max_size` when the user does not configure one.
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
70
71impl Configurator for S3Config {
72    type Builder = S3Builder;
73
74    #[allow(deprecated)]
75    fn into_builder(self) -> Self::Builder {
76        S3Builder {
77            config: self,
78            customized_credential_load: None,
79
80            http_client: None,
81        }
82    }
83}
84
/// AWS S3 and compatible services (including minio, digitalocean space, Tencent Cloud Object Storage(COS) and so on) support.
/// For more information about s3-compatible services, refer to [Compatible Services](#compatible-services).
#[doc = include_str!("docs.md")]
#[doc = include_str!("compatible_services.md")]
#[derive(Default)]
pub struct S3Builder {
    /// Configuration values collected through the builder methods.
    config: S3Config,

    /// Optional user-supplied credential loader, set via
    /// `customized_credential_load`.
    customized_credential_load: Option<Box<dyn AwsCredentialLoad>>,

    /// Optional user-supplied HTTP client, set via the deprecated
    /// `http_client` method.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
98
99impl Debug for S3Builder {
100    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
101        let mut d = f.debug_struct("S3Builder");
102
103        d.field("config", &self.config);
104        d.finish_non_exhaustive()
105    }
106}
107
108impl S3Builder {
109    /// Set root of this backend.
110    ///
111    /// All operations will happen under this root.
112    pub fn root(mut self, root: &str) -> Self {
113        self.config.root = if root.is_empty() {
114            None
115        } else {
116            Some(root.to_string())
117        };
118
119        self
120    }
121
122    /// Set bucket name of this backend.
123    pub fn bucket(mut self, bucket: &str) -> Self {
124        self.config.bucket = bucket.to_string();
125
126        self
127    }
128
129    /// Set endpoint of this backend.
130    ///
131    /// Endpoint must be full uri, e.g.
132    ///
133    /// - AWS S3: `https://s3.amazonaws.com` or `https://s3.{region}.amazonaws.com`
134    /// - Cloudflare R2: `https://<ACCOUNT_ID>.r2.cloudflarestorage.com`
135    /// - Aliyun OSS: `https://{region}.aliyuncs.com`
136    /// - Tencent COS: `https://cos.{region}.myqcloud.com`
137    /// - Minio: `http://127.0.0.1:9000`
138    ///
139    /// If user inputs endpoint without scheme like "s3.amazonaws.com", we
140    /// will prepend "https://" before it.
141    pub fn endpoint(mut self, endpoint: &str) -> Self {
142        if !endpoint.is_empty() {
143            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
144            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
145        }
146
147        self
148    }
149
150    /// Region represent the signing region of this endpoint. This is required
151    /// if you are using the default AWS S3 endpoint.
152    ///
153    /// If using a custom endpoint,
154    /// - If region is set, we will take user's input first.
155    /// - If not, we will try to load it from environment.
156    pub fn region(mut self, region: &str) -> Self {
157        if !region.is_empty() {
158            self.config.region = Some(region.to_string())
159        }
160
161        self
162    }
163
164    /// Set access_key_id of this backend.
165    ///
166    /// - If access_key_id is set, we will take user's input first.
167    /// - If not, we will try to load it from environment.
168    pub fn access_key_id(mut self, v: &str) -> Self {
169        if !v.is_empty() {
170            self.config.access_key_id = Some(v.to_string())
171        }
172
173        self
174    }
175
176    /// Set secret_access_key of this backend.
177    ///
178    /// - If secret_access_key is set, we will take user's input first.
179    /// - If not, we will try to load it from environment.
180    pub fn secret_access_key(mut self, v: &str) -> Self {
181        if !v.is_empty() {
182            self.config.secret_access_key = Some(v.to_string())
183        }
184
185        self
186    }
187
188    /// Set role_arn for this backend.
189    ///
190    /// If `role_arn` is set, we will use already known config as source
191    /// credential to assume role with `role_arn`.
192    pub fn role_arn(mut self, v: &str) -> Self {
193        if !v.is_empty() {
194            self.config.role_arn = Some(v.to_string())
195        }
196
197        self
198    }
199
200    /// Set external_id for this backend.
201    pub fn external_id(mut self, v: &str) -> Self {
202        if !v.is_empty() {
203            self.config.external_id = Some(v.to_string())
204        }
205
206        self
207    }
208
209    /// Set role_session_name for this backend.
210    pub fn role_session_name(mut self, v: &str) -> Self {
211        if !v.is_empty() {
212            self.config.role_session_name = Some(v.to_string())
213        }
214
215        self
216    }
217
218    /// Set default storage_class for this backend.
219    ///
220    /// Available values:
221    /// - `DEEP_ARCHIVE`
222    /// - `GLACIER`
223    /// - `GLACIER_IR`
224    /// - `INTELLIGENT_TIERING`
225    /// - `ONEZONE_IA`
226    /// - `OUTPOSTS`
227    /// - `REDUCED_REDUNDANCY`
228    /// - `STANDARD`
229    /// - `STANDARD_IA`
230    pub fn default_storage_class(mut self, v: &str) -> Self {
231        if !v.is_empty() {
232            self.config.default_storage_class = Some(v.to_string())
233        }
234
235        self
236    }
237
238    /// Set server_side_encryption for this backend.
239    ///
240    /// Available values: `AES256`, `aws:kms`.
241    ///
242    /// # Note
243    ///
244    /// This function is the low-level setting for SSE related features.
245    ///
246    /// SSE related options should be set carefully to make them works.
247    /// Please use `server_side_encryption_with_*` helpers if even possible.
248    pub fn server_side_encryption(mut self, v: &str) -> Self {
249        if !v.is_empty() {
250            self.config.server_side_encryption = Some(v.to_string())
251        }
252
253        self
254    }
255
256    /// Set server_side_encryption_aws_kms_key_id for this backend
257    ///
258    /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id`
259    ///   is not set, S3 will use aws managed kms key to encrypt data.
260    /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id`
261    ///   is a valid kms key id, S3 will use the provided kms key to encrypt data.
262    /// - If the `server_side_encryption_aws_kms_key_id` is invalid or not found, an error will be
263    ///   returned.
264    /// - If `server_side_encryption` is not `aws:kms`, setting `server_side_encryption_aws_kms_key_id` is a noop.
265    ///
266    /// # Note
267    ///
268    /// This function is the low-level setting for SSE related features.
269    ///
270    /// SSE related options should be set carefully to make them works.
271    /// Please use `server_side_encryption_with_*` helpers if even possible.
272    pub fn server_side_encryption_aws_kms_key_id(mut self, v: &str) -> Self {
273        if !v.is_empty() {
274            self.config.server_side_encryption_aws_kms_key_id = Some(v.to_string())
275        }
276
277        self
278    }
279
280    /// Set server_side_encryption_customer_algorithm for this backend.
281    ///
282    /// Available values: `AES256`.
283    ///
284    /// # Note
285    ///
286    /// This function is the low-level setting for SSE related features.
287    ///
288    /// SSE related options should be set carefully to make them works.
289    /// Please use `server_side_encryption_with_*` helpers if even possible.
290    pub fn server_side_encryption_customer_algorithm(mut self, v: &str) -> Self {
291        if !v.is_empty() {
292            self.config.server_side_encryption_customer_algorithm = Some(v.to_string())
293        }
294
295        self
296    }
297
298    /// Set server_side_encryption_customer_key for this backend.
299    ///
300    /// # Args
301    ///
302    /// `v`: base64 encoded key that matches algorithm specified in
303    /// `server_side_encryption_customer_algorithm`.
304    ///
305    /// # Note
306    ///
307    /// This function is the low-level setting for SSE related features.
308    ///
309    /// SSE related options should be set carefully to make them works.
310    /// Please use `server_side_encryption_with_*` helpers if even possible.
311    pub fn server_side_encryption_customer_key(mut self, v: &str) -> Self {
312        if !v.is_empty() {
313            self.config.server_side_encryption_customer_key = Some(v.to_string())
314        }
315
316        self
317    }
318
319    /// Set server_side_encryption_customer_key_md5 for this backend.
320    ///
321    /// # Args
322    ///
323    /// `v`: MD5 digest of key specified in `server_side_encryption_customer_key`.
324    ///
325    /// # Note
326    ///
327    /// This function is the low-level setting for SSE related features.
328    ///
329    /// SSE related options should be set carefully to make them works.
330    /// Please use `server_side_encryption_with_*` helpers if even possible.
331    pub fn server_side_encryption_customer_key_md5(mut self, v: &str) -> Self {
332        if !v.is_empty() {
333            self.config.server_side_encryption_customer_key_md5 = Some(v.to_string())
334        }
335
336        self
337    }
338
339    /// Enable server side encryption with aws managed kms key
340    ///
341    /// As known as: SSE-KMS
342    ///
343    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
344    pub fn server_side_encryption_with_aws_managed_kms_key(mut self) -> Self {
345        self.config.server_side_encryption = Some("aws:kms".to_string());
346        self
347    }
348
349    /// Enable server side encryption with customer managed kms key
350    ///
351    /// As known as: SSE-KMS
352    ///
353    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
354    pub fn server_side_encryption_with_customer_managed_kms_key(
355        mut self,
356        aws_kms_key_id: &str,
357    ) -> Self {
358        self.config.server_side_encryption = Some("aws:kms".to_string());
359        self.config.server_side_encryption_aws_kms_key_id = Some(aws_kms_key_id.to_string());
360        self
361    }
362
363    /// Enable server side encryption with s3 managed key
364    ///
365    /// As known as: SSE-S3
366    ///
367    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
368    pub fn server_side_encryption_with_s3_key(mut self) -> Self {
369        self.config.server_side_encryption = Some("AES256".to_string());
370        self
371    }
372
373    /// Enable server side encryption with customer key.
374    ///
375    /// As known as: SSE-C
376    ///
377    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
378    pub fn server_side_encryption_with_customer_key(mut self, algorithm: &str, key: &[u8]) -> Self {
379        self.config.server_side_encryption_customer_algorithm = Some(algorithm.to_string());
380        self.config.server_side_encryption_customer_key = Some(BASE64_STANDARD.encode(key));
381        self.config.server_side_encryption_customer_key_md5 =
382            Some(BASE64_STANDARD.encode(Md5::digest(key).as_slice()));
383        self
384    }
385
386    /// Set temporary credential used in AWS S3 connections
387    ///
388    /// # Warning
389    ///
390    /// session token's lifetime is short and requires users to refresh in time.
391    pub fn session_token(mut self, token: &str) -> Self {
392        if !token.is_empty() {
393            self.config.session_token = Some(token.to_string());
394        }
395        self
396    }
397
398    /// Set temporary credential used in AWS S3 connections
399    #[deprecated(note = "Please use `session_token` instead")]
400    pub fn security_token(self, token: &str) -> Self {
401        self.session_token(token)
402    }
403
404    /// Disable config load so that opendal will not load config from
405    /// environment.
406    ///
407    /// For examples:
408    ///
409    /// - envs like `AWS_ACCESS_KEY_ID`
410    /// - files like `~/.aws/config`
411    pub fn disable_config_load(mut self) -> Self {
412        self.config.disable_config_load = true;
413        self
414    }
415
416    /// Disable list objects v2 so that opendal will not use the older
417    /// List Objects V1 to list objects.
418    ///
419    /// By default, OpenDAL uses List Objects V2 to list objects. However,
420    /// some legacy services do not yet support V2.
421    pub fn disable_list_objects_v2(mut self) -> Self {
422        self.config.disable_list_objects_v2 = true;
423        self
424    }
425
426    /// Enable request payer so that OpenDAL will send requests with `x-amz-request-payer` header.
427    ///
428    /// With this option the client accepts to pay for the request and data transfer costs.
429    pub fn enable_request_payer(mut self) -> Self {
430        self.config.enable_request_payer = true;
431        self
432    }
433
434    /// Disable load credential from ec2 metadata.
435    ///
436    /// This option is used to disable the default behavior of opendal
437    /// to load credential from ec2 metadata, a.k.a, IMDSv2
438    pub fn disable_ec2_metadata(mut self) -> Self {
439        self.config.disable_ec2_metadata = true;
440        self
441    }
442
443    /// Allow anonymous will allow opendal to send request without signing
444    /// when credential is not loaded.
445    pub fn allow_anonymous(mut self) -> Self {
446        self.config.allow_anonymous = true;
447        self
448    }
449
450    /// Enable virtual host style so that opendal will send API requests
451    /// in virtual host style instead of path style.
452    ///
453    /// - By default, opendal will send API to `https://s3.us-east-1.amazonaws.com/bucket_name`
454    /// - Enabled, opendal will send API to `https://bucket_name.s3.us-east-1.amazonaws.com`
455    pub fn enable_virtual_host_style(mut self) -> Self {
456        self.config.enable_virtual_host_style = true;
457        self
458    }
459
460    /// Disable stat with override so that opendal will not send stat request with override queries.
461    ///
462    /// For example, R2 doesn't support stat with `response_content_type` query.
463    pub fn disable_stat_with_override(mut self) -> Self {
464        self.config.disable_stat_with_override = true;
465        self
466    }
467
468    /// Adding a customized credential load for service.
469    ///
470    /// If customized_credential_load has been set, we will ignore all other
471    /// credential load methods.
472    pub fn customized_credential_load(mut self, cred: Box<dyn AwsCredentialLoad>) -> Self {
473        self.customized_credential_load = Some(cred);
474        self
475    }
476
477    /// Specify the http client that used by this service.
478    ///
479    /// # Notes
480    ///
481    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
482    /// during minor updates.
483    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
484    #[allow(deprecated)]
485    pub fn http_client(mut self, client: HttpClient) -> Self {
486        self.http_client = Some(client);
487        self
488    }
489
490    /// Set bucket versioning status for this backend
491    pub fn enable_versioning(mut self, enabled: bool) -> Self {
492        self.config.enable_versioning = enabled;
493
494        self
495    }
496
497    /// Check if `bucket` is valid
498    /// `bucket` must be not empty and if `enable_virtual_host_style` is true
499    /// it couldn't contain dot(.) character
500    fn is_bucket_valid(&self) -> bool {
501        if self.config.bucket.is_empty() {
502            return false;
503        }
504        // If enable virtual host style, `bucket` will reside in domain part,
505        // for example `https://bucket_name.s3.us-east-1.amazonaws.com`,
506        // so `bucket` with dot can't be recognized correctly for this format.
507        if self.config.enable_virtual_host_style && self.config.bucket.contains('.') {
508            return false;
509        }
510        true
511    }
512
513    /// Build endpoint with given region.
514    fn build_endpoint(&self, region: &str) -> String {
515        let bucket = {
516            debug_assert!(self.is_bucket_valid(), "bucket must be valid");
517
518            self.config.bucket.as_str()
519        };
520
521        let mut endpoint = match &self.config.endpoint {
522            Some(endpoint) => {
523                if endpoint.starts_with("http") {
524                    endpoint.to_string()
525                } else {
526                    // Prefix https if endpoint doesn't start with scheme.
527                    format!("https://{endpoint}")
528                }
529            }
530            None => "https://s3.amazonaws.com".to_string(),
531        };
532
533        // If endpoint contains bucket name, we should trim them.
534        endpoint = endpoint.replace(&format!("//{bucket}."), "//");
535
536        // Omit default ports if specified.
537        if let Ok(url) = Url::from_str(&endpoint) {
538            // Remove the trailing `/` of root path.
539            endpoint = url.to_string().trim_end_matches('/').to_string();
540        }
541
542        // Update with endpoint templates.
543        endpoint = if let Some(template) = ENDPOINT_TEMPLATES.get(endpoint.as_str()) {
544            template.replace("{region}", region)
545        } else {
546            // If we don't know where about this endpoint, just leave
547            // them as it.
548            endpoint.to_string()
549        };
550
551        // Apply virtual host style.
552        if self.config.enable_virtual_host_style {
553            endpoint = endpoint.replace("//", &format!("//{bucket}."))
554        } else {
555            write!(endpoint, "/{bucket}").expect("write into string must succeed");
556        };
557
558        endpoint
559    }
560
561    /// Set maximum batch operations of this backend.
562    #[deprecated(
563        since = "0.52.0",
564        note = "Please use `delete_max_size` instead of `batch_max_operations`"
565    )]
566    pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
567        self.config.delete_max_size = Some(batch_max_operations);
568
569        self
570    }
571
572    /// Set maximum delete operations of this backend.
573    pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
574        self.config.delete_max_size = Some(delete_max_size);
575
576        self
577    }
578
579    /// Set checksum algorithm of this backend.
580    /// This is necessary when writing to AWS S3 Buckets with Object Lock enabled for example.
581    ///
582    /// Available options:
583    /// - "crc32c"
584    pub fn checksum_algorithm(mut self, checksum_algorithm: &str) -> Self {
585        self.config.checksum_algorithm = Some(checksum_algorithm.to_string());
586
587        self
588    }
589
590    /// Disable write with if match so that opendal will not send write request with if match headers.
591    pub fn disable_write_with_if_match(mut self) -> Self {
592        self.config.disable_write_with_if_match = true;
593        self
594    }
595
596    /// Enable write with append so that opendal will send write request with append headers.
597    pub fn enable_write_with_append(mut self) -> Self {
598        self.config.enable_write_with_append = true;
599        self
600    }
601
602    /// Detect region of S3 bucket.
603    ///
604    /// # Args
605    ///
606    /// - endpoint: the endpoint of S3 service
607    /// - bucket: the bucket of S3 service
608    ///
609    /// # Return
610    ///
611    /// - `Some(region)` means we detect the region successfully
612    /// - `None` means we can't detect the region or meeting errors.
613    ///
614    /// # Notes
615    ///
616    /// We will try to detect region by the following methods.
617    ///
618    /// - Match endpoint with given rules to get region
619    ///   - Cloudflare R2
620    ///   - AWS S3
621    ///   - Aliyun OSS
622    /// - Send a `HEAD` request to endpoint with bucket name to get `x-amz-bucket-region`.
623    ///
624    /// # Examples
625    ///
626    /// ```no_run
627    /// use opendal::services::S3;
628    ///
629    /// # async fn example() {
630    /// let region: Option<String> = S3::detect_region("https://s3.amazonaws.com", "example").await;
631    /// # }
632    /// ```
633    ///
634    /// # Reference
635    ///
636    /// - [Amazon S3 HeadBucket API](https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/API/API_HeadBucket.html)
637    pub async fn detect_region(endpoint: &str, bucket: &str) -> Option<String> {
638        // Remove the possible trailing `/` in endpoint.
639        let endpoint = endpoint.trim_end_matches('/');
640
641        // Make sure the endpoint contains the scheme.
642        let mut endpoint = if endpoint.starts_with("http") {
643            endpoint.to_string()
644        } else {
645            // Prefix https if endpoint doesn't start with scheme.
646            format!("https://{}", endpoint)
647        };
648
649        // Remove bucket name from endpoint.
650        endpoint = endpoint.replace(&format!("//{bucket}."), "//");
651        let url = format!("{endpoint}/{bucket}");
652
653        debug!("detect region with url: {url}");
654
655        // Try to detect region by endpoint.
656
657        // If this bucket is R2, we can return auto directly.
658        //
659        // Reference: <https://developers.cloudflare.com/r2/api/s3/api/>
660        if endpoint.ends_with("r2.cloudflarestorage.com") {
661            return Some("auto".to_string());
662        }
663
664        // If this bucket is AWS, we can try to match the endpoint.
665        if let Some(v) = endpoint.strip_prefix("https://s3.") {
666            if let Some(region) = v.strip_suffix(".amazonaws.com") {
667                return Some(region.to_string());
668            }
669        }
670
671        // If this bucket is OSS, we can try to match the endpoint.
672        //
673        // - `oss-ap-southeast-1.aliyuncs.com` => `oss-ap-southeast-1`
674        // - `oss-cn-hangzhou-internal.aliyuncs.com` => `oss-cn-hangzhou`
675        if let Some(v) = endpoint.strip_prefix("https://") {
676            if let Some(region) = v.strip_suffix(".aliyuncs.com") {
677                return Some(region.to_string());
678            }
679
680            if let Some(region) = v.strip_suffix("-internal.aliyuncs.com") {
681                return Some(region.to_string());
682            }
683        }
684
685        // Try to detect region by HeadBucket.
686        let req = http::Request::head(&url).body(Buffer::new()).ok()?;
687
688        let client = HttpClient::new().ok()?;
689        let res = client
690            .send(req)
691            .await
692            .map_err(|err| warn!("detect region failed for: {err:?}"))
693            .ok()?;
694
695        debug!(
696            "auto detect region got response: status {:?}, header: {:?}",
697            res.status(),
698            res.headers()
699        );
700
701        // Get region from response header no matter status code.
702        if let Some(header) = res.headers().get("x-amz-bucket-region") {
703            if let Ok(regin) = header.to_str() {
704                return Some(regin.to_string());
705            }
706        }
707
708        // Status code is 403 or 200 means we already visit the correct
709        // region, we can use the default region directly.
710        if res.status() == StatusCode::FORBIDDEN || res.status() == StatusCode::OK {
711            return Some("us-east-1".to_string());
712        }
713
714        None
715    }
716}
717
718impl Builder for S3Builder {
719    const SCHEME: Scheme = Scheme::S3;
720    type Config = S3Config;
721
722    fn build(mut self) -> Result<impl Access> {
723        debug!("backend build started: {:?}", &self);
724
725        let root = normalize_root(&self.config.root.clone().unwrap_or_default());
726        debug!("backend use root {}", &root);
727
728        // Handle bucket name.
729        let bucket = if self.is_bucket_valid() {
730            Ok(&self.config.bucket)
731        } else {
732            Err(
733                Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
734                    .with_context("service", Scheme::S3),
735            )
736        }?;
737        debug!("backend use bucket {}", &bucket);
738
739        let default_storage_class = match &self.config.default_storage_class {
740            None => None,
741            Some(v) => Some(
742                build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?,
743            ),
744        };
745
746        let server_side_encryption = match &self.config.server_side_encryption {
747            None => None,
748            Some(v) => Some(
749                build_header_value(v)
750                    .map_err(|err| err.with_context("key", "server_side_encryption"))?,
751            ),
752        };
753
754        let server_side_encryption_aws_kms_key_id =
755            match &self.config.server_side_encryption_aws_kms_key_id {
756                None => None,
757                Some(v) => Some(build_header_value(v).map_err(|err| {
758                    err.with_context("key", "server_side_encryption_aws_kms_key_id")
759                })?),
760            };
761
762        let server_side_encryption_customer_algorithm =
763            match &self.config.server_side_encryption_customer_algorithm {
764                None => None,
765                Some(v) => Some(build_header_value(v).map_err(|err| {
766                    err.with_context("key", "server_side_encryption_customer_algorithm")
767                })?),
768            };
769
770        let server_side_encryption_customer_key =
771            match &self.config.server_side_encryption_customer_key {
772                None => None,
773                Some(v) => Some(build_header_value(v).map_err(|err| {
774                    err.with_context("key", "server_side_encryption_customer_key")
775                })?),
776            };
777
778        let server_side_encryption_customer_key_md5 =
779            match &self.config.server_side_encryption_customer_key_md5 {
780                None => None,
781                Some(v) => Some(build_header_value(v).map_err(|err| {
782                    err.with_context("key", "server_side_encryption_customer_key_md5")
783                })?),
784            };
785
786        let checksum_algorithm = match self.config.checksum_algorithm.as_deref() {
787            Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
788            None => None,
789            v => {
790                return Err(Error::new(
791                    ErrorKind::ConfigInvalid,
792                    format!("{:?} is not a supported checksum_algorithm.", v),
793                ))
794            }
795        };
796
797        // This is our current config.
798        let mut cfg = AwsConfig::default();
799        if !self.config.disable_config_load {
800            #[cfg(not(target_arch = "wasm32"))]
801            {
802                cfg = cfg.from_profile();
803                cfg = cfg.from_env();
804            }
805        }
806
807        if let Some(ref v) = self.config.region {
808            cfg.region = Some(v.to_string());
809        }
810
811        if cfg.region.is_none() {
812            return Err(Error::new(
813                ErrorKind::ConfigInvalid,
814                "region is missing. Please find it by S3::detect_region() or set them in env.",
815            )
816            .with_operation("Builder::build")
817            .with_context("service", Scheme::S3));
818        }
819
820        let region = cfg.region.to_owned().unwrap();
821        debug!("backend use region: {region}");
822
823        // Retain the user's endpoint if it exists; otherwise, try loading it from the environment.
824        self.config.endpoint = self.config.endpoint.or_else(|| cfg.endpoint_url.clone());
825
826        // Building endpoint.
827        let endpoint = self.build_endpoint(&region);
828        debug!("backend use endpoint: {endpoint}");
829
830        // Setting all value from user input if available.
831        if let Some(v) = self.config.access_key_id {
832            cfg.access_key_id = Some(v)
833        }
834        if let Some(v) = self.config.secret_access_key {
835            cfg.secret_access_key = Some(v)
836        }
837        if let Some(v) = self.config.session_token {
838            cfg.session_token = Some(v)
839        }
840
841        let mut loader: Option<Box<dyn AwsCredentialLoad>> = None;
842        // If customized_credential_load is set, we will use it.
843        if let Some(v) = self.customized_credential_load {
844            loader = Some(v);
845        }
846
847        // If role_arn is set, we must use AssumeRoleLoad.
848        if let Some(role_arn) = self.config.role_arn {
849            // use current env as source credential loader.
850            let default_loader =
851                AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg.clone());
852
853            // Build the config for assume role.
854            let mut assume_role_cfg = AwsConfig {
855                region: Some(region.clone()),
856                role_arn: Some(role_arn),
857                external_id: self.config.external_id.clone(),
858                sts_regional_endpoints: "regional".to_string(),
859                ..Default::default()
860            };
861
862            // override default role_session_name if set
863            if let Some(name) = self.config.role_session_name {
864                assume_role_cfg.role_session_name = name;
865            }
866
867            let assume_role_loader = AwsAssumeRoleLoader::new(
868                GLOBAL_REQWEST_CLIENT.clone().clone(),
869                assume_role_cfg,
870                Box::new(default_loader),
871            )
872            .map_err(|err| {
873                Error::new(
874                    ErrorKind::ConfigInvalid,
875                    "The assume_role_loader is misconfigured",
876                )
877                .with_context("service", Scheme::S3)
878                .set_source(err)
879            })?;
880            loader = Some(Box::new(assume_role_loader));
881        }
882        // If loader is not set, we will use default loader.
883        let loader = match loader {
884            Some(v) => v,
885            None => {
886                let mut default_loader =
887                    AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
888                if self.config.disable_ec2_metadata {
889                    default_loader = default_loader.with_disable_ec2_metadata();
890                }
891
892                Box::new(default_loader)
893            }
894        };
895
896        let signer = AwsV4Signer::new("s3", &region);
897
898        let delete_max_size = self
899            .config
900            .delete_max_size
901            .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
902
903        Ok(S3Backend {
904            core: Arc::new(S3Core {
905                info: {
906                    let am = AccessorInfo::default();
907                    am.set_scheme(Scheme::S3)
908                        .set_root(&root)
909                        .set_name(bucket)
910                        .set_native_capability(Capability {
911                            stat: true,
912                            stat_has_content_encoding: true,
913                            stat_with_if_match: true,
914                            stat_with_if_none_match: true,
915                            stat_with_if_modified_since: true,
916                            stat_with_if_unmodified_since: true,
917                            stat_with_override_cache_control: !self
918                                .config
919                                .disable_stat_with_override,
920                            stat_with_override_content_disposition: !self
921                                .config
922                                .disable_stat_with_override,
923                            stat_with_override_content_type: !self
924                                .config
925                                .disable_stat_with_override,
926                            stat_with_version: self.config.enable_versioning,
927                            stat_has_cache_control: true,
928                            stat_has_content_length: true,
929                            stat_has_content_type: true,
930                            stat_has_content_range: true,
931                            stat_has_etag: true,
932                            stat_has_content_md5: true,
933                            stat_has_last_modified: true,
934                            stat_has_content_disposition: true,
935                            stat_has_user_metadata: true,
936                            stat_has_version: true,
937
938                            read: true,
939                            read_with_if_match: true,
940                            read_with_if_none_match: true,
941                            read_with_if_modified_since: true,
942                            read_with_if_unmodified_since: true,
943                            read_with_override_cache_control: true,
944                            read_with_override_content_disposition: true,
945                            read_with_override_content_type: true,
946                            read_with_version: self.config.enable_versioning,
947
948                            write: true,
949                            write_can_empty: true,
950                            write_can_multi: true,
951                            write_can_append: self.config.enable_write_with_append,
952
953                            write_with_cache_control: true,
954                            write_with_content_type: true,
955                            write_with_content_encoding: true,
956                            write_with_if_match: !self.config.disable_write_with_if_match,
957                            write_with_if_not_exists: true,
958                            write_with_user_metadata: true,
959
960                            // The min multipart size of S3 is 5 MiB.
961                            //
962                            // ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
963                            write_multi_min_size: Some(5 * 1024 * 1024),
964                            // The max multipart size of S3 is 5 GiB.
965                            //
966                            // ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
967                            write_multi_max_size: if cfg!(target_pointer_width = "64") {
968                                Some(5 * 1024 * 1024 * 1024)
969                            } else {
970                                Some(usize::MAX)
971                            },
972
973                            delete: true,
974                            delete_max_size: Some(delete_max_size),
975                            delete_with_version: self.config.enable_versioning,
976
977                            copy: true,
978
979                            list: true,
980                            list_with_limit: true,
981                            list_with_start_after: true,
982                            list_with_recursive: true,
983                            list_with_versions: self.config.enable_versioning,
984                            list_with_deleted: self.config.enable_versioning,
985                            list_has_etag: true,
986                            list_has_content_md5: true,
987                            list_has_content_length: true,
988                            list_has_last_modified: true,
989
990                            presign: true,
991                            presign_stat: true,
992                            presign_read: true,
993                            presign_write: true,
994
995                            shared: true,
996
997                            ..Default::default()
998                        });
999
1000                    // allow deprecated api here for compatibility
1001                    #[allow(deprecated)]
1002                    if let Some(client) = self.http_client {
1003                        am.update_http_client(|_| client);
1004                    }
1005
1006                    am.into()
1007                },
1008                bucket: bucket.to_string(),
1009                endpoint,
1010                root,
1011                server_side_encryption,
1012                server_side_encryption_aws_kms_key_id,
1013                server_side_encryption_customer_algorithm,
1014                server_side_encryption_customer_key,
1015                server_side_encryption_customer_key_md5,
1016                default_storage_class,
1017                allow_anonymous: self.config.allow_anonymous,
1018                disable_list_objects_v2: self.config.disable_list_objects_v2,
1019                enable_request_payer: self.config.enable_request_payer,
1020                signer,
1021                loader,
1022                credential_loaded: AtomicBool::new(false),
1023                checksum_algorithm,
1024            }),
1025        })
1026    }
1027}
1028
1029/// Backend for s3 services.
1030#[derive(Debug, Clone)]
1031pub struct S3Backend {
1032    core: Arc<S3Core>,
1033}
1034
1035impl Access for S3Backend {
1036    type Reader = HttpBody;
1037    type Writer = S3Writers;
1038    type Lister = S3Listers;
1039    type Deleter = oio::BatchDeleter<S3Deleter>;
1040
1041    fn info(&self) -> Arc<AccessorInfo> {
1042        self.core.info.clone()
1043    }
1044
1045    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
1046        let resp = self.core.s3_head_object(path, args).await?;
1047
1048        let status = resp.status();
1049
1050        match status {
1051            StatusCode::OK => {
1052                let headers = resp.headers();
1053                let mut meta = parse_into_metadata(path, headers)?;
1054
1055                let user_meta = parse_prefixed_headers(headers, X_AMZ_META_PREFIX);
1056                if !user_meta.is_empty() {
1057                    meta = meta.with_user_metadata(user_meta);
1058                }
1059
1060                if let Some(v) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? {
1061                    meta.set_version(v);
1062                }
1063
1064                Ok(RpStat::new(meta))
1065            }
1066            _ => Err(parse_error(resp)),
1067        }
1068    }
1069
1070    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
1071        let resp = self.core.s3_get_object(path, args.range(), &args).await?;
1072
1073        let status = resp.status();
1074        match status {
1075            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
1076                Ok((RpRead::default(), resp.into_body()))
1077            }
1078            _ => {
1079                let (part, mut body) = resp.into_parts();
1080                let buf = body.to_buffer().await?;
1081                Err(parse_error(Response::from_parts(part, buf)))
1082            }
1083        }
1084    }
1085
1086    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
1087        let writer = S3Writer::new(self.core.clone(), path, args.clone());
1088
1089        let w = if args.append() {
1090            S3Writers::Two(oio::AppendWriter::new(writer))
1091        } else {
1092            S3Writers::One(oio::MultipartWriter::new(
1093                self.core.info.clone(),
1094                writer,
1095                args.concurrent(),
1096            ))
1097        };
1098
1099        Ok((RpWrite::default(), w))
1100    }
1101
1102    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
1103        Ok((
1104            RpDelete::default(),
1105            oio::BatchDeleter::new(S3Deleter::new(self.core.clone())),
1106        ))
1107    }
1108
1109    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
1110        let l = if args.versions() || args.deleted() {
1111            ThreeWays::Three(PageLister::new(S3ObjectVersionsLister::new(
1112                self.core.clone(),
1113                path,
1114                args,
1115            )))
1116        } else if self.core.disable_list_objects_v2 {
1117            ThreeWays::One(PageLister::new(S3ListerV1::new(
1118                self.core.clone(),
1119                path,
1120                args,
1121            )))
1122        } else {
1123            ThreeWays::Two(PageLister::new(S3ListerV2::new(
1124                self.core.clone(),
1125                path,
1126                args,
1127            )))
1128        };
1129
1130        Ok((RpList::default(), l))
1131    }
1132
1133    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
1134        let resp = self.core.s3_copy_object(from, to).await?;
1135
1136        let status = resp.status();
1137
1138        match status {
1139            StatusCode::OK => Ok(RpCopy::default()),
1140            _ => Err(parse_error(resp)),
1141        }
1142    }
1143
1144    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
1145        let (expire, op) = args.into_parts();
1146        // We will not send this request out, just for signing.
1147        let req = match op {
1148            PresignOperation::Stat(v) => self.core.s3_head_object_request(path, v),
1149            PresignOperation::Read(v) => {
1150                self.core
1151                    .s3_get_object_request(path, BytesRange::default(), &v)
1152            }
1153            PresignOperation::Write(_) => {
1154                self.core
1155                    .s3_put_object_request(path, None, &OpWrite::default(), Buffer::new())
1156            }
1157            PresignOperation::Delete(_) => Err(Error::new(
1158                ErrorKind::Unsupported,
1159                "operation is not supported",
1160            )),
1161        };
1162        let mut req = req?;
1163
1164        self.core.sign_query(&mut req, expire).await?;
1165
1166        // We don't need this request anymore, consume it directly.
1167        let (parts, _) = req.into_parts();
1168
1169        Ok(RpPresign::new(PresignedRequest::new(
1170            parts.method,
1171            parts.uri,
1172            parts.headers,
1173        )))
1174    }
1175}
1176
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `is_bucket_valid` for path-style and virtual-host-style builders.
    #[test]
    fn test_is_valid_bucket() {
        // (bucket, enable_virtual_host_style, expected)
        let bucket_cases = [
            ("", false, false),
            ("test", false, true),
            ("test.xyz", false, true),
            ("", true, false),
            ("test", true, true),
            ("test.xyz", true, false),
        ];

        for (bucket, virtual_host_style, want) in bucket_cases {
            let builder = S3Builder::default().bucket(bucket);
            let builder = if virtual_host_style {
                builder.enable_virtual_host_style()
            } else {
                builder
            };

            assert_eq!(builder.is_bucket_valid(), want);
        }
    }

    /// Checks that every endpoint spelling resolves to the same regional
    /// endpoint, both in path style and in virtual host style.
    #[test]
    fn test_build_endpoint() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let endpoint_cases = [
            Some("s3.amazonaws.com"),
            Some("https://s3.amazonaws.com"),
            Some("https://s3.us-east-2.amazonaws.com"),
            None,
        ];

        // (enable_virtual_host_style, expected endpoint); path style runs first.
        let styles = [
            (false, "https://s3.us-east-2.amazonaws.com/test"),
            (true, "https://test.s3.us-east-2.amazonaws.com"),
        ];

        for (virtual_host_style, want) in styles {
            for candidate in endpoint_cases.iter().copied() {
                let mut builder = S3Builder::default().bucket("test");
                if virtual_host_style {
                    builder = builder.enable_virtual_host_style();
                }
                if let Some(url) = candidate {
                    builder = builder.endpoint(url);
                }

                assert_eq!(builder.build_endpoint("us-east-2"), want);
            }
        }
    }

    /// Checks the region reported by `detect_region` for several endpoints.
    #[tokio::test]
    async fn test_detect_region() {
        // (case name, endpoint, bucket, expected region)
        let cases = [
            (
                "aws s3 without region in endpoint",
                "https://s3.amazonaws.com",
                "example",
                Some("us-east-1"),
            ),
            (
                "aws s3 with region in endpoint",
                "https://s3.us-east-1.amazonaws.com",
                "example",
                Some("us-east-1"),
            ),
            (
                "oss with public endpoint",
                "https://oss-ap-southeast-1.aliyuncs.com",
                "example",
                Some("oss-ap-southeast-1"),
            ),
            (
                "oss with internal endpoint",
                "https://oss-cn-hangzhou-internal.aliyuncs.com",
                "example",
                Some("oss-cn-hangzhou-internal"),
            ),
            (
                "r2",
                "https://abc.xxxxx.r2.cloudflarestorage.com",
                "example",
                Some("auto"),
            ),
            (
                "invalid service",
                "https://opendal.apache.org",
                "example",
                None,
            ),
        ];

        for (case, endpoint, bucket, want) in cases {
            let detected = S3Builder::detect_region(endpoint, bucket).await;
            assert_eq!(detected.as_deref(), want, "{}", case);
        }
    }
}