1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Write;
21use std::str::FromStr;
22use std::sync::Arc;
23use std::sync::LazyLock;
24
25use base64::Engine;
26use base64::prelude::BASE64_STANDARD;
27use constants::X_AMZ_META_PREFIX;
28use constants::X_AMZ_VERSION_ID;
29use http::Response;
30use http::StatusCode;
31use log::debug;
32use log::warn;
33use md5::Digest;
34use md5::Md5;
35use reqsign_aws_v4::AssumeRoleCredentialProvider;
36use reqsign_aws_v4::Credential;
37use reqsign_aws_v4::DefaultCredentialProvider;
38use reqsign_aws_v4::RequestSigner as AwsV4Signer;
39use reqsign_aws_v4::StaticCredentialProvider;
40use reqsign_core::Context;
41use reqsign_core::OsEnv;
42use reqsign_core::ProvideCredentialChain;
43use reqsign_core::Signer;
44use reqsign_file_read_tokio::TokioFileRead;
45use reqsign_http_send_reqwest::ReqwestHttpSend;
46use reqwest::Url;
47
48use super::S3_SCHEME;
49use super::config::S3Config;
50use super::core::*;
51use super::deleter::S3Deleter;
52use super::error::parse_error;
53use super::lister::S3ListerV1;
54use super::lister::S3ListerV2;
55use super::lister::S3Listers;
56use super::lister::S3ObjectVersionsLister;
57use super::writer::S3Writer;
58use super::writer::S3Writers;
59use crate::raw::*;
60use crate::*;
61
62static ENDPOINT_TEMPLATES: LazyLock<HashMap<&'static str, &'static str>> = LazyLock::new(|| {
64 let mut m = HashMap::new();
65 m.insert(
67 "https://s3.amazonaws.com",
68 "https://s3.{region}.amazonaws.com",
69 );
70 m
71});
72
73const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
74
75#[doc = include_str!("docs.md")]
78#[doc = include_str!("compatible_services.md")]
79#[derive(Default)]
80pub struct S3Builder {
81 pub(super) config: S3Config,
82
83 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
84 pub(super) http_client: Option<HttpClient>,
85 pub(super) credential_providers: Option<ProvideCredentialChain<Credential>>,
86}
87
88impl Debug for S3Builder {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 f.debug_struct("S3Builder")
91 .field("config", &self.config)
92 .finish_non_exhaustive()
93 }
94}
95
96impl S3Builder {
97 pub fn root(mut self, root: &str) -> Self {
101 self.config.root = if root.is_empty() {
102 None
103 } else {
104 Some(root.to_string())
105 };
106
107 self
108 }
109
110 pub fn bucket(mut self, bucket: &str) -> Self {
112 self.config.bucket = bucket.to_string();
113
114 self
115 }
116
117 pub fn endpoint(mut self, endpoint: &str) -> Self {
130 if !endpoint.is_empty() {
131 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
133 }
134
135 self
136 }
137
138 pub fn region(mut self, region: &str) -> Self {
145 if !region.is_empty() {
146 self.config.region = Some(region.to_string())
147 }
148
149 self
150 }
151
152 pub fn access_key_id(mut self, v: &str) -> Self {
157 if !v.is_empty() {
158 self.config.access_key_id = Some(v.to_string())
159 }
160
161 self
162 }
163
164 pub fn secret_access_key(mut self, v: &str) -> Self {
169 if !v.is_empty() {
170 self.config.secret_access_key = Some(v.to_string())
171 }
172
173 self
174 }
175
176 pub fn role_arn(mut self, v: &str) -> Self {
181 if !v.is_empty() {
182 self.config.role_arn = Some(v.to_string())
183 }
184
185 self
186 }
187
188 pub fn external_id(mut self, v: &str) -> Self {
190 if !v.is_empty() {
191 self.config.external_id = Some(v.to_string())
192 }
193
194 self
195 }
196
197 pub fn role_session_name(mut self, v: &str) -> Self {
199 if !v.is_empty() {
200 self.config.role_session_name = Some(v.to_string())
201 }
202
203 self
204 }
205
206 pub fn default_storage_class(mut self, v: &str) -> Self {
219 if !v.is_empty() {
220 self.config.default_storage_class = Some(v.to_string())
221 }
222
223 self
224 }
225
226 pub fn server_side_encryption(mut self, v: &str) -> Self {
237 if !v.is_empty() {
238 self.config.server_side_encryption = Some(v.to_string())
239 }
240
241 self
242 }
243
244 pub fn server_side_encryption_aws_kms_key_id(mut self, v: &str) -> Self {
261 if !v.is_empty() {
262 self.config.server_side_encryption_aws_kms_key_id = Some(v.to_string())
263 }
264
265 self
266 }
267
268 pub fn server_side_encryption_customer_algorithm(mut self, v: &str) -> Self {
279 if !v.is_empty() {
280 self.config.server_side_encryption_customer_algorithm = Some(v.to_string())
281 }
282
283 self
284 }
285
286 pub fn server_side_encryption_customer_key(mut self, v: &str) -> Self {
300 if !v.is_empty() {
301 self.config.server_side_encryption_customer_key = Some(v.to_string())
302 }
303
304 self
305 }
306
307 pub fn server_side_encryption_customer_key_md5(mut self, v: &str) -> Self {
320 if !v.is_empty() {
321 self.config.server_side_encryption_customer_key_md5 = Some(v.to_string())
322 }
323
324 self
325 }
326
327 pub fn server_side_encryption_with_aws_managed_kms_key(mut self) -> Self {
333 self.config.server_side_encryption = Some("aws:kms".to_string());
334 self
335 }
336
337 pub fn server_side_encryption_with_customer_managed_kms_key(
343 mut self,
344 aws_kms_key_id: &str,
345 ) -> Self {
346 self.config.server_side_encryption = Some("aws:kms".to_string());
347 self.config.server_side_encryption_aws_kms_key_id = Some(aws_kms_key_id.to_string());
348 self
349 }
350
351 pub fn server_side_encryption_with_s3_key(mut self) -> Self {
357 self.config.server_side_encryption = Some("AES256".to_string());
358 self
359 }
360
361 pub fn server_side_encryption_with_customer_key(mut self, algorithm: &str, key: &[u8]) -> Self {
367 self.config.server_side_encryption_customer_algorithm = Some(algorithm.to_string());
368 self.config.server_side_encryption_customer_key = Some(BASE64_STANDARD.encode(key));
369 let key_md5 = Md5::digest(key);
370 self.config.server_side_encryption_customer_key_md5 = Some(BASE64_STANDARD.encode(key_md5));
371 self
372 }
373
374 pub fn session_token(mut self, token: &str) -> Self {
380 if !token.is_empty() {
381 self.config.session_token = Some(token.to_string());
382 }
383 self
384 }
385
386 pub fn disable_config_load(mut self) -> Self {
394 self.config.disable_config_load = true;
395 self
396 }
397
398 pub fn disable_list_objects_v2(mut self) -> Self {
404 self.config.disable_list_objects_v2 = true;
405 self
406 }
407
408 pub fn enable_request_payer(mut self) -> Self {
412 self.config.enable_request_payer = true;
413 self
414 }
415
416 pub fn disable_ec2_metadata(mut self) -> Self {
421 self.config.disable_ec2_metadata = true;
422 self
423 }
424
425 pub fn allow_anonymous(mut self) -> Self {
428 self.config.allow_anonymous = true;
429 self
430 }
431
432 pub fn enable_virtual_host_style(mut self) -> Self {
438 self.config.enable_virtual_host_style = true;
439 self
440 }
441
442 pub fn disable_stat_with_override(mut self) -> Self {
446 self.config.disable_stat_with_override = true;
447 self
448 }
449
450 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
457 #[allow(deprecated)]
458 pub fn http_client(mut self, client: HttpClient) -> Self {
459 self.http_client = Some(client);
460 self
461 }
462
463 pub fn enable_versioning(mut self, enabled: bool) -> Self {
465 self.config.enable_versioning = enabled;
466
467 self
468 }
469
470 pub fn credential_provider_chain(mut self, chain: ProvideCredentialChain<Credential>) -> Self {
472 self.credential_providers = Some(chain);
473 self
474 }
475
476 fn is_bucket_valid(config: &S3Config) -> bool {
480 if config.bucket.is_empty() {
481 return false;
482 }
483 if config.enable_virtual_host_style && config.bucket.contains('.') {
487 return false;
488 }
489 true
490 }
491
492 fn build_endpoint(config: &S3Config, region: &str) -> String {
494 let bucket = {
495 debug_assert!(Self::is_bucket_valid(config), "bucket must be valid");
496
497 config.bucket.as_str()
498 };
499
500 let mut endpoint = match &config.endpoint {
501 Some(endpoint) => {
502 if endpoint.starts_with("http") {
503 endpoint.to_string()
504 } else {
505 format!("https://{endpoint}")
507 }
508 }
509 None => "https://s3.amazonaws.com".to_string(),
510 };
511
512 endpoint = endpoint.replace(&format!("//{bucket}."), "//");
514
515 if let Ok(url) = Url::from_str(&endpoint) {
517 endpoint = url.to_string().trim_end_matches('/').to_string();
519 }
520
521 endpoint = if let Some(template) = ENDPOINT_TEMPLATES.get(endpoint.as_str()) {
523 template.replace("{region}", region)
524 } else {
525 endpoint.to_string()
528 };
529
530 if config.enable_virtual_host_style {
532 endpoint = endpoint.replace("//", &format!("//{bucket}."))
533 } else {
534 write!(endpoint, "/{bucket}").expect("write into string must succeed");
535 };
536
537 endpoint
538 }
539
540 #[deprecated(
542 since = "0.52.0",
543 note = "Please use `delete_max_size` instead of `batch_max_operations`"
544 )]
545 pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
546 self.config.delete_max_size = Some(batch_max_operations);
547
548 self
549 }
550
551 pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
553 self.config.delete_max_size = Some(delete_max_size);
554
555 self
556 }
557
558 pub fn checksum_algorithm(mut self, checksum_algorithm: &str) -> Self {
564 self.config.checksum_algorithm = Some(checksum_algorithm.to_string());
565
566 self
567 }
568
569 pub fn disable_write_with_if_match(mut self) -> Self {
571 self.config.disable_write_with_if_match = true;
572 self
573 }
574
575 pub fn enable_write_with_append(mut self) -> Self {
577 self.config.enable_write_with_append = true;
578 self
579 }
580
581 pub async fn detect_region(endpoint: &str, bucket: &str) -> Option<String> {
617 let endpoint = endpoint.trim_end_matches('/');
619
620 let mut endpoint = if endpoint.starts_with("http") {
622 endpoint.to_string()
623 } else {
624 format!("https://{endpoint}")
626 };
627
628 endpoint = endpoint.replace(&format!("//{bucket}."), "//");
630 let url = format!("{endpoint}/{bucket}");
631
632 debug!("detect region with url: {url}");
633
634 if endpoint.ends_with("r2.cloudflarestorage.com") {
640 return Some("auto".to_string());
641 }
642
643 if let Some(v) = endpoint.strip_prefix("https://s3.") {
645 if let Some(region) = v.strip_suffix(".amazonaws.com") {
646 return Some(region.to_string());
647 }
648 }
649
650 if let Some(v) = endpoint.strip_prefix("https://") {
655 if let Some(region) = v.strip_suffix(".aliyuncs.com") {
656 return Some(region.to_string());
657 }
658
659 if let Some(region) = v.strip_suffix("-internal.aliyuncs.com") {
660 return Some(region.to_string());
661 }
662 }
663
664 let req = http::Request::head(&url).body(Buffer::new()).ok()?;
666
667 let client = HttpClient::new().ok()?;
668 let res = client
669 .send(req)
670 .await
671 .map_err(|err| warn!("detect region failed for: {err:?}"))
672 .ok()?;
673
674 debug!(
675 "auto detect region got response: status {:?}, header: {:?}",
676 res.status(),
677 res.headers()
678 );
679
680 if let Some(header) = res.headers().get("x-amz-bucket-region") {
682 if let Ok(regin) = header.to_str() {
683 return Some(regin.to_string());
684 }
685 }
686
687 if res.status() == StatusCode::FORBIDDEN || res.status() == StatusCode::OK {
690 return Some("us-east-1".to_string());
691 }
692
693 None
694 }
695}
696
697impl Builder for S3Builder {
698 type Config = S3Config;
699
700 fn build(self) -> Result<impl Access> {
701 debug!("backend build started: {:?}", &self);
702
703 #[allow(deprecated)]
704 let S3Builder {
705 mut config,
706 http_client,
707 credential_providers,
708 } = self;
709
710 let root = normalize_root(&config.root.clone().unwrap_or_default());
711 debug!("backend use root {}", &root);
712
713 let bucket = if Self::is_bucket_valid(&config) {
715 Ok(&config.bucket)
716 } else {
717 Err(
718 Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
719 .with_context("service", S3_SCHEME),
720 )
721 }?;
722 debug!("backend use bucket {}", &bucket);
723
724 let default_storage_class = match &config.default_storage_class {
725 None => None,
726 Some(v) => Some(
727 build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?,
728 ),
729 };
730
731 let server_side_encryption = match &config.server_side_encryption {
732 None => None,
733 Some(v) => Some(
734 build_header_value(v)
735 .map_err(|err| err.with_context("key", "server_side_encryption"))?,
736 ),
737 };
738
739 let server_side_encryption_aws_kms_key_id =
740 match &config.server_side_encryption_aws_kms_key_id {
741 None => None,
742 Some(v) => Some(build_header_value(v).map_err(|err| {
743 err.with_context("key", "server_side_encryption_aws_kms_key_id")
744 })?),
745 };
746
747 let server_side_encryption_customer_algorithm =
748 match &config.server_side_encryption_customer_algorithm {
749 None => None,
750 Some(v) => Some(build_header_value(v).map_err(|err| {
751 err.with_context("key", "server_side_encryption_customer_algorithm")
752 })?),
753 };
754
755 let server_side_encryption_customer_key =
756 match &config.server_side_encryption_customer_key {
757 None => None,
758 Some(v) => Some(build_header_value(v).map_err(|err| {
759 err.with_context("key", "server_side_encryption_customer_key")
760 })?),
761 };
762
763 let server_side_encryption_customer_key_md5 =
764 match &config.server_side_encryption_customer_key_md5 {
765 None => None,
766 Some(v) => Some(build_header_value(v).map_err(|err| {
767 err.with_context("key", "server_side_encryption_customer_key_md5")
768 })?),
769 };
770
771 let checksum_algorithm = match config.checksum_algorithm.as_deref() {
772 Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
773 Some("md5") => Some(ChecksumAlgorithm::Md5),
774 None => None,
775 v => {
776 return Err(Error::new(
777 ErrorKind::ConfigInvalid,
778 format!("{v:?} is not a supported checksum_algorithm."),
779 ));
780 }
781 };
782
783 let region = if let Some(ref v) = config.region {
785 v.to_string()
786 } else {
787 std::env::var("AWS_REGION")
788 .or_else(|_| std::env::var("AWS_DEFAULT_REGION"))
789 .map_err(|_| {
790 Error::new(
791 ErrorKind::ConfigInvalid,
792 "region is missing. Please find it by S3::detect_region() or set them in env.",
793 )
794 .with_operation("Builder::build")
795 .with_context("service", S3_SCHEME)
796 })?
797 };
798 debug!("backend use region: {region}");
799
800 if config.endpoint.is_none() && !config.disable_config_load {
801 let endpoint_from_env = std::env::var("AWS_ENDPOINT_URL")
802 .or_else(|_| std::env::var("AWS_ENDPOINT"))
803 .or_else(|_| std::env::var("AWS_S3_ENDPOINT"))
804 .ok();
805 if let Some(endpoint) = endpoint_from_env {
806 let normalized = endpoint.trim_end_matches('/').to_string();
807 config.endpoint = Some(normalized);
808 }
809 }
810
811 let endpoint = Self::build_endpoint(&config, ®ion);
813 debug!("backend use endpoint: {endpoint}");
814
815 let ctx = Context::new()
817 .with_file_read(TokioFileRead)
818 .with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone()))
819 .with_env(OsEnv);
820
821 let mut provider = {
822 let mut builder = DefaultCredentialProvider::builder();
823
824 if config.disable_config_load {
825 builder = builder.disable_env(true).disable_profile(true);
826 }
827
828 if config.disable_ec2_metadata {
829 builder = builder.disable_imds(true);
830 }
831
832 ProvideCredentialChain::new().push(builder.build())
833 };
834
835 if let (Some(ak), Some(sk)) = (&config.access_key_id, &config.secret_access_key) {
837 let static_provider = if let Some(token) = config.session_token.as_deref() {
838 StaticCredentialProvider::new(ak, sk).with_session_token(token)
839 } else {
840 StaticCredentialProvider::new(ak, sk)
841 };
842 provider = provider.push_front(static_provider);
843 }
844
845 if let Some(role_arn) = &config.role_arn {
847 let sts_ctx = ctx.clone();
848 let sts_request_signer = AwsV4Signer::new("sts", ®ion);
849 let sts_signer = Signer::new(sts_ctx, provider, sts_request_signer);
850 let mut assume_role_provider =
851 AssumeRoleCredentialProvider::new(role_arn.clone(), sts_signer)
852 .with_region(region.clone())
853 .with_regional_sts_endpoint();
854
855 if let Some(external_id) = &config.external_id {
856 assume_role_provider = assume_role_provider.with_external_id(external_id.clone());
857 }
858 if let Some(role_session_name) = &config.role_session_name {
859 assume_role_provider =
860 assume_role_provider.with_role_session_name(role_session_name.clone());
861 }
862 provider = ProvideCredentialChain::new().push(assume_role_provider);
863 }
864
865 let provider = if let Some(credential_providers) = credential_providers {
867 credential_providers
868 } else {
869 provider
870 };
871
872 let request_signer = AwsV4Signer::new("s3", ®ion);
874
875 let signer = Signer::new(ctx, provider, request_signer);
877
878 let delete_max_size = config
879 .delete_max_size
880 .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
881
882 Ok(S3Backend {
883 core: Arc::new(S3Core {
884 info: {
885 let am = AccessorInfo::default();
886 am.set_scheme(S3_SCHEME)
887 .set_root(&root)
888 .set_name(bucket)
889 .set_native_capability(Capability {
890 stat: true,
891 stat_with_if_match: true,
892 stat_with_if_none_match: true,
893 stat_with_if_modified_since: true,
894 stat_with_if_unmodified_since: true,
895 stat_with_override_cache_control: !config.disable_stat_with_override,
896 stat_with_override_content_disposition: !config
897 .disable_stat_with_override,
898 stat_with_override_content_type: !config.disable_stat_with_override,
899 stat_with_version: config.enable_versioning,
900
901 read: true,
902 read_with_if_match: true,
903 read_with_if_none_match: true,
904 read_with_if_modified_since: true,
905 read_with_if_unmodified_since: true,
906 read_with_override_cache_control: true,
907 read_with_override_content_disposition: true,
908 read_with_override_content_type: true,
909 read_with_version: config.enable_versioning,
910
911 write: true,
912 write_can_empty: true,
913 write_can_multi: true,
914 write_can_append: config.enable_write_with_append,
915
916 write_with_cache_control: true,
917 write_with_content_type: true,
918 write_with_content_encoding: true,
919 write_with_if_match: !config.disable_write_with_if_match,
920 write_with_if_not_exists: true,
921 write_with_user_metadata: true,
922
923 write_multi_min_size: Some(5 * 1024 * 1024),
927 write_multi_max_size: if cfg!(target_pointer_width = "64") {
931 Some(5 * 1024 * 1024 * 1024)
932 } else {
933 Some(usize::MAX)
934 },
935
936 delete: true,
937 delete_max_size: Some(delete_max_size),
938 delete_with_version: config.enable_versioning,
939
940 copy: true,
941
942 list: true,
943 list_with_limit: true,
944 list_with_start_after: true,
945 list_with_recursive: true,
946 list_with_versions: config.enable_versioning,
947 list_with_deleted: config.enable_versioning,
948
949 presign: true,
950 presign_stat: true,
951 presign_read: true,
952 presign_write: true,
953
954 shared: true,
955
956 ..Default::default()
957 });
958
959 #[allow(deprecated)]
961 if let Some(client) = http_client {
962 am.update_http_client(|_| client);
963 }
964
965 am.into()
966 },
967 bucket: bucket.to_string(),
968 endpoint,
969 root,
970 server_side_encryption,
971 server_side_encryption_aws_kms_key_id,
972 server_side_encryption_customer_algorithm,
973 server_side_encryption_customer_key,
974 server_side_encryption_customer_key_md5,
975 default_storage_class,
976 allow_anonymous: config.allow_anonymous,
977 disable_list_objects_v2: config.disable_list_objects_v2,
978 enable_request_payer: config.enable_request_payer,
979 signer,
980 checksum_algorithm,
981 }),
982 })
983 }
984}
985
986#[derive(Debug, Clone)]
988pub struct S3Backend {
989 core: Arc<S3Core>,
990}
991
992impl Access for S3Backend {
993 type Reader = HttpBody;
994 type Writer = S3Writers;
995 type Lister = S3Listers;
996 type Deleter = oio::BatchDeleter<S3Deleter>;
997
998 fn info(&self) -> Arc<AccessorInfo> {
999 self.core.info.clone()
1000 }
1001
1002 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
1003 let resp = self.core.s3_head_object(path, args).await?;
1004
1005 let status = resp.status();
1006
1007 match status {
1008 StatusCode::OK => {
1009 let headers = resp.headers();
1010 let mut meta = parse_into_metadata(path, headers)?;
1011
1012 let user_meta = parse_prefixed_headers(headers, X_AMZ_META_PREFIX);
1013 if !user_meta.is_empty() {
1014 meta = meta.with_user_metadata(user_meta);
1015 }
1016
1017 if let Some(v) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? {
1018 meta.set_version(v);
1019 }
1020
1021 Ok(RpStat::new(meta))
1022 }
1023 _ => Err(parse_error(resp)),
1024 }
1025 }
1026
1027 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
1028 let resp = self.core.s3_get_object(path, args.range(), &args).await?;
1029
1030 let status = resp.status();
1031 match status {
1032 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
1033 Ok((RpRead::default(), resp.into_body()))
1034 }
1035 _ => {
1036 let (part, mut body) = resp.into_parts();
1037 let buf = body.to_buffer().await?;
1038 Err(parse_error(Response::from_parts(part, buf)))
1039 }
1040 }
1041 }
1042
1043 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
1044 let writer = S3Writer::new(self.core.clone(), path, args.clone());
1045
1046 let w = if args.append() {
1047 S3Writers::Two(oio::AppendWriter::new(writer))
1048 } else {
1049 S3Writers::One(oio::MultipartWriter::new(
1050 self.core.info.clone(),
1051 writer,
1052 args.concurrent(),
1053 ))
1054 };
1055
1056 Ok((RpWrite::default(), w))
1057 }
1058
1059 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
1060 Ok((
1061 RpDelete::default(),
1062 oio::BatchDeleter::new(S3Deleter::new(self.core.clone())),
1063 ))
1064 }
1065
1066 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
1067 let l = if args.versions() || args.deleted() {
1068 ThreeWays::Three(oio::PageLister::new(S3ObjectVersionsLister::new(
1069 self.core.clone(),
1070 path,
1071 args,
1072 )))
1073 } else if self.core.disable_list_objects_v2 {
1074 ThreeWays::One(oio::PageLister::new(S3ListerV1::new(
1075 self.core.clone(),
1076 path,
1077 args,
1078 )))
1079 } else {
1080 ThreeWays::Two(oio::PageLister::new(S3ListerV2::new(
1081 self.core.clone(),
1082 path,
1083 args,
1084 )))
1085 };
1086
1087 Ok((RpList::default(), l))
1088 }
1089
1090 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
1091 let resp = self.core.s3_copy_object(from, to).await?;
1092
1093 let status = resp.status();
1094
1095 match status {
1096 StatusCode::OK => Ok(RpCopy::default()),
1097 _ => Err(parse_error(resp)),
1098 }
1099 }
1100
1101 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
1102 let (expire, op) = args.into_parts();
1103 let req = match op {
1105 PresignOperation::Stat(v) => self.core.s3_head_object_request(path, v),
1106 PresignOperation::Read(v) => {
1107 self.core
1108 .s3_get_object_request(path, BytesRange::default(), &v)
1109 }
1110 PresignOperation::Write(_) => {
1111 self.core
1112 .s3_put_object_request(path, None, &OpWrite::default(), Buffer::new())
1113 }
1114 PresignOperation::Delete(_) => Err(Error::new(
1115 ErrorKind::Unsupported,
1116 "operation is not supported",
1117 )),
1118 };
1119 let req = req?;
1120
1121 let req = self.core.sign_query(req, expire).await?;
1122
1123 let (parts, _) = req.into_parts();
1125
1126 Ok(RpPresign::new(PresignedRequest::new(
1127 parts.method,
1128 parts.uri,
1129 parts.headers,
1130 )))
1131 }
1132}
1133#[cfg(test)]
1134mod tests {
1135 use super::*;
1136
1137 #[test]
1138 fn test_is_valid_bucket() {
1139 let bucket_cases = vec![
1140 ("", false, false),
1141 ("test", false, true),
1142 ("test.xyz", false, true),
1143 ("", true, false),
1144 ("test", true, true),
1145 ("test.xyz", true, false),
1146 ];
1147
1148 for (bucket, enable_virtual_host_style, expected) in bucket_cases {
1149 let mut b = S3Builder::default();
1150 b = b.bucket(bucket);
1151 if enable_virtual_host_style {
1152 b = b.enable_virtual_host_style();
1153 }
1154 assert_eq!(S3Builder::is_bucket_valid(&b.config), expected)
1155 }
1156 }
1157
1158 #[test]
1159 fn test_build_endpoint() {
1160 let _ = tracing_subscriber::fmt().with_test_writer().try_init();
1161
1162 let endpoint_cases = vec![
1163 Some("s3.amazonaws.com"),
1164 Some("https://s3.amazonaws.com"),
1165 Some("https://s3.us-east-2.amazonaws.com"),
1166 None,
1167 ];
1168
1169 for endpoint in &endpoint_cases {
1170 let mut b = S3Builder::default().bucket("test");
1171 if let Some(endpoint) = endpoint {
1172 b = b.endpoint(endpoint);
1173 }
1174
1175 let endpoint = S3Builder::build_endpoint(&b.config, "us-east-2");
1176 assert_eq!(endpoint, "https://s3.us-east-2.amazonaws.com/test");
1177 }
1178
1179 for endpoint in &endpoint_cases {
1180 let mut b = S3Builder::default()
1181 .bucket("test")
1182 .enable_virtual_host_style();
1183 if let Some(endpoint) = endpoint {
1184 b = b.endpoint(endpoint);
1185 }
1186
1187 let endpoint = S3Builder::build_endpoint(&b.config, "us-east-2");
1188 assert_eq!(endpoint, "https://test.s3.us-east-2.amazonaws.com");
1189 }
1190 }
1191
1192 #[tokio::test]
1193 async fn test_detect_region() {
1194 let cases = vec![
1195 (
1196 "aws s3 without region in endpoint",
1197 "https://s3.amazonaws.com",
1198 "example",
1199 Some("us-east-1"),
1200 ),
1201 (
1202 "aws s3 with region in endpoint",
1203 "https://s3.us-east-1.amazonaws.com",
1204 "example",
1205 Some("us-east-1"),
1206 ),
1207 (
1208 "oss with public endpoint",
1209 "https://oss-ap-southeast-1.aliyuncs.com",
1210 "example",
1211 Some("oss-ap-southeast-1"),
1212 ),
1213 (
1214 "oss with internal endpoint",
1215 "https://oss-cn-hangzhou-internal.aliyuncs.com",
1216 "example",
1217 Some("oss-cn-hangzhou-internal"),
1218 ),
1219 (
1220 "r2",
1221 "https://abc.xxxxx.r2.cloudflarestorage.com",
1222 "example",
1223 Some("auto"),
1224 ),
1225 (
1226 "invalid service",
1227 "https://opendal.apache.org",
1228 "example",
1229 None,
1230 ),
1231 ];
1232
1233 for (name, endpoint, bucket, expected) in cases {
1234 let region = S3Builder::detect_region(endpoint, bucket).await;
1235 assert_eq!(region.as_deref(), expected, "{name}");
1236 }
1237 }
1238}