opendal/services/s3/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::fmt::Write;
22use std::str::FromStr;
23use std::sync::atomic::AtomicBool;
24use std::sync::Arc;
25
26use base64::prelude::BASE64_STANDARD;
27use base64::Engine;
28use constants::X_AMZ_META_PREFIX;
29use http::Response;
30use http::StatusCode;
31use log::debug;
32use log::warn;
33use md5::Digest;
34use md5::Md5;
35use reqsign::AwsAssumeRoleLoader;
36use reqsign::AwsConfig;
37use reqsign::AwsCredentialLoad;
38use reqsign::AwsDefaultLoader;
39use reqsign::AwsV4Signer;
40use reqwest::Url;
41use std::sync::LazyLock;
42
43use super::core::*;
44use super::delete::S3Deleter;
45use super::error::parse_error;
46use super::lister::{S3ListerV1, S3ListerV2, S3Listers, S3ObjectVersionsLister};
47use super::writer::S3Writer;
48use super::writer::S3Writers;
49use crate::raw::oio::PageLister;
50use crate::raw::*;
51use crate::services::S3Config;
52use crate::*;
53use constants::X_AMZ_VERSION_ID;
54
/// Lookup table from a global service endpoint to its region-specific template.
///
/// This allows constructing the correct regional endpoint when the user gives
/// a global endpoint. The `{region}` token in a template is a placeholder that
/// callers substitute with the actual region name.
static ENDPOINT_TEMPLATES: LazyLock<HashMap<&'static str, &'static str>> = LazyLock::new(|| {
    HashMap::from([
        // AWS S3 Service.
        (
            "https://s3.amazonaws.com",
            "https://s3.{region}.amazonaws.com",
        ),
    ])
});
65
/// Fallback for the maximum delete batch size, used when `delete_max_size` is not configured.
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1_000;
67
68impl Configurator for S3Config {
69    type Builder = S3Builder;
70
71    #[allow(deprecated)]
72    fn into_builder(self) -> Self::Builder {
73        S3Builder {
74            config: self,
75            customized_credential_load: None,
76
77            http_client: None,
78        }
79    }
80}
81
/// Aws S3 and compatible services (including minio, digitalocean space, Tencent Cloud Object Storage(COS) and so on) support.
/// For more information about s3-compatible services, refer to [Compatible Services](#compatible-services).
#[doc = include_str!("docs.md")]
#[doc = include_str!("compatible_services.md")]
#[derive(Default)]
pub struct S3Builder {
    /// Service configuration populated by the builder's setter methods.
    config: S3Config,

    /// Optional user-supplied credential loader, set via
    /// `customized_credential_load`.
    customized_credential_load: Option<Box<dyn AwsCredentialLoad>>,

