1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::fmt::Write;
22use std::str::FromStr;
23use std::sync::atomic::AtomicBool;
24use std::sync::Arc;
25
26use base64::prelude::BASE64_STANDARD;
27use base64::Engine;
28use constants::X_AMZ_META_PREFIX;
29use http::Response;
30use http::StatusCode;
31use log::debug;
32use log::warn;
33use md5::Digest;
34use md5::Md5;
35use reqsign::AwsAssumeRoleLoader;
36use reqsign::AwsConfig;
37use reqsign::AwsCredentialLoad;
38use reqsign::AwsDefaultLoader;
39use reqsign::AwsV4Signer;
40use reqwest::Url;
41use std::sync::LazyLock;
42
43use super::core::*;
44use super::delete::S3Deleter;
45use super::error::parse_error;
46use super::lister::{S3ListerV1, S3ListerV2, S3Listers, S3ObjectVersionsLister};
47use super::writer::S3Writer;
48use super::writer::S3Writers;
49use crate::raw::oio::PageLister;
50use crate::raw::*;
51use crate::services::S3Config;
52use crate::*;
53use constants::X_AMZ_VERSION_ID;
54
/// Maps a region-less endpoint to a template containing a `{region}` placeholder.
///
/// `build_endpoint` looks the normalized endpoint up here and, on a hit,
/// substitutes the region into the template.
static ENDPOINT_TEMPLATES: LazyLock<HashMap<&'static str, &'static str>> = LazyLock::new(|| {
    HashMap::from([(
        "https://s3.amazonaws.com",
        "https://s3.{region}.amazonaws.com",
    )])
});
65
/// Fallback for the `delete_max_size` capability when the config does not set one.
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
67
68impl Configurator for S3Config {
69 type Builder = S3Builder;
70
71 #[allow(deprecated)]
72 fn into_builder(self) -> Self::Builder {
73 S3Builder {
74 config: self,
75 customized_credential_load: None,
76
77 http_client: None,
78 }
79 }
80}
81
#[doc = include_str!("docs.md")]
#[doc = include_str!("compatible_services.md")]
#[derive(Default)]
pub struct S3Builder {
    /// Plain configuration values collected by the setter methods.
    config: S3Config,

    /// Optional caller-supplied credential loader, set via `customized_credential_load`.
    customized_credential_load: Option<Box<dyn AwsCredentialLoad>>,

