1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::fmt::Write;
22use std::str::FromStr;
23use std::sync::atomic::AtomicBool;
24use std::sync::Arc;
25
26use base64::prelude::BASE64_STANDARD;
27use base64::Engine;
28use constants::X_AMZ_META_PREFIX;
29use http::Response;
30use http::StatusCode;
31use log::debug;
32use log::warn;
33use md5::Digest;
34use md5::Md5;
35use reqsign::AwsAssumeRoleLoader;
36use reqsign::AwsConfig;
37use reqsign::AwsCredentialLoad;
38use reqsign::AwsDefaultLoader;
39use reqsign::AwsV4Signer;
40use reqwest::Url;
41use std::sync::LazyLock;
42
43use super::core::*;
44use super::delete::S3Deleter;
45use super::error::parse_error;
46use super::lister::{S3Lister, S3Listers, S3ObjectVersionsLister};
47use super::writer::S3Writer;
48use super::writer::S3Writers;
49use crate::raw::oio::PageLister;
50use crate::raw::*;
51use crate::services::S3Config;
52use crate::*;
53use constants::X_AMZ_VERSION_ID;
54
55static ENDPOINT_TEMPLATES: LazyLock<HashMap<&'static str, &'static str>> = LazyLock::new(|| {
57 let mut m = HashMap::new();
58 m.insert(
60 "https://s3.amazonaws.com",
61 "https://s3.{region}.amazonaws.com",
62 );
63 m
64});
65
66const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
67
68impl Configurator for S3Config {
69 type Builder = S3Builder;
70
71 #[allow(deprecated)]
72 fn into_builder(self) -> Self::Builder {
73 S3Builder {
74 config: self,
75 customized_credential_load: None,
76
77 http_client: None,
78 }
79 }
80}
81
82#[doc = include_str!("docs.md")]
85#[doc = include_str!("compatible_services.md")]
86#[derive(Default)]
87pub struct S3Builder {
88 config: S3Config,
89
90 customized_credential_load: Option<Box<dyn AwsCredentialLoad>>,
91
92 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
93 http_client: Option<HttpClient>,
94}
95
96impl Debug for S3Builder {
97 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
98 let mut d = f.debug_struct("S3Builder");
99
100 d.field("config", &self.config);
101 d.finish_non_exhaustive()
102 }
103}
104
105impl S3Builder {
106 pub fn root(mut self, root: &str) -> Self {
110 self.config.root = if root.is_empty() {
111 None
112 } else {
113 Some(root.to_string())
114 };
115
116 self
117 }
118
119 pub fn bucket(mut self, bucket: &str) -> Self {
121 self.config.bucket = bucket.to_string();
122
123 self
124 }
125
126 pub fn endpoint(mut self, endpoint: &str) -> Self {
139 if !endpoint.is_empty() {
140 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
142 }
143
144 self
145 }
146
147 pub fn region(mut self, region: &str) -> Self {
154 if !region.is_empty() {
155 self.config.region = Some(region.to_string())
156 }
157
158 self
159 }
160
161 pub fn access_key_id(mut self, v: &str) -> Self {
166 if !v.is_empty() {
167 self.config.access_key_id = Some(v.to_string())
168 }
169
170 self
171 }
172
173 pub fn secret_access_key(mut self, v: &str) -> Self {
178 if !v.is_empty() {
179 self.config.secret_access_key = Some(v.to_string())
180 }
181
182 self
183 }
184
185 pub fn role_arn(mut self, v: &str) -> Self {
190 if !v.is_empty() {
191 self.config.role_arn = Some(v.to_string())
192 }
193
194 self
195 }
196
197 pub fn external_id(mut self, v: &str) -> Self {
199 if !v.is_empty() {
200 self.config.external_id = Some(v.to_string())
201 }
202
203 self
204 }
205
206 pub fn role_session_name(mut self, v: &str) -> Self {
208 if !v.is_empty() {
209 self.config.role_session_name = Some(v.to_string())
210 }
211
212 self
213 }
214
215 pub fn default_storage_class(mut self, v: &str) -> Self {
228 if !v.is_empty() {
229 self.config.default_storage_class = Some(v.to_string())
230 }
231
232 self
233 }
234
235 pub fn server_side_encryption(mut self, v: &str) -> Self {
246 if !v.is_empty() {
247 self.config.server_side_encryption = Some(v.to_string())
248 }
249
250 self
251 }
252
253 pub fn server_side_encryption_aws_kms_key_id(mut self, v: &str) -> Self {
270 if !v.is_empty() {
271 self.config.server_side_encryption_aws_kms_key_id = Some(v.to_string())
272 }
273
274 self
275 }
276
277 pub fn server_side_encryption_customer_algorithm(mut self, v: &str) -> Self {
288 if !v.is_empty() {
289 self.config.server_side_encryption_customer_algorithm = Some(v.to_string())
290 }
291
292 self
293 }
294
295 pub fn server_side_encryption_customer_key(mut self, v: &str) -> Self {
309 if !v.is_empty() {
310 self.config.server_side_encryption_customer_key = Some(v.to_string())
311 }
312
313 self
314 }
315
316 pub fn server_side_encryption_customer_key_md5(mut self, v: &str) -> Self {
329 if !v.is_empty() {
330 self.config.server_side_encryption_customer_key_md5 = Some(v.to_string())
331 }
332
333 self
334 }
335
336 pub fn server_side_encryption_with_aws_managed_kms_key(mut self) -> Self {
342 self.config.server_side_encryption = Some("aws:kms".to_string());
343 self
344 }
345
346 pub fn server_side_encryption_with_customer_managed_kms_key(
352 mut self,
353 aws_kms_key_id: &str,
354 ) -> Self {
355 self.config.server_side_encryption = Some("aws:kms".to_string());
356 self.config.server_side_encryption_aws_kms_key_id = Some(aws_kms_key_id.to_string());
357 self
358 }
359
360 pub fn server_side_encryption_with_s3_key(mut self) -> Self {
366 self.config.server_side_encryption = Some("AES256".to_string());
367 self
368 }
369
370 pub fn server_side_encryption_with_customer_key(mut self, algorithm: &str, key: &[u8]) -> Self {
376 self.config.server_side_encryption_customer_algorithm = Some(algorithm.to_string());
377 self.config.server_side_encryption_customer_key = Some(BASE64_STANDARD.encode(key));
378 self.config.server_side_encryption_customer_key_md5 =
379 Some(BASE64_STANDARD.encode(Md5::digest(key).as_slice()));
380 self
381 }
382
383 pub fn session_token(mut self, token: &str) -> Self {
389 if !token.is_empty() {
390 self.config.session_token = Some(token.to_string());
391 }
392 self
393 }
394
395 #[deprecated(note = "Please use `session_token` instead")]
397 pub fn security_token(self, token: &str) -> Self {
398 self.session_token(token)
399 }
400
401 pub fn disable_config_load(mut self) -> Self {
409 self.config.disable_config_load = true;
410 self
411 }
412
413 pub fn disable_ec2_metadata(mut self) -> Self {
418 self.config.disable_ec2_metadata = true;
419 self
420 }
421
422 pub fn allow_anonymous(mut self) -> Self {
425 self.config.allow_anonymous = true;
426 self
427 }
428
429 pub fn enable_virtual_host_style(mut self) -> Self {
435 self.config.enable_virtual_host_style = true;
436 self
437 }
438
439 pub fn disable_stat_with_override(mut self) -> Self {
443 self.config.disable_stat_with_override = true;
444 self
445 }
446
447 pub fn customized_credential_load(mut self, cred: Box<dyn AwsCredentialLoad>) -> Self {
452 self.customized_credential_load = Some(cred);
453 self
454 }
455
456 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
463 #[allow(deprecated)]
464 pub fn http_client(mut self, client: HttpClient) -> Self {
465 self.http_client = Some(client);
466 self
467 }
468
469 pub fn enable_versioning(mut self, enabled: bool) -> Self {
471 self.config.enable_versioning = enabled;
472
473 self
474 }
475
476 fn is_bucket_valid(&self) -> bool {
480 if self.config.bucket.is_empty() {
481 return false;
482 }
483 if self.config.enable_virtual_host_style && self.config.bucket.contains('.') {
487 return false;
488 }
489 true
490 }
491
492 fn build_endpoint(&self, region: &str) -> String {
494 let bucket = {
495 debug_assert!(self.is_bucket_valid(), "bucket must be valid");
496
497 self.config.bucket.as_str()
498 };
499
500 let mut endpoint = match &self.config.endpoint {
501 Some(endpoint) => {
502 if endpoint.starts_with("http") {
503 endpoint.to_string()
504 } else {
505 format!("https://{endpoint}")
507 }
508 }
509 None => "https://s3.amazonaws.com".to_string(),
510 };
511
512 endpoint = endpoint.replace(&format!("//{bucket}."), "//");
514
515 if let Ok(url) = Url::from_str(&endpoint) {
517 endpoint = url.to_string().trim_end_matches('/').to_string();
519 }
520
521 endpoint = if let Some(template) = ENDPOINT_TEMPLATES.get(endpoint.as_str()) {
523 template.replace("{region}", region)
524 } else {
525 endpoint.to_string()
528 };
529
530 if self.config.enable_virtual_host_style {
532 endpoint = endpoint.replace("//", &format!("//{bucket}."))
533 } else {
534 write!(endpoint, "/{bucket}").expect("write into string must succeed");
535 };
536
537 endpoint
538 }
539
540 #[deprecated(
542 since = "0.52.0",
543 note = "Please use `delete_max_size` instead of `batch_max_operations`"
544 )]
545 pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
546 self.config.delete_max_size = Some(batch_max_operations);
547
548 self
549 }
550
551 pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
553 self.config.delete_max_size = Some(delete_max_size);
554
555 self
556 }
557
558 pub fn checksum_algorithm(mut self, checksum_algorithm: &str) -> Self {
564 self.config.checksum_algorithm = Some(checksum_algorithm.to_string());
565
566 self
567 }
568
569 pub fn disable_write_with_if_match(mut self) -> Self {
571 self.config.disable_write_with_if_match = true;
572 self
573 }
574
575 pub fn enable_write_with_append(mut self) -> Self {
577 self.config.enable_write_with_append = true;
578 self
579 }
580
581 pub async fn detect_region(endpoint: &str, bucket: &str) -> Option<String> {
617 let endpoint = endpoint.trim_end_matches('/');
619
620 let mut endpoint = if endpoint.starts_with("http") {
622 endpoint.to_string()
623 } else {
624 format!("https://{}", endpoint)
626 };
627
628 endpoint = endpoint.replace(&format!("//{bucket}."), "//");
630 let url = format!("{endpoint}/{bucket}");
631
632 debug!("detect region with url: {url}");
633
634 if endpoint.ends_with("r2.cloudflarestorage.com") {
640 return Some("auto".to_string());
641 }
642
643 if let Some(v) = endpoint.strip_prefix("https://s3.") {
645 if let Some(region) = v.strip_suffix(".amazonaws.com") {
646 return Some(region.to_string());
647 }
648 }
649
650 if let Some(v) = endpoint.strip_prefix("https://") {
655 if let Some(region) = v.strip_suffix(".aliyuncs.com") {
656 return Some(region.to_string());
657 }
658
659 if let Some(region) = v.strip_suffix("-internal.aliyuncs.com") {
660 return Some(region.to_string());
661 }
662 }
663
664 let req = http::Request::head(&url).body(Buffer::new()).ok()?;
666
667 let client = HttpClient::new().ok()?;
668 let res = client
669 .send(req)
670 .await
671 .map_err(|err| warn!("detect region failed for: {err:?}"))
672 .ok()?;
673
674 debug!(
675 "auto detect region got response: status {:?}, header: {:?}",
676 res.status(),
677 res.headers()
678 );
679
680 if let Some(header) = res.headers().get("x-amz-bucket-region") {
682 if let Ok(regin) = header.to_str() {
683 return Some(regin.to_string());
684 }
685 }
686
687 if res.status() == StatusCode::FORBIDDEN || res.status() == StatusCode::OK {
690 return Some("us-east-1".to_string());
691 }
692
693 None
694 }
695}
696
697impl Builder for S3Builder {
698 const SCHEME: Scheme = Scheme::S3;
699 type Config = S3Config;
700
701 fn build(mut self) -> Result<impl Access> {
702 debug!("backend build started: {:?}", &self);
703
704 let root = normalize_root(&self.config.root.clone().unwrap_or_default());
705 debug!("backend use root {}", &root);
706
707 let bucket = if self.is_bucket_valid() {
709 Ok(&self.config.bucket)
710 } else {
711 Err(
712 Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
713 .with_context("service", Scheme::S3),
714 )
715 }?;
716 debug!("backend use bucket {}", &bucket);
717
718 let default_storage_class = match &self.config.default_storage_class {
719 None => None,
720 Some(v) => Some(
721 build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?,
722 ),
723 };
724
725 let server_side_encryption = match &self.config.server_side_encryption {
726 None => None,
727 Some(v) => Some(
728 build_header_value(v)
729 .map_err(|err| err.with_context("key", "server_side_encryption"))?,
730 ),
731 };
732
733 let server_side_encryption_aws_kms_key_id =
734 match &self.config.server_side_encryption_aws_kms_key_id {
735 None => None,
736 Some(v) => Some(build_header_value(v).map_err(|err| {
737 err.with_context("key", "server_side_encryption_aws_kms_key_id")
738 })?),
739 };
740
741 let server_side_encryption_customer_algorithm =
742 match &self.config.server_side_encryption_customer_algorithm {
743 None => None,
744 Some(v) => Some(build_header_value(v).map_err(|err| {
745 err.with_context("key", "server_side_encryption_customer_algorithm")
746 })?),
747 };
748
749 let server_side_encryption_customer_key =
750 match &self.config.server_side_encryption_customer_key {
751 None => None,
752 Some(v) => Some(build_header_value(v).map_err(|err| {
753 err.with_context("key", "server_side_encryption_customer_key")
754 })?),
755 };
756
757 let server_side_encryption_customer_key_md5 =
758 match &self.config.server_side_encryption_customer_key_md5 {
759 None => None,
760 Some(v) => Some(build_header_value(v).map_err(|err| {
761 err.with_context("key", "server_side_encryption_customer_key_md5")
762 })?),
763 };
764
765 let checksum_algorithm = match self.config.checksum_algorithm.as_deref() {
766 Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
767 None => None,
768 v => {
769 return Err(Error::new(
770 ErrorKind::ConfigInvalid,
771 format!("{:?} is not a supported checksum_algorithm.", v),
772 ))
773 }
774 };
775
776 let mut cfg = AwsConfig::default();
778 if !self.config.disable_config_load {
779 #[cfg(not(target_arch = "wasm32"))]
780 {
781 cfg = cfg.from_profile();
782 cfg = cfg.from_env();
783 }
784 }
785
786 if let Some(ref v) = self.config.region {
787 cfg.region = Some(v.to_string());
788 }
789
790 if cfg.region.is_none() {
791 return Err(Error::new(
792 ErrorKind::ConfigInvalid,
793 "region is missing. Please find it by S3::detect_region() or set them in env.",
794 )
795 .with_operation("Builder::build")
796 .with_context("service", Scheme::S3));
797 }
798
799 let region = cfg.region.to_owned().unwrap();
800 debug!("backend use region: {region}");
801
802 self.config.endpoint = self.config.endpoint.or_else(|| cfg.endpoint_url.clone());
804
805 let endpoint = self.build_endpoint(®ion);
807 debug!("backend use endpoint: {endpoint}");
808
809 if let Some(v) = self.config.access_key_id {
811 cfg.access_key_id = Some(v)
812 }
813 if let Some(v) = self.config.secret_access_key {
814 cfg.secret_access_key = Some(v)
815 }
816 if let Some(v) = self.config.session_token {
817 cfg.session_token = Some(v)
818 }
819
820 let mut loader: Option<Box<dyn AwsCredentialLoad>> = None;
821 if let Some(v) = self.customized_credential_load {
823 loader = Some(v);
824 }
825
826 if let Some(role_arn) = self.config.role_arn {
828 let default_loader =
830 AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg.clone());
831
832 let mut assume_role_cfg = AwsConfig {
834 region: Some(region.clone()),
835 role_arn: Some(role_arn),
836 external_id: self.config.external_id.clone(),
837 sts_regional_endpoints: "regional".to_string(),
838 ..Default::default()
839 };
840
841 if let Some(name) = self.config.role_session_name {
843 assume_role_cfg.role_session_name = name;
844 }
845
846 let assume_role_loader = AwsAssumeRoleLoader::new(
847 GLOBAL_REQWEST_CLIENT.clone().clone(),
848 assume_role_cfg,
849 Box::new(default_loader),
850 )
851 .map_err(|err| {
852 Error::new(
853 ErrorKind::ConfigInvalid,
854 "The assume_role_loader is misconfigured",
855 )
856 .with_context("service", Scheme::S3)
857 .set_source(err)
858 })?;
859 loader = Some(Box::new(assume_role_loader));
860 }
861 let loader = match loader {
863 Some(v) => v,
864 None => {
865 let mut default_loader =
866 AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
867 if self.config.disable_ec2_metadata {
868 default_loader = default_loader.with_disable_ec2_metadata();
869 }
870
871 Box::new(default_loader)
872 }
873 };
874
875 let signer = AwsV4Signer::new("s3", ®ion);
876
877 let delete_max_size = self
878 .config
879 .delete_max_size
880 .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
881
882 Ok(S3Backend {
883 core: Arc::new(S3Core {
884 info: {
885 let am = AccessorInfo::default();
886 am.set_scheme(Scheme::S3)
887 .set_root(&root)
888 .set_name(bucket)
889 .set_native_capability(Capability {
890 stat: true,
891 stat_has_content_encoding: true,
892 stat_with_if_match: true,
893 stat_with_if_none_match: true,
894 stat_with_if_modified_since: true,
895 stat_with_if_unmodified_since: true,
896 stat_with_override_cache_control: !self
897 .config
898 .disable_stat_with_override,
899 stat_with_override_content_disposition: !self
900 .config
901 .disable_stat_with_override,
902 stat_with_override_content_type: !self
903 .config
904 .disable_stat_with_override,
905 stat_with_version: self.config.enable_versioning,
906 stat_has_cache_control: true,
907 stat_has_content_length: true,
908 stat_has_content_type: true,
909 stat_has_content_range: true,
910 stat_has_etag: true,
911 stat_has_content_md5: true,
912 stat_has_last_modified: true,
913 stat_has_content_disposition: true,
914 stat_has_user_metadata: true,
915 stat_has_version: true,
916
917 read: true,
918 read_with_if_match: true,
919 read_with_if_none_match: true,
920 read_with_if_modified_since: true,
921 read_with_if_unmodified_since: true,
922 read_with_override_cache_control: true,
923 read_with_override_content_disposition: true,
924 read_with_override_content_type: true,
925 read_with_version: self.config.enable_versioning,
926
927 write: true,
928 write_can_empty: true,
929 write_can_multi: true,
930 write_can_append: self.config.enable_write_with_append,
931
932 write_with_cache_control: true,
933 write_with_content_type: true,
934 write_with_content_encoding: true,
935 write_with_if_match: !self.config.disable_write_with_if_match,
936 write_with_if_not_exists: true,
937 write_with_user_metadata: true,
938
939 write_multi_min_size: Some(5 * 1024 * 1024),
943 write_multi_max_size: if cfg!(target_pointer_width = "64") {
947 Some(5 * 1024 * 1024 * 1024)
948 } else {
949 Some(usize::MAX)
950 },
951
952 delete: true,
953 delete_max_size: Some(delete_max_size),
954 delete_with_version: self.config.enable_versioning,
955
956 copy: true,
957
958 list: true,
959 list_with_limit: true,
960 list_with_start_after: true,
961 list_with_recursive: true,
962 list_with_versions: self.config.enable_versioning,
963 list_with_deleted: self.config.enable_versioning,
964 list_has_etag: true,
965 list_has_content_md5: true,
966 list_has_content_length: true,
967 list_has_last_modified: true,
968
969 presign: true,
970 presign_stat: true,
971 presign_read: true,
972 presign_write: true,
973
974 shared: true,
975
976 ..Default::default()
977 });
978
979 #[allow(deprecated)]
981 if let Some(client) = self.http_client {
982 am.update_http_client(|_| client);
983 }
984
985 am.into()
986 },
987 bucket: bucket.to_string(),
988 endpoint,
989 root,
990 server_side_encryption,
991 server_side_encryption_aws_kms_key_id,
992 server_side_encryption_customer_algorithm,
993 server_side_encryption_customer_key,
994 server_side_encryption_customer_key_md5,
995 default_storage_class,
996 allow_anonymous: self.config.allow_anonymous,
997 signer,
998 loader,
999 credential_loaded: AtomicBool::new(false),
1000 checksum_algorithm,
1001 }),
1002 })
1003 }
1004}
1005
1006#[derive(Debug, Clone)]
1008pub struct S3Backend {
1009 core: Arc<S3Core>,
1010}
1011
1012impl Access for S3Backend {
1013 type Reader = HttpBody;
1014 type Writer = S3Writers;
1015 type Lister = S3Listers;
1016 type Deleter = oio::BatchDeleter<S3Deleter>;
1017 type BlockingReader = ();
1018 type BlockingWriter = ();
1019 type BlockingLister = ();
1020 type BlockingDeleter = ();
1021
1022 fn info(&self) -> Arc<AccessorInfo> {
1023 self.core.info.clone()
1024 }
1025
1026 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
1027 let resp = self.core.s3_head_object(path, args).await?;
1028
1029 let status = resp.status();
1030
1031 match status {
1032 StatusCode::OK => {
1033 let headers = resp.headers();
1034 let mut meta = parse_into_metadata(path, headers)?;
1035
1036 let user_meta = parse_prefixed_headers(headers, X_AMZ_META_PREFIX);
1037 if !user_meta.is_empty() {
1038 meta.with_user_metadata(user_meta);
1039 }
1040
1041 if let Some(v) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? {
1042 meta.set_version(v);
1043 }
1044
1045 Ok(RpStat::new(meta))
1046 }
1047 _ => Err(parse_error(resp)),
1048 }
1049 }
1050
1051 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
1052 let resp = self.core.s3_get_object(path, args.range(), &args).await?;
1053
1054 let status = resp.status();
1055 match status {
1056 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
1057 Ok((RpRead::default(), resp.into_body()))
1058 }
1059 _ => {
1060 let (part, mut body) = resp.into_parts();
1061 let buf = body.to_buffer().await?;
1062 Err(parse_error(Response::from_parts(part, buf)))
1063 }
1064 }
1065 }
1066
1067 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
1068 let writer = S3Writer::new(self.core.clone(), path, args.clone());
1069
1070 let w = if args.append() {
1071 S3Writers::Two(oio::AppendWriter::new(writer))
1072 } else {
1073 S3Writers::One(oio::MultipartWriter::new(
1074 self.core.info.clone(),
1075 writer,
1076 args.concurrent(),
1077 ))
1078 };
1079
1080 Ok((RpWrite::default(), w))
1081 }
1082
1083 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
1084 Ok((
1085 RpDelete::default(),
1086 oio::BatchDeleter::new(S3Deleter::new(self.core.clone())),
1087 ))
1088 }
1089
1090 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
1091 let l = if args.versions() || args.deleted() {
1092 TwoWays::Two(PageLister::new(S3ObjectVersionsLister::new(
1093 self.core.clone(),
1094 path,
1095 args,
1096 )))
1097 } else {
1098 TwoWays::One(PageLister::new(S3Lister::new(
1099 self.core.clone(),
1100 path,
1101 args,
1102 )))
1103 };
1104
1105 Ok((RpList::default(), l))
1106 }
1107
1108 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
1109 let resp = self.core.s3_copy_object(from, to).await?;
1110
1111 let status = resp.status();
1112
1113 match status {
1114 StatusCode::OK => Ok(RpCopy::default()),
1115 _ => Err(parse_error(resp)),
1116 }
1117 }
1118
1119 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
1120 let (expire, op) = args.into_parts();
1121 let req = match op {
1123 PresignOperation::Stat(v) => self.core.s3_head_object_request(path, v),
1124 PresignOperation::Read(v) => {
1125 self.core
1126 .s3_get_object_request(path, BytesRange::default(), &v)
1127 }
1128 PresignOperation::Write(_) => {
1129 self.core
1130 .s3_put_object_request(path, None, &OpWrite::default(), Buffer::new())
1131 }
1132 PresignOperation::Delete(_) => Err(Error::new(
1133 ErrorKind::Unsupported,
1134 "operation is not supported",
1135 )),
1136 };
1137 let mut req = req?;
1138
1139 self.core.sign_query(&mut req, expire).await?;
1140
1141 let (parts, _) = req.into_parts();
1143
1144 Ok(RpPresign::new(PresignedRequest::new(
1145 parts.method,
1146 parts.uri,
1147 parts.headers,
1148 )))
1149 }
1150}
1151
1152#[cfg(test)]
1153mod tests {
1154 use super::*;
1155
1156 #[test]
1157 fn test_is_valid_bucket() {
1158 let bucket_cases = vec![
1159 ("", false, false),
1160 ("test", false, true),
1161 ("test.xyz", false, true),
1162 ("", true, false),
1163 ("test", true, true),
1164 ("test.xyz", true, false),
1165 ];
1166
1167 for (bucket, enable_virtual_host_style, expected) in bucket_cases {
1168 let mut b = S3Builder::default();
1169 b = b.bucket(bucket);
1170 if enable_virtual_host_style {
1171 b = b.enable_virtual_host_style();
1172 }
1173 assert_eq!(b.is_bucket_valid(), expected)
1174 }
1175 }
1176
1177 #[test]
1178 fn test_build_endpoint() {
1179 let _ = tracing_subscriber::fmt().with_test_writer().try_init();
1180
1181 let endpoint_cases = vec![
1182 Some("s3.amazonaws.com"),
1183 Some("https://s3.amazonaws.com"),
1184 Some("https://s3.us-east-2.amazonaws.com"),
1185 None,
1186 ];
1187
1188 for endpoint in &endpoint_cases {
1189 let mut b = S3Builder::default().bucket("test");
1190 if let Some(endpoint) = endpoint {
1191 b = b.endpoint(endpoint);
1192 }
1193
1194 let endpoint = b.build_endpoint("us-east-2");
1195 assert_eq!(endpoint, "https://s3.us-east-2.amazonaws.com/test");
1196 }
1197
1198 for endpoint in &endpoint_cases {
1199 let mut b = S3Builder::default()
1200 .bucket("test")
1201 .enable_virtual_host_style();
1202 if let Some(endpoint) = endpoint {
1203 b = b.endpoint(endpoint);
1204 }
1205
1206 let endpoint = b.build_endpoint("us-east-2");
1207 assert_eq!(endpoint, "https://test.s3.us-east-2.amazonaws.com");
1208 }
1209 }
1210
1211 #[tokio::test]
1212 async fn test_detect_region() {
1213 let cases = vec![
1214 (
1215 "aws s3 without region in endpoint",
1216 "https://s3.amazonaws.com",
1217 "example",
1218 Some("us-east-1"),
1219 ),
1220 (
1221 "aws s3 with region in endpoint",
1222 "https://s3.us-east-1.amazonaws.com",
1223 "example",
1224 Some("us-east-1"),
1225 ),
1226 (
1227 "oss with public endpoint",
1228 "https://oss-ap-southeast-1.aliyuncs.com",
1229 "example",
1230 Some("oss-ap-southeast-1"),
1231 ),
1232 (
1233 "oss with internal endpoint",
1234 "https://oss-cn-hangzhou-internal.aliyuncs.com",
1235 "example",
1236 Some("oss-cn-hangzhou-internal"),
1237 ),
1238 (
1239 "r2",
1240 "https://abc.xxxxx.r2.cloudflarestorage.com",
1241 "example",
1242 Some("auto"),
1243 ),
1244 (
1245 "invalid service",
1246 "https://opendal.apache.org",
1247 "example",
1248 None,
1249 ),
1250 ];
1251
1252 for (name, endpoint, bucket, expected) in cases {
1253 let region = S3Builder::detect_region(endpoint, bucket).await;
1254 assert_eq!(region.as_deref(), expected, "{}", name);
1255 }
1256 }
1257}