    /// Optional HTTP client override, set via the deprecated `http_client` method.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
95
96impl Debug for S3Builder {
97    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
98        let mut d = f.debug_struct("S3Builder");
99
100        d.field("config", &self.config);
101        d.finish_non_exhaustive()
102    }
103}
104
105impl S3Builder {
106    /// Set root of this backend.
107    ///
108    /// All operations will happen under this root.
109    pub fn root(mut self, root: &str) -> Self {
110        self.config.root = if root.is_empty() {
111            None
112        } else {
113            Some(root.to_string())
114        };
115
116        self
117    }
118
119    /// Set bucket name of this backend.
120    pub fn bucket(mut self, bucket: &str) -> Self {
121        self.config.bucket = bucket.to_string();
122
123        self
124    }
125
126    /// Set endpoint of this backend.
127    ///
128    /// Endpoint must be full uri, e.g.
129    ///
130    /// - AWS S3: `https://s3.amazonaws.com` or `https://s3.{region}.amazonaws.com`
131    /// - Cloudflare R2: `https://<ACCOUNT_ID>.r2.cloudflarestorage.com`
132    /// - Aliyun OSS: `https://{region}.aliyuncs.com`
133    /// - Tencent COS: `https://cos.{region}.myqcloud.com`
134    /// - Minio: `http://127.0.0.1:9000`
135    ///
136    /// If user inputs endpoint without scheme like "s3.amazonaws.com", we
137    /// will prepend "https://" before it.
138    pub fn endpoint(mut self, endpoint: &str) -> Self {
139        if !endpoint.is_empty() {
140            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
141            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
142        }
143
144        self
145    }
146
147    /// Region represent the signing region of this endpoint. This is required
148    /// if you are using the default AWS S3 endpoint.
149    ///
150    /// If using a custom endpoint,
151    /// - If region is set, we will take user's input first.
152    /// - If not, we will try to load it from environment.
153    pub fn region(mut self, region: &str) -> Self {
154        if !region.is_empty() {
155            self.config.region = Some(region.to_string())
156        }
157
158        self
159    }
160
161    /// Set access_key_id of this backend.
162    ///
163    /// - If access_key_id is set, we will take user's input first.
164    /// - If not, we will try to load it from environment.
165    pub fn access_key_id(mut self, v: &str) -> Self {
166        if !v.is_empty() {
167            self.config.access_key_id = Some(v.to_string())
168        }
169
170        self
171    }
172
173    /// Set secret_access_key of this backend.
174    ///
175    /// - If secret_access_key is set, we will take user's input first.
176    /// - If not, we will try to load it from environment.
177    pub fn secret_access_key(mut self, v: &str) -> Self {
178        if !v.is_empty() {
179            self.config.secret_access_key = Some(v.to_string())
180        }
181
182        self
183    }
184
185    /// Set role_arn for this backend.
186    ///
187    /// If `role_arn` is set, we will use already known config as source
188    /// credential to assume role with `role_arn`.
189    pub fn role_arn(mut self, v: &str) -> Self {
190        if !v.is_empty() {
191            self.config.role_arn = Some(v.to_string())
192        }
193
194        self
195    }
196
197    /// Set external_id for this backend.
198    pub fn external_id(mut self, v: &str) -> Self {
199        if !v.is_empty() {
200            self.config.external_id = Some(v.to_string())
201        }
202
203        self
204    }
205
206    /// Set role_session_name for this backend.
207    pub fn role_session_name(mut self, v: &str) -> Self {
208        if !v.is_empty() {
209            self.config.role_session_name = Some(v.to_string())
210        }
211
212        self
213    }
214
215    /// Set default storage_class for this backend.
216    ///
217    /// Available values:
218    /// - `DEEP_ARCHIVE`
219    /// - `GLACIER`
220    /// - `GLACIER_IR`
221    /// - `INTELLIGENT_TIERING`
222    /// - `ONEZONE_IA`
223    /// - `OUTPOSTS`
224    /// - `REDUCED_REDUNDANCY`
225    /// - `STANDARD`
226    /// - `STANDARD_IA`
227    pub fn default_storage_class(mut self, v: &str) -> Self {
228        if !v.is_empty() {
229            self.config.default_storage_class = Some(v.to_string())
230        }
231
232        self
233    }
234
235    /// Set server_side_encryption for this backend.
236    ///
237    /// Available values: `AES256`, `aws:kms`.
238    ///
239    /// # Note
240    ///
241    /// This function is the low-level setting for SSE related features.
242    ///
243    /// SSE related options should be set carefully to make them works.
244    /// Please use `server_side_encryption_with_*` helpers if even possible.
245    pub fn server_side_encryption(mut self, v: &str) -> Self {
246        if !v.is_empty() {
247            self.config.server_side_encryption = Some(v.to_string())
248        }
249
250        self
251    }
252
253    /// Set server_side_encryption_aws_kms_key_id for this backend
254    ///
255    /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id`
256    ///   is not set, S3 will use aws managed kms key to encrypt data.
257    /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id`
258    ///   is a valid kms key id, S3 will use the provided kms key to encrypt data.
259    /// - If the `server_side_encryption_aws_kms_key_id` is invalid or not found, an error will be
260    ///   returned.
261    /// - If `server_side_encryption` is not `aws:kms`, setting `server_side_encryption_aws_kms_key_id` is a noop.
262    ///
263    /// # Note
264    ///
265    /// This function is the low-level setting for SSE related features.
266    ///
267    /// SSE related options should be set carefully to make them works.
268    /// Please use `server_side_encryption_with_*` helpers if even possible.
269    pub fn server_side_encryption_aws_kms_key_id(mut self, v: &str) -> Self {
270        if !v.is_empty() {
271            self.config.server_side_encryption_aws_kms_key_id = Some(v.to_string())
272        }
273
274        self
275    }
276
277    /// Set server_side_encryption_customer_algorithm for this backend.
278    ///
279    /// Available values: `AES256`.
280    ///
281    /// # Note
282    ///
283    /// This function is the low-level setting for SSE related features.
284    ///
285    /// SSE related options should be set carefully to make them works.
286    /// Please use `server_side_encryption_with_*` helpers if even possible.
287    pub fn server_side_encryption_customer_algorithm(mut self, v: &str) -> Self {
288        if !v.is_empty() {
289            self.config.server_side_encryption_customer_algorithm = Some(v.to_string())
290        }
291
292        self
293    }
294
295    /// Set server_side_encryption_customer_key for this backend.
296    ///
297    /// # Args
298    ///
299    /// `v`: base64 encoded key that matches algorithm specified in
300    /// `server_side_encryption_customer_algorithm`.
301    ///
302    /// # Note
303    ///
304    /// This function is the low-level setting for SSE related features.
305    ///
306    /// SSE related options should be set carefully to make them works.
307    /// Please use `server_side_encryption_with_*` helpers if even possible.
308    pub fn server_side_encryption_customer_key(mut self, v: &str) -> Self {
309        if !v.is_empty() {
310            self.config.server_side_encryption_customer_key = Some(v.to_string())
311        }
312
313        self
314    }
315
316    /// Set server_side_encryption_customer_key_md5 for this backend.
317    ///
318    /// # Args
319    ///
320    /// `v`: MD5 digest of key specified in `server_side_encryption_customer_key`.
321    ///
322    /// # Note
323    ///
324    /// This function is the low-level setting for SSE related features.
325    ///
326    /// SSE related options should be set carefully to make them works.
327    /// Please use `server_side_encryption_with_*` helpers if even possible.
328    pub fn server_side_encryption_customer_key_md5(mut self, v: &str) -> Self {
329        if !v.is_empty() {
330            self.config.server_side_encryption_customer_key_md5 = Some(v.to_string())
331        }
332
333        self
334    }
335
336    /// Enable server side encryption with aws managed kms key
337    ///
338    /// As known as: SSE-KMS
339    ///
340    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
341    pub fn server_side_encryption_with_aws_managed_kms_key(mut self) -> Self {
342        self.config.server_side_encryption = Some("aws:kms".to_string());
343        self
344    }
345
346    /// Enable server side encryption with customer managed kms key
347    ///
348    /// As known as: SSE-KMS
349    ///
350    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
351    pub fn server_side_encryption_with_customer_managed_kms_key(
352        mut self,
353        aws_kms_key_id: &str,
354    ) -> Self {
355        self.config.server_side_encryption = Some("aws:kms".to_string());
356        self.config.server_side_encryption_aws_kms_key_id = Some(aws_kms_key_id.to_string());
357        self
358    }
359
360    /// Enable server side encryption with s3 managed key
361    ///
362    /// As known as: SSE-S3
363    ///
364    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
365    pub fn server_side_encryption_with_s3_key(mut self) -> Self {
366        self.config.server_side_encryption = Some("AES256".to_string());
367        self
368    }
369
370    /// Enable server side encryption with customer key.
371    ///
372    /// As known as: SSE-C
373    ///
374    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
375    pub fn server_side_encryption_with_customer_key(mut self, algorithm: &str, key: &[u8]) -> Self {
376        self.config.server_side_encryption_customer_algorithm = Some(algorithm.to_string());
377        self.config.server_side_encryption_customer_key = Some(BASE64_STANDARD.encode(key));
378        self.config.server_side_encryption_customer_key_md5 =
379            Some(BASE64_STANDARD.encode(Md5::digest(key).as_slice()));
380        self
381    }
382
383    /// Set temporary credential used in AWS S3 connections
384    ///
385    /// # Warning
386    ///
387    /// session token's lifetime is short and requires users to refresh in time.
388    pub fn session_token(mut self, token: &str) -> Self {
389        if !token.is_empty() {
390            self.config.session_token = Some(token.to_string());
391        }
392        self
393    }
394
395    /// Set temporary credential used in AWS S3 connections
396    #[deprecated(note = "Please use `session_token` instead")]
397    pub fn security_token(self, token: &str) -> Self {
398        self.session_token(token)
399    }
400
401    /// Disable config load so that opendal will not load config from
402    /// environment.
403    ///
404    /// For examples:
405    ///
406    /// - envs like `AWS_ACCESS_KEY_ID`
407    /// - files like `~/.aws/config`
408    pub fn disable_config_load(mut self) -> Self {
409        self.config.disable_config_load = true;
410        self
411    }
412
413    /// Disable list objects v2 so that opendal will not use the older
414    /// List Objects V1 to list objects.
415    ///
416    /// By default, OpenDAL uses List Objects V2 to list objects. However,
417    /// some legacy services do not yet support V2.
418    pub fn disable_list_objects_v2(mut self) -> Self {
419        self.config.disable_list_objects_v2 = true;
420        self
421    }
422
423    /// Enable request payer so that OpenDAL will send requests with `x-amz-request-payer` header.
424    ///
425    /// With this option the client accepts to pay for the request and data transfer costs.
426    pub fn enable_request_payer(mut self) -> Self {
427        self.config.enable_request_payer = true;
428        self
429    }
430
431    /// Disable load credential from ec2 metadata.
432    ///
433    /// This option is used to disable the default behavior of opendal
434    /// to load credential from ec2 metadata, a.k.a, IMDSv2
435    pub fn disable_ec2_metadata(mut self) -> Self {
436        self.config.disable_ec2_metadata = true;
437        self
438    }
439
440    /// Allow anonymous will allow opendal to send request without signing
441    /// when credential is not loaded.
442    pub fn allow_anonymous(mut self) -> Self {
443        self.config.allow_anonymous = true;
444        self
445    }
446
447    /// Enable virtual host style so that opendal will send API requests
448    /// in virtual host style instead of path style.
449    ///
450    /// - By default, opendal will send API to `https://s3.us-east-1.amazonaws.com/bucket_name`
451    /// - Enabled, opendal will send API to `https://bucket_name.s3.us-east-1.amazonaws.com`
452    pub fn enable_virtual_host_style(mut self) -> Self {
453        self.config.enable_virtual_host_style = true;
454        self
455    }
456
457    /// Disable stat with override so that opendal will not send stat request with override queries.
458    ///
459    /// For example, R2 doesn't support stat with `response_content_type` query.
460    pub fn disable_stat_with_override(mut self) -> Self {
461        self.config.disable_stat_with_override = true;
462        self
463    }
464
465    /// Adding a customized credential load for service.
466    ///
467    /// If customized_credential_load has been set, we will ignore all other
468    /// credential load methods.
469    pub fn customized_credential_load(mut self, cred: Box<dyn AwsCredentialLoad>) -> Self {
470        self.customized_credential_load = Some(cred);
471        self
472    }
473
474    /// Specify the http client that used by this service.
475    ///
476    /// # Notes
477    ///
478    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
479    /// during minor updates.
480    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
481    #[allow(deprecated)]
482    pub fn http_client(mut self, client: HttpClient) -> Self {
483        self.http_client = Some(client);
484        self
485    }
486
487    /// Set bucket versioning status for this backend
488    pub fn enable_versioning(mut self, enabled: bool) -> Self {
489        self.config.enable_versioning = enabled;
490
491        self
492    }
493
494    /// Check if `bucket` is valid
495    /// `bucket` must be not empty and if `enable_virtual_host_style` is true
496    /// it couldn't contain dot(.) character
497    fn is_bucket_valid(&self) -> bool {
498        if self.config.bucket.is_empty() {
499            return false;
500        }
501        // If enable virtual host style, `bucket` will reside in domain part,
502        // for example `https://bucket_name.s3.us-east-1.amazonaws.com`,
503        // so `bucket` with dot can't be recognized correctly for this format.
504        if self.config.enable_virtual_host_style && self.config.bucket.contains('.') {
505            return false;
506        }
507        true
508    }
509
510    /// Build endpoint with given region.
511    fn build_endpoint(&self, region: &str) -> String {
512        let bucket = {
513            debug_assert!(self.is_bucket_valid(), "bucket must be valid");
514
515            self.config.bucket.as_str()
516        };
517
518        let mut endpoint = match &self.config.endpoint {
519            Some(endpoint) => {
520                if endpoint.starts_with("http") {
521                    endpoint.to_string()
522                } else {
523                    // Prefix https if endpoint doesn't start with scheme.
524                    format!("https://{endpoint}")
525                }
526            }
527            None => "https://s3.amazonaws.com".to_string(),
528        };
529
530        // If endpoint contains bucket name, we should trim them.
531        endpoint = endpoint.replace(&format!("//{bucket}."), "//");
532
533        // Omit default ports if specified.
534        if let Ok(url) = Url::from_str(&endpoint) {
535            // Remove the trailing `/` of root path.
536            endpoint = url.to_string().trim_end_matches('/').to_string();
537        }
538
539        // Update with endpoint templates.
540        endpoint = if let Some(template) = ENDPOINT_TEMPLATES.get(endpoint.as_str()) {
541            template.replace("{region}", region)
542        } else {
543            // If we don't know where about this endpoint, just leave
544            // them as it.
545            endpoint.to_string()
546        };
547
548        // Apply virtual host style.
549        if self.config.enable_virtual_host_style {
550            endpoint = endpoint.replace("//", &format!("//{bucket}."))
551        } else {
552            write!(endpoint, "/{bucket}").expect("write into string must succeed");
553        };
554
555        endpoint
556    }
557
558    /// Set maximum batch operations of this backend.
559    #[deprecated(
560        since = "0.52.0",
561        note = "Please use `delete_max_size` instead of `batch_max_operations`"
562    )]
563    pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
564        self.config.delete_max_size = Some(batch_max_operations);
565
566        self
567    }
568
569    /// Set maximum delete operations of this backend.
570    pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
571        self.config.delete_max_size = Some(delete_max_size);
572
573        self
574    }
575
576    /// Set checksum algorithm of this backend.
577    /// This is necessary when writing to AWS S3 Buckets with Object Lock enabled for example.
578    ///
579    /// Available options:
580    /// - "crc32c"
581    pub fn checksum_algorithm(mut self, checksum_algorithm: &str) -> Self {
582        self.config.checksum_algorithm = Some(checksum_algorithm.to_string());
583
584        self
585    }
586
587    /// Disable write with if match so that opendal will not send write request with if match headers.
588    pub fn disable_write_with_if_match(mut self) -> Self {
589        self.config.disable_write_with_if_match = true;
590        self
591    }
592
593    /// Enable write with append so that opendal will send write request with append headers.
594    pub fn enable_write_with_append(mut self) -> Self {
595        self.config.enable_write_with_append = true;
596        self
597    }
598
599    /// Detect region of S3 bucket.
600    ///
601    /// # Args
602    ///
603    /// - endpoint: the endpoint of S3 service
604    /// - bucket: the bucket of S3 service
605    ///
606    /// # Return
607    ///
608    /// - `Some(region)` means we detect the region successfully
609    /// - `None` means we can't detect the region or meeting errors.
610    ///
611    /// # Notes
612    ///
613    /// We will try to detect region by the following methods.
614    ///
615    /// - Match endpoint with given rules to get region
616    ///   - Cloudflare R2
617    ///   - AWS S3
618    ///   - Aliyun OSS
619    /// - Send a `HEAD` request to endpoint with bucket name to get `x-amz-bucket-region`.
620    ///
621    /// # Examples
622    ///
623    /// ```no_run
624    /// use opendal::services::S3;
625    ///
626    /// # async fn example() {
627    /// let region: Option<String> = S3::detect_region("https://s3.amazonaws.com", "example").await;
628    /// # }
629    /// ```
630    ///
631    /// # Reference
632    ///
633    /// - [Amazon S3 HeadBucket API](https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/API/API_HeadBucket.html)
634    pub async fn detect_region(endpoint: &str, bucket: &str) -> Option<String> {
635        // Remove the possible trailing `/` in endpoint.
636        let endpoint = endpoint.trim_end_matches('/');
637
638        // Make sure the endpoint contains the scheme.
639        let mut endpoint = if endpoint.starts_with("http") {
640            endpoint.to_string()
641        } else {
642            // Prefix https if endpoint doesn't start with scheme.
643            format!("https://{}", endpoint)
644        };
645
646        // Remove bucket name from endpoint.
647        endpoint = endpoint.replace(&format!("//{bucket}."), "//");
648        let url = format!("{endpoint}/{bucket}");
649
650        debug!("detect region with url: {url}");
651
652        // Try to detect region by endpoint.
653
654        // If this bucket is R2, we can return auto directly.
655        //
656        // Reference: <https://developers.cloudflare.com/r2/api/s3/api/>
657        if endpoint.ends_with("r2.cloudflarestorage.com") {
658            return Some("auto".to_string());
659        }
660
661        // If this bucket is AWS, we can try to match the endpoint.
662        if let Some(v) = endpoint.strip_prefix("https://s3.") {
663            if let Some(region) = v.strip_suffix(".amazonaws.com") {
664                return Some(region.to_string());
665            }
666        }
667
668        // If this bucket is OSS, we can try to match the endpoint.
669        //
670        // - `oss-ap-southeast-1.aliyuncs.com` => `oss-ap-southeast-1`
671        // - `oss-cn-hangzhou-internal.aliyuncs.com` => `oss-cn-hangzhou`
672        if let Some(v) = endpoint.strip_prefix("https://") {
673            if let Some(region) = v.strip_suffix(".aliyuncs.com") {
674                return Some(region.to_string());
675            }
676
677            if let Some(region) = v.strip_suffix("-internal.aliyuncs.com") {
678                return Some(region.to_string());
679            }
680        }
681
682        // Try to detect region by HeadBucket.
683        let req = http::Request::head(&url).body(Buffer::new()).ok()?;
684
685        let client = HttpClient::new().ok()?;
686        let res = client
687            .send(req)
688            .await
689            .map_err(|err| warn!("detect region failed for: {err:?}"))
690            .ok()?;
691
692        debug!(
693            "auto detect region got response: status {:?}, header: {:?}",
694            res.status(),
695            res.headers()
696        );
697
698        // Get region from response header no matter status code.
699        if let Some(header) = res.headers().get("x-amz-bucket-region") {
700            if let Ok(regin) = header.to_str() {
701                return Some(regin.to_string());
702            }
703        }
704
705        // Status code is 403 or 200 means we already visit the correct
706        // region, we can use the default region directly.
707        if res.status() == StatusCode::FORBIDDEN || res.status() == StatusCode::OK {
708            return Some("us-east-1".to_string());
709        }
710
711        None
712    }
713}
714
715impl Builder for S3Builder {
716    const SCHEME: Scheme = Scheme::S3;
717    type Config = S3Config;
718
719    fn build(mut self) -> Result<impl Access> {
720        debug!("backend build started: {:?}", &self);
721
722        let root = normalize_root(&self.config.root.clone().unwrap_or_default());
723        debug!("backend use root {}", &root);
724
725        // Handle bucket name.
726        let bucket = if self.is_bucket_valid() {
727            Ok(&self.config.bucket)
728        } else {
729            Err(
730                Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
731                    .with_context("service", Scheme::S3),
732            )
733        }?;
734        debug!("backend use bucket {}", &bucket);
735
736        let default_storage_class = match &self.config.default_storage_class {
737            None => None,
738            Some(v) => Some(
739                build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?,
740            ),
741        };
742
743        let server_side_encryption = match &self.config.server_side_encryption {
744            None => None,
745            Some(v) => Some(
746                build_header_value(v)
747                    .map_err(|err| err.with_context("key", "server_side_encryption"))?,
748            ),
749        };
750
751        let server_side_encryption_aws_kms_key_id =
752            match &self.config.server_side_encryption_aws_kms_key_id {
753                None => None,
754                Some(v) => Some(build_header_value(v).map_err(|err| {
755                    err.with_context("key", "server_side_encryption_aws_kms_key_id")
756                })?),
757            };
758
759        let server_side_encryption_customer_algorithm =
760            match &self.config.server_side_encryption_customer_algorithm {
761                None => None,
762                Some(v) => Some(build_header_value(v).map_err(|err| {
763                    err.with_context("key", "server_side_encryption_customer_algorithm")
764                })?),
765            };
766
767        let server_side_encryption_customer_key =
768            match &self.config.server_side_encryption_customer_key {
769                None => None,
770                Some(v) => Some(build_header_value(v).map_err(|err| {
771                    err.with_context("key", "server_side_encryption_customer_key")
772                })?),
773            };
774
775        let server_side_encryption_customer_key_md5 =
776            match &self.config.server_side_encryption_customer_key_md5 {
777                None => None,
778                Some(v) => Some(build_header_value(v).map_err(|err| {
779                    err.with_context("key", "server_side_encryption_customer_key_md5")
780                })?),
781            };
782
783        let checksum_algorithm = match self.config.checksum_algorithm.as_deref() {
784            Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
785            None => None,
786            v => {
787                return Err(Error::new(
788                    ErrorKind::ConfigInvalid,
789                    format!("{:?} is not a supported checksum_algorithm.", v),
790                ))
791            }
792        };
793
794        // This is our current config.
795        let mut cfg = AwsConfig::default();
796        if !self.config.disable_config_load {
797            #[cfg(not(target_arch = "wasm32"))]
798            {
799                cfg = cfg.from_profile();
800                cfg = cfg.from_env();
801            }
802        }
803
804        if let Some(ref v) = self.config.region {
805            cfg.region = Some(v.to_string());
806        }
807
808        if cfg.region.is_none() {
809            return Err(Error::new(
810                ErrorKind::ConfigInvalid,
811                "region is missing. Please find it by S3::detect_region() or set them in env.",
812            )
813            .with_operation("Builder::build")
814            .with_context("service", Scheme::S3));
815        }
816
817        let region = cfg.region.to_owned().unwrap();
818        debug!("backend use region: {region}");
819
820        // Retain the user's endpoint if it exists; otherwise, try loading it from the environment.
821        self.config.endpoint = self.config.endpoint.or_else(|| cfg.endpoint_url.clone());
822
823        // Building endpoint.
824        let endpoint = self.build_endpoint(&region);
825        debug!("backend use endpoint: {endpoint}");
826
827        // Setting all value from user input if available.
828        if let Some(v) = self.config.access_key_id {
829            cfg.access_key_id = Some(v)
830        }
831        if let Some(v) = self.config.secret_access_key {
832            cfg.secret_access_key = Some(v)
833        }
834        if let Some(v) = self.config.session_token {
835            cfg.session_token = Some(v)
836        }
837
838        let mut loader: Option<Box<dyn AwsCredentialLoad>> = None;
839        // If customized_credential_load is set, we will use it.
840        if let Some(v) = self.customized_credential_load {
841            loader = Some(v);
842        }
843
844        // If role_arn is set, we must use AssumeRoleLoad.
845        if let Some(role_arn) = self.config.role_arn {
846            // use current env as source credential loader.
847            let default_loader =
848                AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg.clone());
849
850            // Build the config for assume role.
851            let mut assume_role_cfg = AwsConfig {
852                region: Some(region.clone()),
853                role_arn: Some(role_arn),
854                external_id: self.config.external_id.clone(),
855                sts_regional_endpoints: "regional".to_string(),
856                ..Default::default()
857            };
858
859            // override default role_session_name if set
860            if let Some(name) = self.config.role_session_name {
861                assume_role_cfg.role_session_name = name;
862            }
863
864            let assume_role_loader = AwsAssumeRoleLoader::new(
865                GLOBAL_REQWEST_CLIENT.clone().clone(),
866                assume_role_cfg,
867                Box::new(default_loader),
868            )
869            .map_err(|err| {
870                Error::new(
871                    ErrorKind::ConfigInvalid,
872                    "The assume_role_loader is misconfigured",
873                )
874                .with_context("service", Scheme::S3)
875                .set_source(err)
876            })?;
877            loader = Some(Box::new(assume_role_loader));
878        }
879        // If loader is not set, we will use default loader.
880        let loader = match loader {
881            Some(v) => v,
882            None => {
883                let mut default_loader =
884                    AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
885                if self.config.disable_ec2_metadata {
886                    default_loader = default_loader.with_disable_ec2_metadata();
887                }
888
889                Box::new(default_loader)
890            }
891        };
892
893        let signer = AwsV4Signer::new("s3", &region);
894
895        let delete_max_size = self
896            .config
897            .delete_max_size
898            .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
899
900        Ok(S3Backend {
901            core: Arc::new(S3Core {
902                info: {
903                    let am = AccessorInfo::default();
904                    am.set_scheme(Scheme::S3)
905                        .set_root(&root)
906                        .set_name(bucket)
907                        .set_native_capability(Capability {
908                            stat: true,
909                            stat_has_content_encoding: true,
910                            stat_with_if_match: true,
911                            stat_with_if_none_match: true,
912                            stat_with_if_modified_since: true,
913                            stat_with_if_unmodified_since: true,
914                            stat_with_override_cache_control: !self
915                                .config
916                                .disable_stat_with_override,
917                            stat_with_override_content_disposition: !self
918                                .config
919                                .disable_stat_with_override,
920                            stat_with_override_content_type: !self
921                                .config
922                                .disable_stat_with_override,
923                            stat_with_version: self.config.enable_versioning,
924                            stat_has_cache_control: true,
925                            stat_has_content_length: true,
926                            stat_has_content_type: true,
927                            stat_has_content_range: true,
928                            stat_has_etag: true,
929                            stat_has_content_md5: true,
930                            stat_has_last_modified: true,
931                            stat_has_content_disposition: true,
932                            stat_has_user_metadata: true,
933                            stat_has_version: true,
934
935                            read: true,
936                            read_with_if_match: true,
937                            read_with_if_none_match: true,
938                            read_with_if_modified_since: true,
939                            read_with_if_unmodified_since: true,
940                            read_with_override_cache_control: true,
941                            read_with_override_content_disposition: true,
942                            read_with_override_content_type: true,
943                            read_with_version: self.config.enable_versioning,
944
945                            write: true,
946                            write_can_empty: true,
947                            write_can_multi: true,
948                            write_can_append: self.config.enable_write_with_append,
949
950                            write_with_cache_control: true,
951                            write_with_content_type: true,
952                            write_with_content_encoding: true,
953                            write_with_if_match: !self.config.disable_write_with_if_match,
954                            write_with_if_not_exists: true,
955                            write_with_user_metadata: true,
956
957                            // The min multipart size of S3 is 5 MiB.
958                            //
959                            // ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
960                            write_multi_min_size: Some(5 * 1024 * 1024),
961                            // The max multipart size of S3 is 5 GiB.
962                            //
963                            // ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
964                            write_multi_max_size: if cfg!(target_pointer_width = "64") {
965                                Some(5 * 1024 * 1024 * 1024)
966                            } else {
967                                Some(usize::MAX)
968                            },
969
970                            delete: true,
971                            delete_max_size: Some(delete_max_size),
972                            delete_with_version: self.config.enable_versioning,
973
974                            copy: true,
975
976                            list: true,
977                            list_with_limit: true,
978                            list_with_start_after: true,
979                            list_with_recursive: true,
980                            list_with_versions: self.config.enable_versioning,
981                            list_with_deleted: self.config.enable_versioning,
982                            list_has_etag: true,
983                            list_has_content_md5: true,
984                            list_has_content_length: true,
985                            list_has_last_modified: true,
986
987                            presign: true,
988                            presign_stat: true,
989                            presign_read: true,
990                            presign_write: true,
991
992                            shared: true,
993
994                            ..Default::default()
995                        });
996
997                    // allow deprecated api here for compatibility
998                    #[allow(deprecated)]
999                    if let Some(client) = self.http_client {
1000                        am.update_http_client(|_| client);
1001                    }
1002
1003                    am.into()
1004                },
1005                bucket: bucket.to_string(),
1006                endpoint,
1007                root,
1008                server_side_encryption,
1009                server_side_encryption_aws_kms_key_id,
1010                server_side_encryption_customer_algorithm,
1011                server_side_encryption_customer_key,
1012                server_side_encryption_customer_key_md5,
1013                default_storage_class,
1014                allow_anonymous: self.config.allow_anonymous,
1015                disable_list_objects_v2: self.config.disable_list_objects_v2,
1016                enable_request_payer: self.config.enable_request_payer,
1017                signer,
1018                loader,
1019                credential_loaded: AtomicBool::new(false),
1020                checksum_algorithm,
1021            }),
1022        })
1023    }
1024}
1025
/// Backend for s3 services.
///
/// The backend is a thin handle around a shared [`S3Core`]: its only field is
/// an [`Arc`], so cloning the backend clones the pointer and every clone
/// shares the same core.
#[derive(Debug, Clone)]
pub struct S3Backend {
    /// Shared core that the `Access` methods of this backend delegate to.
    core: Arc<S3Core>,
}
1031
1032impl Access for S3Backend {
1033    type Reader = HttpBody;
1034    type Writer = S3Writers;
1035    type Lister = S3Listers;
1036    type Deleter = oio::BatchDeleter<S3Deleter>;
1037    type BlockingReader = ();
1038    type BlockingWriter = ();
1039    type BlockingLister = ();
1040    type BlockingDeleter = ();
1041
1042    fn info(&self) -> Arc<AccessorInfo> {
1043        self.core.info.clone()
1044    }
1045
1046    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
1047        let resp = self.core.s3_head_object(path, args).await?;
1048
1049        let status = resp.status();
1050
1051        match status {
1052            StatusCode::OK => {
1053                let headers = resp.headers();
1054                let mut meta = parse_into_metadata(path, headers)?;
1055
1056                let user_meta = parse_prefixed_headers(headers, X_AMZ_META_PREFIX);
1057                if !user_meta.is_empty() {
1058                    meta.with_user_metadata(user_meta);
1059                }
1060
1061                if let Some(v) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? {
1062                    meta.set_version(v);
1063                }
1064
1065                Ok(RpStat::new(meta))
1066            }
1067            _ => Err(parse_error(resp)),
1068        }
1069    }
1070
1071    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
1072        let resp = self.core.s3_get_object(path, args.range(), &args).await?;
1073
1074        let status = resp.status();
1075        match status {
1076            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
1077                Ok((RpRead::default(), resp.into_body()))
1078            }
1079            _ => {
1080                let (part, mut body) = resp.into_parts();
1081                let buf = body.to_buffer().await?;
1082                Err(parse_error(Response::from_parts(part, buf)))
1083            }
1084        }
1085    }
1086
1087    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
1088        let writer = S3Writer::new(self.core.clone(), path, args.clone());
1089
1090        let w = if args.append() {
1091            S3Writers::Two(oio::AppendWriter::new(writer))
1092        } else {
1093            S3Writers::One(oio::MultipartWriter::new(
1094                self.core.info.clone(),
1095                writer,
1096                args.concurrent(),
1097            ))
1098        };
1099
1100        Ok((RpWrite::default(), w))
1101    }
1102
1103    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
1104        Ok((
1105            RpDelete::default(),
1106            oio::BatchDeleter::new(S3Deleter::new(self.core.clone())),
1107        ))
1108    }
1109
1110    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
1111        let l = if args.versions() || args.deleted() {
1112            ThreeWays::Three(PageLister::new(S3ObjectVersionsLister::new(
1113                self.core.clone(),
1114                path,
1115                args,
1116            )))
1117        } else if self.core.disable_list_objects_v2 {
1118            ThreeWays::One(PageLister::new(S3ListerV1::new(
1119                self.core.clone(),
1120                path,
1121                args,
1122            )))
1123        } else {
1124            ThreeWays::Two(PageLister::new(S3ListerV2::new(
1125                self.core.clone(),
1126                path,
1127                args,
1128            )))
1129        };
1130
1131        Ok((RpList::default(), l))
1132    }
1133
1134    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
1135        let resp = self.core.s3_copy_object(from, to).await?;
1136
1137        let status = resp.status();
1138
1139        match status {
1140            StatusCode::OK => Ok(RpCopy::default()),
1141            _ => Err(parse_error(resp)),
1142        }
1143    }
1144
1145    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
1146        let (expire, op) = args.into_parts();
1147        // We will not send this request out, just for signing.
1148        let req = match op {
1149            PresignOperation::Stat(v) => self.core.s3_head_object_request(path, v),
1150            PresignOperation::Read(v) => {
1151                self.core
1152                    .s3_get_object_request(path, BytesRange::default(), &v)
1153            }
1154            PresignOperation::Write(_) => {
1155                self.core
1156                    .s3_put_object_request(path, None, &OpWrite::default(), Buffer::new())
1157            }
1158            PresignOperation::Delete(_) => Err(Error::new(
1159                ErrorKind::Unsupported,
1160                "operation is not supported",
1161            )),
1162        };
1163        let mut req = req?;
1164
1165        self.core.sign_query(&mut req, expire).await?;
1166
1167        // We don't need this request anymore, consume it directly.
1168        let (parts, _) = req.into_parts();
1169
1170        Ok(RpPresign::new(PresignedRequest::new(
1171            parts.method,
1172            parts.uri,
1173            parts.headers,
1174        )))
1175    }
1176}
1177
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `is_bucket_valid` against a table of bucket names, with and
    /// without virtual host style enabled.
    #[test]
    fn test_is_valid_bucket() {
        // Each row: (bucket name, virtual host style enabled, expected validity).
        let table = [
            ("", false, false),
            ("test", false, true),
            ("test.xyz", false, true),
            ("", true, false),
            ("test", true, true),
            ("test.xyz", true, false),
        ];

        for (name, virtual_host_style, want) in table {
            let builder = S3Builder::default().bucket(name);
            let builder = match virtual_host_style {
                true => builder.enable_virtual_host_style(),
                false => builder,
            };
            assert_eq!(builder.is_bucket_valid(), want)
        }
    }

    /// Checks that every endpoint spelling in the table (or no endpoint at
    /// all) yields the same built endpoint for region `us-east-2`, in both
    /// path style and virtual host style.
    #[test]
    fn test_build_endpoint() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let candidates = [
            Some("s3.amazonaws.com"),
            Some("https://s3.amazonaws.com"),
            Some("https://s3.us-east-2.amazonaws.com"),
            None,
        ];

        // Path style: the bucket is appended to the endpoint as a path segment.
        for candidate in candidates {
            let builder = S3Builder::default().bucket("test");
            let builder = match candidate {
                Some(url) => builder.endpoint(url),
                None => builder,
            };

            let built = builder.build_endpoint("us-east-2");
            assert_eq!(built, "https://s3.us-east-2.amazonaws.com/test");
        }

        // Virtual host style: the bucket becomes part of the host name.
        for candidate in candidates {
            let builder = S3Builder::default()
                .bucket("test")
                .enable_virtual_host_style();
            let builder = match candidate {
                Some(url) => builder.endpoint(url),
                None => builder,
            };

            let built = builder.build_endpoint("us-east-2");
            assert_eq!(built, "https://test.s3.us-east-2.amazonaws.com");
        }
    }

    /// Checks `S3Builder::detect_region` against a table of endpoints.
    #[tokio::test]
    async fn test_detect_region() {
        // Each row: (case label, endpoint, bucket, expected region).
        let table = [
            (
                "aws s3 without region in endpoint",
                "https://s3.amazonaws.com",
                "example",
                Some("us-east-1"),
            ),
            (
                "aws s3 with region in endpoint",
                "https://s3.us-east-1.amazonaws.com",
                "example",
                Some("us-east-1"),
            ),
            (
                "oss with public endpoint",
                "https://oss-ap-southeast-1.aliyuncs.com",
                "example",
                Some("oss-ap-southeast-1"),
            ),
            (
                "oss with internal endpoint",
                "https://oss-cn-hangzhou-internal.aliyuncs.com",
                "example",
                Some("oss-cn-hangzhou-internal"),
            ),
            (
                "r2",
                "https://abc.xxxxx.r2.cloudflarestorage.com",
                "example",
                Some("auto"),
            ),
            (
                "invalid service",
                "https://opendal.apache.org",
                "example",
                None,
            ),
        ];

        for (label, endpoint, bucket, want) in table {
            let detected = S3Builder::detect_region(endpoint, bucket).await;
            assert_eq!(detected.as_deref(), want, "{}", label);
        }
    }
}