    /// Optional caller-supplied HTTP client, set via the deprecated `http_client` method.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
95
96impl Debug for S3Builder {
97 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
98 let mut d = f.debug_struct("S3Builder");
99
100 d.field("config", &self.config);
101 d.finish_non_exhaustive()
102 }
103}
104
105impl S3Builder {
106 pub fn root(mut self, root: &str) -> Self {
110 self.config.root = if root.is_empty() {
111 None
112 } else {
113 Some(root.to_string())
114 };
115
116 self
117 }
118
119 pub fn bucket(mut self, bucket: &str) -> Self {
121 self.config.bucket = bucket.to_string();
122
123 self
124 }
125
126 pub fn endpoint(mut self, endpoint: &str) -> Self {
139 if !endpoint.is_empty() {
140 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
142 }
143
144 self
145 }
146
147 pub fn region(mut self, region: &str) -> Self {
154 if !region.is_empty() {
155 self.config.region = Some(region.to_string())
156 }
157
158 self
159 }
160
161 pub fn access_key_id(mut self, v: &str) -> Self {
166 if !v.is_empty() {
167 self.config.access_key_id = Some(v.to_string())
168 }
169
170 self
171 }
172
173 pub fn secret_access_key(mut self, v: &str) -> Self {
178 if !v.is_empty() {
179 self.config.secret_access_key = Some(v.to_string())
180 }
181
182 self
183 }
184
185 pub fn role_arn(mut self, v: &str) -> Self {
190 if !v.is_empty() {
191 self.config.role_arn = Some(v.to_string())
192 }
193
194 self
195 }
196
197 pub fn external_id(mut self, v: &str) -> Self {
199 if !v.is_empty() {
200 self.config.external_id = Some(v.to_string())
201 }
202
203 self
204 }
205
206 pub fn role_session_name(mut self, v: &str) -> Self {
208 if !v.is_empty() {
209 self.config.role_session_name = Some(v.to_string())
210 }
211
212 self
213 }
214
215 pub fn default_storage_class(mut self, v: &str) -> Self {
228 if !v.is_empty() {
229 self.config.default_storage_class = Some(v.to_string())
230 }
231
232 self
233 }
234
235 pub fn server_side_encryption(mut self, v: &str) -> Self {
246 if !v.is_empty() {
247 self.config.server_side_encryption = Some(v.to_string())
248 }
249
250 self
251 }
252
253 pub fn server_side_encryption_aws_kms_key_id(mut self, v: &str) -> Self {
270 if !v.is_empty() {
271 self.config.server_side_encryption_aws_kms_key_id = Some(v.to_string())
272 }
273
274 self
275 }
276
277 pub fn server_side_encryption_customer_algorithm(mut self, v: &str) -> Self {
288 if !v.is_empty() {
289 self.config.server_side_encryption_customer_algorithm = Some(v.to_string())
290 }
291
292 self
293 }
294
295 pub fn server_side_encryption_customer_key(mut self, v: &str) -> Self {
309 if !v.is_empty() {
310 self.config.server_side_encryption_customer_key = Some(v.to_string())
311 }
312
313 self
314 }
315
316 pub fn server_side_encryption_customer_key_md5(mut self, v: &str) -> Self {
329 if !v.is_empty() {
330 self.config.server_side_encryption_customer_key_md5 = Some(v.to_string())
331 }
332
333 self
334 }
335
336 pub fn server_side_encryption_with_aws_managed_kms_key(mut self) -> Self {
342 self.config.server_side_encryption = Some("aws:kms".to_string());
343 self
344 }
345
346 pub fn server_side_encryption_with_customer_managed_kms_key(
352 mut self,
353 aws_kms_key_id: &str,
354 ) -> Self {
355 self.config.server_side_encryption = Some("aws:kms".to_string());
356 self.config.server_side_encryption_aws_kms_key_id = Some(aws_kms_key_id.to_string());
357 self
358 }
359
360 pub fn server_side_encryption_with_s3_key(mut self) -> Self {
366 self.config.server_side_encryption = Some("AES256".to_string());
367 self
368 }
369
370 pub fn server_side_encryption_with_customer_key(mut self, algorithm: &str, key: &[u8]) -> Self {
376 self.config.server_side_encryption_customer_algorithm = Some(algorithm.to_string());
377 self.config.server_side_encryption_customer_key = Some(BASE64_STANDARD.encode(key));
378 self.config.server_side_encryption_customer_key_md5 =
379 Some(BASE64_STANDARD.encode(Md5::digest(key).as_slice()));
380 self
381 }
382
383 pub fn session_token(mut self, token: &str) -> Self {
389 if !token.is_empty() {
390 self.config.session_token = Some(token.to_string());
391 }
392 self
393 }
394
395 #[deprecated(note = "Please use `session_token` instead")]
397 pub fn security_token(self, token: &str) -> Self {
398 self.session_token(token)
399 }
400
401 pub fn disable_config_load(mut self) -> Self {
409 self.config.disable_config_load = true;
410 self
411 }
412
413 pub fn disable_list_objects_v2(mut self) -> Self {
419 self.config.disable_list_objects_v2 = true;
420 self
421 }
422
423 pub fn enable_request_payer(mut self) -> Self {
427 self.config.enable_request_payer = true;
428 self
429 }
430
431 pub fn disable_ec2_metadata(mut self) -> Self {
436 self.config.disable_ec2_metadata = true;
437 self
438 }
439
440 pub fn allow_anonymous(mut self) -> Self {
443 self.config.allow_anonymous = true;
444 self
445 }
446
447 pub fn enable_virtual_host_style(mut self) -> Self {
453 self.config.enable_virtual_host_style = true;
454 self
455 }
456
457 pub fn disable_stat_with_override(mut self) -> Self {
461 self.config.disable_stat_with_override = true;
462 self
463 }
464
465 pub fn customized_credential_load(mut self, cred: Box<dyn AwsCredentialLoad>) -> Self {
470 self.customized_credential_load = Some(cred);
471 self
472 }
473
474 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
481 #[allow(deprecated)]
482 pub fn http_client(mut self, client: HttpClient) -> Self {
483 self.http_client = Some(client);
484 self
485 }
486
487 pub fn enable_versioning(mut self, enabled: bool) -> Self {
489 self.config.enable_versioning = enabled;
490
491 self
492 }
493
494 fn is_bucket_valid(&self) -> bool {
498 if self.config.bucket.is_empty() {
499 return false;
500 }
501 if self.config.enable_virtual_host_style && self.config.bucket.contains('.') {
505 return false;
506 }
507 true
508 }
509
510 fn build_endpoint(&self, region: &str) -> String {
512 let bucket = {
513 debug_assert!(self.is_bucket_valid(), "bucket must be valid");
514
515 self.config.bucket.as_str()
516 };
517
518 let mut endpoint = match &self.config.endpoint {
519 Some(endpoint) => {
520 if endpoint.starts_with("http") {
521 endpoint.to_string()
522 } else {
523 format!("https://{endpoint}")
525 }
526 }
527 None => "https://s3.amazonaws.com".to_string(),
528 };
529
530 endpoint = endpoint.replace(&format!("//{bucket}."), "//");
532
533 if let Ok(url) = Url::from_str(&endpoint) {
535 endpoint = url.to_string().trim_end_matches('/').to_string();
537 }
538
539 endpoint = if let Some(template) = ENDPOINT_TEMPLATES.get(endpoint.as_str()) {
541 template.replace("{region}", region)
542 } else {
543 endpoint.to_string()
546 };
547
548 if self.config.enable_virtual_host_style {
550 endpoint = endpoint.replace("//", &format!("//{bucket}."))
551 } else {
552 write!(endpoint, "/{bucket}").expect("write into string must succeed");
553 };
554
555 endpoint
556 }
557
558 #[deprecated(
560 since = "0.52.0",
561 note = "Please use `delete_max_size` instead of `batch_max_operations`"
562 )]
563 pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
564 self.config.delete_max_size = Some(batch_max_operations);
565
566 self
567 }
568
569 pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
571 self.config.delete_max_size = Some(delete_max_size);
572
573 self
574 }
575
576 pub fn checksum_algorithm(mut self, checksum_algorithm: &str) -> Self {
582 self.config.checksum_algorithm = Some(checksum_algorithm.to_string());
583
584 self
585 }
586
587 pub fn disable_write_with_if_match(mut self) -> Self {
589 self.config.disable_write_with_if_match = true;
590 self
591 }
592
593 pub fn enable_write_with_append(mut self) -> Self {
595 self.config.enable_write_with_append = true;
596 self
597 }
598
599 pub async fn detect_region(endpoint: &str, bucket: &str) -> Option<String> {
635 let endpoint = endpoint.trim_end_matches('/');
637
638 let mut endpoint = if endpoint.starts_with("http") {
640 endpoint.to_string()
641 } else {
642 format!("https://{}", endpoint)
644 };
645
646 endpoint = endpoint.replace(&format!("//{bucket}."), "//");
648 let url = format!("{endpoint}/{bucket}");
649
650 debug!("detect region with url: {url}");
651
652 if endpoint.ends_with("r2.cloudflarestorage.com") {
658 return Some("auto".to_string());
659 }
660
661 if let Some(v) = endpoint.strip_prefix("https://s3.") {
663 if let Some(region) = v.strip_suffix(".amazonaws.com") {
664 return Some(region.to_string());
665 }
666 }
667
668 if let Some(v) = endpoint.strip_prefix("https://") {
673 if let Some(region) = v.strip_suffix(".aliyuncs.com") {
674 return Some(region.to_string());
675 }
676
677 if let Some(region) = v.strip_suffix("-internal.aliyuncs.com") {
678 return Some(region.to_string());
679 }
680 }
681
682 let req = http::Request::head(&url).body(Buffer::new()).ok()?;
684
685 let client = HttpClient::new().ok()?;
686 let res = client
687 .send(req)
688 .await
689 .map_err(|err| warn!("detect region failed for: {err:?}"))
690 .ok()?;
691
692 debug!(
693 "auto detect region got response: status {:?}, header: {:?}",
694 res.status(),
695 res.headers()
696 );
697
698 if let Some(header) = res.headers().get("x-amz-bucket-region") {
700 if let Ok(regin) = header.to_str() {
701 return Some(regin.to_string());
702 }
703 }
704
705 if res.status() == StatusCode::FORBIDDEN || res.status() == StatusCode::OK {
708 return Some("us-east-1".to_string());
709 }
710
711 None
712 }
713}
714
715impl Builder for S3Builder {
716 const SCHEME: Scheme = Scheme::S3;
717 type Config = S3Config;
718
719 fn build(mut self) -> Result<impl Access> {
720 debug!("backend build started: {:?}", &self);
721
722 let root = normalize_root(&self.config.root.clone().unwrap_or_default());
723 debug!("backend use root {}", &root);
724
725 let bucket = if self.is_bucket_valid() {
727 Ok(&self.config.bucket)
728 } else {
729 Err(
730 Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
731 .with_context("service", Scheme::S3),
732 )
733 }?;
734 debug!("backend use bucket {}", &bucket);
735
736 let default_storage_class = match &self.config.default_storage_class {
737 None => None,
738 Some(v) => Some(
739 build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?,
740 ),
741 };
742
743 let server_side_encryption = match &self.config.server_side_encryption {
744 None => None,
745 Some(v) => Some(
746 build_header_value(v)
747 .map_err(|err| err.with_context("key", "server_side_encryption"))?,
748 ),
749 };
750
751 let server_side_encryption_aws_kms_key_id =
752 match &self.config.server_side_encryption_aws_kms_key_id {
753 None => None,
754 Some(v) => Some(build_header_value(v).map_err(|err| {
755 err.with_context("key", "server_side_encryption_aws_kms_key_id")
756 })?),
757 };
758
759 let server_side_encryption_customer_algorithm =
760 match &self.config.server_side_encryption_customer_algorithm {
761 None => None,
762 Some(v) => Some(build_header_value(v).map_err(|err| {
763 err.with_context("key", "server_side_encryption_customer_algorithm")
764 })?),
765 };
766
767 let server_side_encryption_customer_key =
768 match &self.config.server_side_encryption_customer_key {
769 None => None,
770 Some(v) => Some(build_header_value(v).map_err(|err| {
771 err.with_context("key", "server_side_encryption_customer_key")
772 })?),
773 };
774
775 let server_side_encryption_customer_key_md5 =
776 match &self.config.server_side_encryption_customer_key_md5 {
777 None => None,
778 Some(v) => Some(build_header_value(v).map_err(|err| {
779 err.with_context("key", "server_side_encryption_customer_key_md5")
780 })?),
781 };
782
783 let checksum_algorithm = match self.config.checksum_algorithm.as_deref() {
784 Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
785 None => None,
786 v => {
787 return Err(Error::new(
788 ErrorKind::ConfigInvalid,
789 format!("{:?} is not a supported checksum_algorithm.", v),
790 ))
791 }
792 };
793
794 let mut cfg = AwsConfig::default();
796 if !self.config.disable_config_load {
797 #[cfg(not(target_arch = "wasm32"))]
798 {
799 cfg = cfg.from_profile();
800 cfg = cfg.from_env();
801 }
802 }
803
804 if let Some(ref v) = self.config.region {
805 cfg.region = Some(v.to_string());
806 }
807
808 if cfg.region.is_none() {
809 return Err(Error::new(
810 ErrorKind::ConfigInvalid,
811 "region is missing. Please find it by S3::detect_region() or set them in env.",
812 )
813 .with_operation("Builder::build")
814 .with_context("service", Scheme::S3));
815 }
816
817 let region = cfg.region.to_owned().unwrap();
818 debug!("backend use region: {region}");
819
820 self.config.endpoint = self.config.endpoint.or_else(|| cfg.endpoint_url.clone());
822
823 let endpoint = self.build_endpoint(®ion);
825 debug!("backend use endpoint: {endpoint}");
826
827 if let Some(v) = self.config.access_key_id {
829 cfg.access_key_id = Some(v)
830 }
831 if let Some(v) = self.config.secret_access_key {
832 cfg.secret_access_key = Some(v)
833 }
834 if let Some(v) = self.config.session_token {
835 cfg.session_token = Some(v)
836 }
837
838 let mut loader: Option<Box<dyn AwsCredentialLoad>> = None;
839 if let Some(v) = self.customized_credential_load {
841 loader = Some(v);
842 }
843
844 if let Some(role_arn) = self.config.role_arn {
846 let default_loader =
848 AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg.clone());
849
850 let mut assume_role_cfg = AwsConfig {
852 region: Some(region.clone()),
853 role_arn: Some(role_arn),
854 external_id: self.config.external_id.clone(),
855 sts_regional_endpoints: "regional".to_string(),
856 ..Default::default()
857 };
858
859 if let Some(name) = self.config.role_session_name {
861 assume_role_cfg.role_session_name = name;
862 }
863
864 let assume_role_loader = AwsAssumeRoleLoader::new(
865 GLOBAL_REQWEST_CLIENT.clone().clone(),
866 assume_role_cfg,
867 Box::new(default_loader),
868 )
869 .map_err(|err| {
870 Error::new(
871 ErrorKind::ConfigInvalid,
872 "The assume_role_loader is misconfigured",
873 )
874 .with_context("service", Scheme::S3)
875 .set_source(err)
876 })?;
877 loader = Some(Box::new(assume_role_loader));
878 }
879 let loader = match loader {
881 Some(v) => v,
882 None => {
883 let mut default_loader =
884 AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
885 if self.config.disable_ec2_metadata {
886 default_loader = default_loader.with_disable_ec2_metadata();
887 }
888
889 Box::new(default_loader)
890 }
891 };
892
893 let signer = AwsV4Signer::new("s3", ®ion);
894
895 let delete_max_size = self
896 .config
897 .delete_max_size
898 .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
899
900 Ok(S3Backend {
901 core: Arc::new(S3Core {
902 info: {
903 let am = AccessorInfo::default();
904 am.set_scheme(Scheme::S3)
905 .set_root(&root)
906 .set_name(bucket)
907 .set_native_capability(Capability {
908 stat: true,
909 stat_has_content_encoding: true,
910 stat_with_if_match: true,
911 stat_with_if_none_match: true,
912 stat_with_if_modified_since: true,
913 stat_with_if_unmodified_since: true,
914 stat_with_override_cache_control: !self
915 .config
916 .disable_stat_with_override,
917 stat_with_override_content_disposition: !self
918 .config
919 .disable_stat_with_override,
920 stat_with_override_content_type: !self
921 .config
922 .disable_stat_with_override,
923 stat_with_version: self.config.enable_versioning,
924 stat_has_cache_control: true,
925 stat_has_content_length: true,
926 stat_has_content_type: true,
927 stat_has_content_range: true,
928 stat_has_etag: true,
929 stat_has_content_md5: true,
930 stat_has_last_modified: true,
931 stat_has_content_disposition: true,
932 stat_has_user_metadata: true,
933 stat_has_version: true,
934
935 read: true,
936 read_with_if_match: true,
937 read_with_if_none_match: true,
938 read_with_if_modified_since: true,
939 read_with_if_unmodified_since: true,
940 read_with_override_cache_control: true,
941 read_with_override_content_disposition: true,
942 read_with_override_content_type: true,
943 read_with_version: self.config.enable_versioning,
944
945 write: true,
946 write_can_empty: true,
947 write_can_multi: true,
948 write_can_append: self.config.enable_write_with_append,
949
950 write_with_cache_control: true,
951 write_with_content_type: true,
952 write_with_content_encoding: true,
953 write_with_if_match: !self.config.disable_write_with_if_match,
954 write_with_if_not_exists: true,
955 write_with_user_metadata: true,
956
957 write_multi_min_size: Some(5 * 1024 * 1024),
961 write_multi_max_size: if cfg!(target_pointer_width = "64") {
965 Some(5 * 1024 * 1024 * 1024)
966 } else {
967 Some(usize::MAX)
968 },
969
970 delete: true,
971 delete_max_size: Some(delete_max_size),
972 delete_with_version: self.config.enable_versioning,
973
974 copy: true,
975
976 list: true,
977 list_with_limit: true,
978 list_with_start_after: true,
979 list_with_recursive: true,
980 list_with_versions: self.config.enable_versioning,
981 list_with_deleted: self.config.enable_versioning,
982 list_has_etag: true,
983 list_has_content_md5: true,
984 list_has_content_length: true,
985 list_has_last_modified: true,
986
987 presign: true,
988 presign_stat: true,
989 presign_read: true,
990 presign_write: true,
991
992 shared: true,
993
994 ..Default::default()
995 });
996
997 #[allow(deprecated)]
999 if let Some(client) = self.http_client {
1000 am.update_http_client(|_| client);
1001 }
1002
1003 am.into()
1004 },
1005 bucket: bucket.to_string(),
1006 endpoint,
1007 root,
1008 server_side_encryption,
1009 server_side_encryption_aws_kms_key_id,
1010 server_side_encryption_customer_algorithm,
1011 server_side_encryption_customer_key,
1012 server_side_encryption_customer_key_md5,
1013 default_storage_class,
1014 allow_anonymous: self.config.allow_anonymous,
1015 disable_list_objects_v2: self.config.disable_list_objects_v2,
1016 enable_request_payer: self.config.enable_request_payer,
1017 signer,
1018 loader,
1019 credential_loaded: AtomicBool::new(false),
1020 checksum_algorithm,
1021 }),
1022 })
1023 }
1024}
1025
/// S3 backend: a clonable handle around the shared [`S3Core`].
#[derive(Debug, Clone)]
pub struct S3Backend {
    /// Shared state assembled by `S3Builder::build`.
    core: Arc<S3Core>,
}
1031
1032impl Access for S3Backend {
1033 type Reader = HttpBody;
1034 type Writer = S3Writers;
1035 type Lister = S3Listers;
1036 type Deleter = oio::BatchDeleter<S3Deleter>;
1037 type BlockingReader = ();
1038 type BlockingWriter = ();
1039 type BlockingLister = ();
1040 type BlockingDeleter = ();
1041
1042 fn info(&self) -> Arc<AccessorInfo> {
1043 self.core.info.clone()
1044 }
1045
1046 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
1047 let resp = self.core.s3_head_object(path, args).await?;
1048
1049 let status = resp.status();
1050
1051 match status {
1052 StatusCode::OK => {
1053 let headers = resp.headers();
1054 let mut meta = parse_into_metadata(path, headers)?;
1055
1056 let user_meta = parse_prefixed_headers(headers, X_AMZ_META_PREFIX);
1057 if !user_meta.is_empty() {
1058 meta.with_user_metadata(user_meta);
1059 }
1060
1061 if let Some(v) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? {
1062 meta.set_version(v);
1063 }
1064
1065 Ok(RpStat::new(meta))
1066 }
1067 _ => Err(parse_error(resp)),
1068 }
1069 }
1070
1071 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
1072 let resp = self.core.s3_get_object(path, args.range(), &args).await?;
1073
1074 let status = resp.status();
1075 match status {
1076 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
1077 Ok((RpRead::default(), resp.into_body()))
1078 }
1079 _ => {
1080 let (part, mut body) = resp.into_parts();
1081 let buf = body.to_buffer().await?;
1082 Err(parse_error(Response::from_parts(part, buf)))
1083 }
1084 }
1085 }
1086
1087 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
1088 let writer = S3Writer::new(self.core.clone(), path, args.clone());
1089
1090 let w = if args.append() {
1091 S3Writers::Two(oio::AppendWriter::new(writer))
1092 } else {
1093 S3Writers::One(oio::MultipartWriter::new(
1094 self.core.info.clone(),
1095 writer,
1096 args.concurrent(),
1097 ))
1098 };
1099
1100 Ok((RpWrite::default(), w))
1101 }
1102
1103 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
1104 Ok((
1105 RpDelete::default(),
1106 oio::BatchDeleter::new(S3Deleter::new(self.core.clone())),
1107 ))
1108 }
1109
1110 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
1111 let l = if args.versions() || args.deleted() {
1112 ThreeWays::Three(PageLister::new(S3ObjectVersionsLister::new(
1113 self.core.clone(),
1114 path,
1115 args,
1116 )))
1117 } else if self.core.disable_list_objects_v2 {
1118 ThreeWays::One(PageLister::new(S3ListerV1::new(
1119 self.core.clone(),
1120 path,
1121 args,
1122 )))
1123 } else {
1124 ThreeWays::Two(PageLister::new(S3ListerV2::new(
1125 self.core.clone(),
1126 path,
1127 args,
1128 )))
1129 };
1130
1131 Ok((RpList::default(), l))
1132 }
1133
1134 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
1135 let resp = self.core.s3_copy_object(from, to).await?;
1136
1137 let status = resp.status();
1138
1139 match status {
1140 StatusCode::OK => Ok(RpCopy::default()),
1141 _ => Err(parse_error(resp)),
1142 }
1143 }
1144
1145 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
1146 let (expire, op) = args.into_parts();
1147 let req = match op {
1149 PresignOperation::Stat(v) => self.core.s3_head_object_request(path, v),
1150 PresignOperation::Read(v) => {
1151 self.core
1152 .s3_get_object_request(path, BytesRange::default(), &v)
1153 }
1154 PresignOperation::Write(_) => {
1155 self.core
1156 .s3_put_object_request(path, None, &OpWrite::default(), Buffer::new())
1157 }
1158 PresignOperation::Delete(_) => Err(Error::new(
1159 ErrorKind::Unsupported,
1160 "operation is not supported",
1161 )),
1162 };
1163 let mut req = req?;
1164
1165 self.core.sign_query(&mut req, expire).await?;
1166
1167 let (parts, _) = req.into_parts();
1169
1170 Ok(RpPresign::new(PresignedRequest::new(
1171 parts.method,
1172 parts.uri,
1173 parts.headers,
1174 )))
1175 }
1176}
1177
#[cfg(test)]
mod tests {
    use super::*;

    /// Bucket validity for each combination of name and virtual-host style.
    #[test]
    fn test_is_valid_bucket() {
        // (bucket, enable_virtual_host_style, expected validity)
        let cases = [
            ("", false, false),
            ("test", false, true),
            ("test.xyz", false, true),
            ("", true, false),
            ("test", true, true),
            ("test.xyz", true, false),
        ];

        for (name, virtual_host, want) in cases {
            let builder = S3Builder::default().bucket(name);
            let builder = if virtual_host {
                builder.enable_virtual_host_style()
            } else {
                builder
            };

            assert_eq!(builder.is_bucket_valid(), want)
        }
    }

    /// Every accepted spelling of the AWS endpoint resolves to the same
    /// regional endpoint, in both path style and virtual-host style.
    #[test]
    fn test_build_endpoint() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let inputs = [
            Some("s3.amazonaws.com"),
            Some("https://s3.amazonaws.com"),
            Some("https://s3.us-east-2.amazonaws.com"),
            None,
        ];

        // Path style: the bucket is appended as a path segment.
        for input in &inputs {
            let mut builder = S3Builder::default().bucket("test");
            if let Some(ep) = input {
                builder = builder.endpoint(ep);
            }

            let got = builder.build_endpoint("us-east-2");
            assert_eq!(got, "https://s3.us-east-2.amazonaws.com/test");
        }

        // Virtual-host style: the bucket becomes a host prefix.
        for input in &inputs {
            let mut builder = S3Builder::default()
                .bucket("test")
                .enable_virtual_host_style();
            if let Some(ep) = input {
                builder = builder.endpoint(ep);
            }

            let got = builder.build_endpoint("us-east-2");
            assert_eq!(got, "https://test.s3.us-east-2.amazonaws.com");
        }
    }

    /// Region detection for a handful of endpoints.
    #[tokio::test]
    async fn test_detect_region() {
        // (case name, endpoint, bucket, expected region)
        let cases = [
            (
                "aws s3 without region in endpoint",
                "https://s3.amazonaws.com",
                "example",
                Some("us-east-1"),
            ),
            (
                "aws s3 with region in endpoint",
                "https://s3.us-east-1.amazonaws.com",
                "example",
                Some("us-east-1"),
            ),
            (
                "oss with public endpoint",
                "https://oss-ap-southeast-1.aliyuncs.com",
                "example",
                Some("oss-ap-southeast-1"),
            ),
            (
                "oss with internal endpoint",
                "https://oss-cn-hangzhou-internal.aliyuncs.com",
                "example",
                Some("oss-cn-hangzhou-internal"),
            ),
            (
                "r2",
                "https://abc.xxxxx.r2.cloudflarestorage.com",
                "example",
                Some("auto"),
            ),
            (
                "invalid service",
                "https://opendal.apache.org",
                "example",
                None,
            ),
        ];

        for (name, endpoint, bucket, expected) in cases {
            let detected = S3Builder::detect_region(endpoint, bucket).await;
            assert_eq!(detected.as_deref(), expected, "{}", name);
        }
    }
}