1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::fmt::Write;
22use std::str::FromStr;
23use std::sync::atomic::AtomicBool;
24use std::sync::Arc;
25use std::sync::LazyLock;
26
27use base64::prelude::BASE64_STANDARD;
28use base64::Engine;
29use constants::X_AMZ_META_PREFIX;
30use constants::X_AMZ_VERSION_ID;
31use http::Response;
32use http::StatusCode;
33use log::debug;
34use log::warn;
35use md5::Digest;
36use md5::Md5;
37use reqsign::AwsAssumeRoleLoader;
38use reqsign::AwsConfig;
39use reqsign::AwsCredentialLoad;
40use reqsign::AwsDefaultLoader;
41use reqsign::AwsV4Signer;
42use reqwest::Url;
43
44use super::core::*;
45use super::delete::S3Deleter;
46use super::error::parse_error;
47use super::lister::S3ListerV1;
48use super::lister::S3ListerV2;
49use super::lister::S3Listers;
50use super::lister::S3ObjectVersionsLister;
51use super::writer::S3Writer;
52use super::writer::S3Writers;
53use crate::raw::oio::PageLister;
54use crate::raw::*;
55use crate::services::S3Config;
56use crate::*;
57
/// Endpoints that carry no region, mapped to a template whose `{region}`
/// placeholder is substituted when the final endpoint is built.
static ENDPOINT_TEMPLATES: LazyLock<HashMap<&'static str, &'static str>> = LazyLock::new(|| {
    HashMap::from([(
        "https://s3.amazonaws.com",
        "https://s3.{region}.amazonaws.com",
    )])
});
68
/// Batch delete size used when `delete_max_size` is not configured.
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1_000;
70
71impl Configurator for S3Config {
72 type Builder = S3Builder;
73
74 #[allow(deprecated)]
75 fn into_builder(self) -> Self::Builder {
76 S3Builder {
77 config: self,
78 customized_credential_load: None,
79
80 http_client: None,
81 }
82 }
83}
84
// Builder for the S3 service. The rendered documentation is pulled in from
// `docs.md` and `compatible_services.md` next to this file.
#[doc = include_str!("docs.md")]
#[doc = include_str!("compatible_services.md")]
#[derive(Default)]
pub struct S3Builder {
    /// Plain configuration values filled in by the builder methods.
    config: S3Config,

    /// Caller-supplied credential loader, if any.
    customized_credential_load: Option<Box<dyn AwsCredentialLoad>>,

    /// Caller-supplied HTTP client, if any. Deprecated in favour of
    /// `Operator::update_http_client`.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
98
99impl Debug for S3Builder {
100 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
101 let mut d = f.debug_struct("S3Builder");
102
103 d.field("config", &self.config);
104 d.finish_non_exhaustive()
105 }
106}
107
108impl S3Builder {
109 pub fn root(mut self, root: &str) -> Self {
113 self.config.root = if root.is_empty() {
114 None
115 } else {
116 Some(root.to_string())
117 };
118
119 self
120 }
121
122 pub fn bucket(mut self, bucket: &str) -> Self {
124 self.config.bucket = bucket.to_string();
125
126 self
127 }
128
129 pub fn endpoint(mut self, endpoint: &str) -> Self {
142 if !endpoint.is_empty() {
143 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
145 }
146
147 self
148 }
149
150 pub fn region(mut self, region: &str) -> Self {
157 if !region.is_empty() {
158 self.config.region = Some(region.to_string())
159 }
160
161 self
162 }
163
164 pub fn access_key_id(mut self, v: &str) -> Self {
169 if !v.is_empty() {
170 self.config.access_key_id = Some(v.to_string())
171 }
172
173 self
174 }
175
176 pub fn secret_access_key(mut self, v: &str) -> Self {
181 if !v.is_empty() {
182 self.config.secret_access_key = Some(v.to_string())
183 }
184
185 self
186 }
187
188 pub fn role_arn(mut self, v: &str) -> Self {
193 if !v.is_empty() {
194 self.config.role_arn = Some(v.to_string())
195 }
196
197 self
198 }
199
200 pub fn external_id(mut self, v: &str) -> Self {
202 if !v.is_empty() {
203 self.config.external_id = Some(v.to_string())
204 }
205
206 self
207 }
208
209 pub fn role_session_name(mut self, v: &str) -> Self {
211 if !v.is_empty() {
212 self.config.role_session_name = Some(v.to_string())
213 }
214
215 self
216 }
217
218 pub fn default_storage_class(mut self, v: &str) -> Self {
231 if !v.is_empty() {
232 self.config.default_storage_class = Some(v.to_string())
233 }
234
235 self
236 }
237
238 pub fn server_side_encryption(mut self, v: &str) -> Self {
249 if !v.is_empty() {
250 self.config.server_side_encryption = Some(v.to_string())
251 }
252
253 self
254 }
255
256 pub fn server_side_encryption_aws_kms_key_id(mut self, v: &str) -> Self {
273 if !v.is_empty() {
274 self.config.server_side_encryption_aws_kms_key_id = Some(v.to_string())
275 }
276
277 self
278 }
279
280 pub fn server_side_encryption_customer_algorithm(mut self, v: &str) -> Self {
291 if !v.is_empty() {
292 self.config.server_side_encryption_customer_algorithm = Some(v.to_string())
293 }
294
295 self
296 }
297
298 pub fn server_side_encryption_customer_key(mut self, v: &str) -> Self {
312 if !v.is_empty() {
313 self.config.server_side_encryption_customer_key = Some(v.to_string())
314 }
315
316 self
317 }
318
319 pub fn server_side_encryption_customer_key_md5(mut self, v: &str) -> Self {
332 if !v.is_empty() {
333 self.config.server_side_encryption_customer_key_md5 = Some(v.to_string())
334 }
335
336 self
337 }
338
339 pub fn server_side_encryption_with_aws_managed_kms_key(mut self) -> Self {
345 self.config.server_side_encryption = Some("aws:kms".to_string());
346 self
347 }
348
349 pub fn server_side_encryption_with_customer_managed_kms_key(
355 mut self,
356 aws_kms_key_id: &str,
357 ) -> Self {
358 self.config.server_side_encryption = Some("aws:kms".to_string());
359 self.config.server_side_encryption_aws_kms_key_id = Some(aws_kms_key_id.to_string());
360 self
361 }
362
363 pub fn server_side_encryption_with_s3_key(mut self) -> Self {
369 self.config.server_side_encryption = Some("AES256".to_string());
370 self
371 }
372
373 pub fn server_side_encryption_with_customer_key(mut self, algorithm: &str, key: &[u8]) -> Self {
379 self.config.server_side_encryption_customer_algorithm = Some(algorithm.to_string());
380 self.config.server_side_encryption_customer_key = Some(BASE64_STANDARD.encode(key));
381 self.config.server_side_encryption_customer_key_md5 =
382 Some(BASE64_STANDARD.encode(Md5::digest(key).as_slice()));
383 self
384 }
385
386 pub fn session_token(mut self, token: &str) -> Self {
392 if !token.is_empty() {
393 self.config.session_token = Some(token.to_string());
394 }
395 self
396 }
397
398 #[deprecated(note = "Please use `session_token` instead")]
400 pub fn security_token(self, token: &str) -> Self {
401 self.session_token(token)
402 }
403
404 pub fn disable_config_load(mut self) -> Self {
412 self.config.disable_config_load = true;
413 self
414 }
415
416 pub fn disable_list_objects_v2(mut self) -> Self {
422 self.config.disable_list_objects_v2 = true;
423 self
424 }
425
426 pub fn enable_request_payer(mut self) -> Self {
430 self.config.enable_request_payer = true;
431 self
432 }
433
434 pub fn disable_ec2_metadata(mut self) -> Self {
439 self.config.disable_ec2_metadata = true;
440 self
441 }
442
443 pub fn allow_anonymous(mut self) -> Self {
446 self.config.allow_anonymous = true;
447 self
448 }
449
450 pub fn enable_virtual_host_style(mut self) -> Self {
456 self.config.enable_virtual_host_style = true;
457 self
458 }
459
460 pub fn disable_stat_with_override(mut self) -> Self {
464 self.config.disable_stat_with_override = true;
465 self
466 }
467
468 pub fn customized_credential_load(mut self, cred: Box<dyn AwsCredentialLoad>) -> Self {
473 self.customized_credential_load = Some(cred);
474 self
475 }
476
477 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
484 #[allow(deprecated)]
485 pub fn http_client(mut self, client: HttpClient) -> Self {
486 self.http_client = Some(client);
487 self
488 }
489
490 pub fn enable_versioning(mut self, enabled: bool) -> Self {
492 self.config.enable_versioning = enabled;
493
494 self
495 }
496
497 fn is_bucket_valid(&self) -> bool {
501 if self.config.bucket.is_empty() {
502 return false;
503 }
504 if self.config.enable_virtual_host_style && self.config.bucket.contains('.') {
508 return false;
509 }
510 true
511 }
512
513 fn build_endpoint(&self, region: &str) -> String {
515 let bucket = {
516 debug_assert!(self.is_bucket_valid(), "bucket must be valid");
517
518 self.config.bucket.as_str()
519 };
520
521 let mut endpoint = match &self.config.endpoint {
522 Some(endpoint) => {
523 if endpoint.starts_with("http") {
524 endpoint.to_string()
525 } else {
526 format!("https://{endpoint}")
528 }
529 }
530 None => "https://s3.amazonaws.com".to_string(),
531 };
532
533 endpoint = endpoint.replace(&format!("//{bucket}."), "//");
535
536 if let Ok(url) = Url::from_str(&endpoint) {
538 endpoint = url.to_string().trim_end_matches('/').to_string();
540 }
541
542 endpoint = if let Some(template) = ENDPOINT_TEMPLATES.get(endpoint.as_str()) {
544 template.replace("{region}", region)
545 } else {
546 endpoint.to_string()
549 };
550
551 if self.config.enable_virtual_host_style {
553 endpoint = endpoint.replace("//", &format!("//{bucket}."))
554 } else {
555 write!(endpoint, "/{bucket}").expect("write into string must succeed");
556 };
557
558 endpoint
559 }
560
561 #[deprecated(
563 since = "0.52.0",
564 note = "Please use `delete_max_size` instead of `batch_max_operations`"
565 )]
566 pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
567 self.config.delete_max_size = Some(batch_max_operations);
568
569 self
570 }
571
572 pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
574 self.config.delete_max_size = Some(delete_max_size);
575
576 self
577 }
578
579 pub fn checksum_algorithm(mut self, checksum_algorithm: &str) -> Self {
585 self.config.checksum_algorithm = Some(checksum_algorithm.to_string());
586
587 self
588 }
589
590 pub fn disable_write_with_if_match(mut self) -> Self {
592 self.config.disable_write_with_if_match = true;
593 self
594 }
595
596 pub fn enable_write_with_append(mut self) -> Self {
598 self.config.enable_write_with_append = true;
599 self
600 }
601
602 pub async fn detect_region(endpoint: &str, bucket: &str) -> Option<String> {
638 let endpoint = endpoint.trim_end_matches('/');
640
641 let mut endpoint = if endpoint.starts_with("http") {
643 endpoint.to_string()
644 } else {
645 format!("https://{}", endpoint)
647 };
648
649 endpoint = endpoint.replace(&format!("//{bucket}."), "//");
651 let url = format!("{endpoint}/{bucket}");
652
653 debug!("detect region with url: {url}");
654
655 if endpoint.ends_with("r2.cloudflarestorage.com") {
661 return Some("auto".to_string());
662 }
663
664 if let Some(v) = endpoint.strip_prefix("https://s3.") {
666 if let Some(region) = v.strip_suffix(".amazonaws.com") {
667 return Some(region.to_string());
668 }
669 }
670
671 if let Some(v) = endpoint.strip_prefix("https://") {
676 if let Some(region) = v.strip_suffix(".aliyuncs.com") {
677 return Some(region.to_string());
678 }
679
680 if let Some(region) = v.strip_suffix("-internal.aliyuncs.com") {
681 return Some(region.to_string());
682 }
683 }
684
685 let req = http::Request::head(&url).body(Buffer::new()).ok()?;
687
688 let client = HttpClient::new().ok()?;
689 let res = client
690 .send(req)
691 .await
692 .map_err(|err| warn!("detect region failed for: {err:?}"))
693 .ok()?;
694
695 debug!(
696 "auto detect region got response: status {:?}, header: {:?}",
697 res.status(),
698 res.headers()
699 );
700
701 if let Some(header) = res.headers().get("x-amz-bucket-region") {
703 if let Ok(regin) = header.to_str() {
704 return Some(regin.to_string());
705 }
706 }
707
708 if res.status() == StatusCode::FORBIDDEN || res.status() == StatusCode::OK {
711 return Some("us-east-1".to_string());
712 }
713
714 None
715 }
716}
717
718impl Builder for S3Builder {
719 const SCHEME: Scheme = Scheme::S3;
720 type Config = S3Config;
721
722 fn build(mut self) -> Result<impl Access> {
723 debug!("backend build started: {:?}", &self);
724
725 let root = normalize_root(&self.config.root.clone().unwrap_or_default());
726 debug!("backend use root {}", &root);
727
728 let bucket = if self.is_bucket_valid() {
730 Ok(&self.config.bucket)
731 } else {
732 Err(
733 Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
734 .with_context("service", Scheme::S3),
735 )
736 }?;
737 debug!("backend use bucket {}", &bucket);
738
739 let default_storage_class = match &self.config.default_storage_class {
740 None => None,
741 Some(v) => Some(
742 build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?,
743 ),
744 };
745
746 let server_side_encryption = match &self.config.server_side_encryption {
747 None => None,
748 Some(v) => Some(
749 build_header_value(v)
750 .map_err(|err| err.with_context("key", "server_side_encryption"))?,
751 ),
752 };
753
754 let server_side_encryption_aws_kms_key_id =
755 match &self.config.server_side_encryption_aws_kms_key_id {
756 None => None,
757 Some(v) => Some(build_header_value(v).map_err(|err| {
758 err.with_context("key", "server_side_encryption_aws_kms_key_id")
759 })?),
760 };
761
762 let server_side_encryption_customer_algorithm =
763 match &self.config.server_side_encryption_customer_algorithm {
764 None => None,
765 Some(v) => Some(build_header_value(v).map_err(|err| {
766 err.with_context("key", "server_side_encryption_customer_algorithm")
767 })?),
768 };
769
770 let server_side_encryption_customer_key =
771 match &self.config.server_side_encryption_customer_key {
772 None => None,
773 Some(v) => Some(build_header_value(v).map_err(|err| {
774 err.with_context("key", "server_side_encryption_customer_key")
775 })?),
776 };
777
778 let server_side_encryption_customer_key_md5 =
779 match &self.config.server_side_encryption_customer_key_md5 {
780 None => None,
781 Some(v) => Some(build_header_value(v).map_err(|err| {
782 err.with_context("key", "server_side_encryption_customer_key_md5")
783 })?),
784 };
785
786 let checksum_algorithm = match self.config.checksum_algorithm.as_deref() {
787 Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
788 None => None,
789 v => {
790 return Err(Error::new(
791 ErrorKind::ConfigInvalid,
792 format!("{:?} is not a supported checksum_algorithm.", v),
793 ))
794 }
795 };
796
797 let mut cfg = AwsConfig::default();
799 if !self.config.disable_config_load {
800 #[cfg(not(target_arch = "wasm32"))]
801 {
802 cfg = cfg.from_profile();
803 cfg = cfg.from_env();
804 }
805 }
806
807 if let Some(ref v) = self.config.region {
808 cfg.region = Some(v.to_string());
809 }
810
811 if cfg.region.is_none() {
812 return Err(Error::new(
813 ErrorKind::ConfigInvalid,
814 "region is missing. Please find it by S3::detect_region() or set them in env.",
815 )
816 .with_operation("Builder::build")
817 .with_context("service", Scheme::S3));
818 }
819
820 let region = cfg.region.to_owned().unwrap();
821 debug!("backend use region: {region}");
822
823 self.config.endpoint = self.config.endpoint.or_else(|| cfg.endpoint_url.clone());
825
826 let endpoint = self.build_endpoint(®ion);
828 debug!("backend use endpoint: {endpoint}");
829
830 if let Some(v) = self.config.access_key_id {
832 cfg.access_key_id = Some(v)
833 }
834 if let Some(v) = self.config.secret_access_key {
835 cfg.secret_access_key = Some(v)
836 }
837 if let Some(v) = self.config.session_token {
838 cfg.session_token = Some(v)
839 }
840
841 let mut loader: Option<Box<dyn AwsCredentialLoad>> = None;
842 if let Some(v) = self.customized_credential_load {
844 loader = Some(v);
845 }
846
847 if let Some(role_arn) = self.config.role_arn {
849 let default_loader =
851 AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg.clone());
852
853 let mut assume_role_cfg = AwsConfig {
855 region: Some(region.clone()),
856 role_arn: Some(role_arn),
857 external_id: self.config.external_id.clone(),
858 sts_regional_endpoints: "regional".to_string(),
859 ..Default::default()
860 };
861
862 if let Some(name) = self.config.role_session_name {
864 assume_role_cfg.role_session_name = name;
865 }
866
867 let assume_role_loader = AwsAssumeRoleLoader::new(
868 GLOBAL_REQWEST_CLIENT.clone().clone(),
869 assume_role_cfg,
870 Box::new(default_loader),
871 )
872 .map_err(|err| {
873 Error::new(
874 ErrorKind::ConfigInvalid,
875 "The assume_role_loader is misconfigured",
876 )
877 .with_context("service", Scheme::S3)
878 .set_source(err)
879 })?;
880 loader = Some(Box::new(assume_role_loader));
881 }
882 let loader = match loader {
884 Some(v) => v,
885 None => {
886 let mut default_loader =
887 AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
888 if self.config.disable_ec2_metadata {
889 default_loader = default_loader.with_disable_ec2_metadata();
890 }
891
892 Box::new(default_loader)
893 }
894 };
895
896 let signer = AwsV4Signer::new("s3", ®ion);
897
898 let delete_max_size = self
899 .config
900 .delete_max_size
901 .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
902
903 Ok(S3Backend {
904 core: Arc::new(S3Core {
905 info: {
906 let am = AccessorInfo::default();
907 am.set_scheme(Scheme::S3)
908 .set_root(&root)
909 .set_name(bucket)
910 .set_native_capability(Capability {
911 stat: true,
912 stat_has_content_encoding: true,
913 stat_with_if_match: true,
914 stat_with_if_none_match: true,
915 stat_with_if_modified_since: true,
916 stat_with_if_unmodified_since: true,
917 stat_with_override_cache_control: !self
918 .config
919 .disable_stat_with_override,
920 stat_with_override_content_disposition: !self
921 .config
922 .disable_stat_with_override,
923 stat_with_override_content_type: !self
924 .config
925 .disable_stat_with_override,
926 stat_with_version: self.config.enable_versioning,
927 stat_has_cache_control: true,
928 stat_has_content_length: true,
929 stat_has_content_type: true,
930 stat_has_content_range: true,
931 stat_has_etag: true,
932 stat_has_content_md5: true,
933 stat_has_last_modified: true,
934 stat_has_content_disposition: true,
935 stat_has_user_metadata: true,
936 stat_has_version: true,
937
938 read: true,
939 read_with_if_match: true,
940 read_with_if_none_match: true,
941 read_with_if_modified_since: true,
942 read_with_if_unmodified_since: true,
943 read_with_override_cache_control: true,
944 read_with_override_content_disposition: true,
945 read_with_override_content_type: true,
946 read_with_version: self.config.enable_versioning,
947
948 write: true,
949 write_can_empty: true,
950 write_can_multi: true,
951 write_can_append: self.config.enable_write_with_append,
952
953 write_with_cache_control: true,
954 write_with_content_type: true,
955 write_with_content_encoding: true,
956 write_with_if_match: !self.config.disable_write_with_if_match,
957 write_with_if_not_exists: true,
958 write_with_user_metadata: true,
959
960 write_multi_min_size: Some(5 * 1024 * 1024),
964 write_multi_max_size: if cfg!(target_pointer_width = "64") {
968 Some(5 * 1024 * 1024 * 1024)
969 } else {
970 Some(usize::MAX)
971 },
972
973 delete: true,
974 delete_max_size: Some(delete_max_size),
975 delete_with_version: self.config.enable_versioning,
976
977 copy: true,
978
979 list: true,
980 list_with_limit: true,
981 list_with_start_after: true,
982 list_with_recursive: true,
983 list_with_versions: self.config.enable_versioning,
984 list_with_deleted: self.config.enable_versioning,
985 list_has_etag: true,
986 list_has_content_md5: true,
987 list_has_content_length: true,
988 list_has_last_modified: true,
989
990 presign: true,
991 presign_stat: true,
992 presign_read: true,
993 presign_write: true,
994
995 shared: true,
996
997 ..Default::default()
998 });
999
1000 #[allow(deprecated)]
1002 if let Some(client) = self.http_client {
1003 am.update_http_client(|_| client);
1004 }
1005
1006 am.into()
1007 },
1008 bucket: bucket.to_string(),
1009 endpoint,
1010 root,
1011 server_side_encryption,
1012 server_side_encryption_aws_kms_key_id,
1013 server_side_encryption_customer_algorithm,
1014 server_side_encryption_customer_key,
1015 server_side_encryption_customer_key_md5,
1016 default_storage_class,
1017 allow_anonymous: self.config.allow_anonymous,
1018 disable_list_objects_v2: self.config.disable_list_objects_v2,
1019 enable_request_payer: self.config.enable_request_payer,
1020 signer,
1021 loader,
1022 credential_loaded: AtomicBool::new(false),
1023 checksum_algorithm,
1024 }),
1025 })
1026 }
1027}
1028
/// Backend for the S3 service.
///
/// All state lives in a reference-counted `S3Core`, so cloning the backend
/// shares the same core.
#[derive(Debug, Clone)]
pub struct S3Backend {
    core: Arc<S3Core>,
}
1034
1035impl Access for S3Backend {
1036 type Reader = HttpBody;
1037 type Writer = S3Writers;
1038 type Lister = S3Listers;
1039 type Deleter = oio::BatchDeleter<S3Deleter>;
1040
1041 fn info(&self) -> Arc<AccessorInfo> {
1042 self.core.info.clone()
1043 }
1044
1045 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
1046 let resp = self.core.s3_head_object(path, args).await?;
1047
1048 let status = resp.status();
1049
1050 match status {
1051 StatusCode::OK => {
1052 let headers = resp.headers();
1053 let mut meta = parse_into_metadata(path, headers)?;
1054
1055 let user_meta = parse_prefixed_headers(headers, X_AMZ_META_PREFIX);
1056 if !user_meta.is_empty() {
1057 meta = meta.with_user_metadata(user_meta);
1058 }
1059
1060 if let Some(v) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? {
1061 meta.set_version(v);
1062 }
1063
1064 Ok(RpStat::new(meta))
1065 }
1066 _ => Err(parse_error(resp)),
1067 }
1068 }
1069
1070 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
1071 let resp = self.core.s3_get_object(path, args.range(), &args).await?;
1072
1073 let status = resp.status();
1074 match status {
1075 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
1076 Ok((RpRead::default(), resp.into_body()))
1077 }
1078 _ => {
1079 let (part, mut body) = resp.into_parts();
1080 let buf = body.to_buffer().await?;
1081 Err(parse_error(Response::from_parts(part, buf)))
1082 }
1083 }
1084 }
1085
1086 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
1087 let writer = S3Writer::new(self.core.clone(), path, args.clone());
1088
1089 let w = if args.append() {
1090 S3Writers::Two(oio::AppendWriter::new(writer))
1091 } else {
1092 S3Writers::One(oio::MultipartWriter::new(
1093 self.core.info.clone(),
1094 writer,
1095 args.concurrent(),
1096 ))
1097 };
1098
1099 Ok((RpWrite::default(), w))
1100 }
1101
1102 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
1103 Ok((
1104 RpDelete::default(),
1105 oio::BatchDeleter::new(S3Deleter::new(self.core.clone())),
1106 ))
1107 }
1108
1109 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
1110 let l = if args.versions() || args.deleted() {
1111 ThreeWays::Three(PageLister::new(S3ObjectVersionsLister::new(
1112 self.core.clone(),
1113 path,
1114 args,
1115 )))
1116 } else if self.core.disable_list_objects_v2 {
1117 ThreeWays::One(PageLister::new(S3ListerV1::new(
1118 self.core.clone(),
1119 path,
1120 args,
1121 )))
1122 } else {
1123 ThreeWays::Two(PageLister::new(S3ListerV2::new(
1124 self.core.clone(),
1125 path,
1126 args,
1127 )))
1128 };
1129
1130 Ok((RpList::default(), l))
1131 }
1132
1133 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
1134 let resp = self.core.s3_copy_object(from, to).await?;
1135
1136 let status = resp.status();
1137
1138 match status {
1139 StatusCode::OK => Ok(RpCopy::default()),
1140 _ => Err(parse_error(resp)),
1141 }
1142 }
1143
1144 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
1145 let (expire, op) = args.into_parts();
1146 let req = match op {
1148 PresignOperation::Stat(v) => self.core.s3_head_object_request(path, v),
1149 PresignOperation::Read(v) => {
1150 self.core
1151 .s3_get_object_request(path, BytesRange::default(), &v)
1152 }
1153 PresignOperation::Write(_) => {
1154 self.core
1155 .s3_put_object_request(path, None, &OpWrite::default(), Buffer::new())
1156 }
1157 PresignOperation::Delete(_) => Err(Error::new(
1158 ErrorKind::Unsupported,
1159 "operation is not supported",
1160 )),
1161 };
1162 let mut req = req?;
1163
1164 self.core.sign_query(&mut req, expire).await?;
1165
1166 let (parts, _) = req.into_parts();
1168
1169 Ok(RpPresign::new(PresignedRequest::new(
1170 parts.method,
1171 parts.uri,
1172 parts.headers,
1173 )))
1174 }
1175}
1176
1177#[cfg(test)]
1178mod tests {
1179 use super::*;
1180
/// Bucket validation with and without virtual host style.
#[test]
fn test_is_valid_bucket() {
    // (bucket, virtual host style enabled, expected validity)
    let cases = [
        ("", false, false),
        ("test", false, true),
        ("test.xyz", false, true),
        ("", true, false),
        ("test", true, true),
        ("test.xyz", true, false),
    ];

    for (name, virtual_host_style, want) in cases {
        let builder = S3Builder::default().bucket(name);
        let builder = if virtual_host_style {
            builder.enable_virtual_host_style()
        } else {
            builder
        };

        assert_eq!(builder.is_bucket_valid(), want)
    }
}
1201
/// Every accepted spelling of the AWS endpoint resolves to the same regional
/// endpoint, in path style as well as in virtual host style.
#[test]
fn test_build_endpoint() {
    let _ = tracing_subscriber::fmt().with_test_writer().try_init();

    let endpoint_cases = [
        Some("s3.amazonaws.com"),
        Some("https://s3.amazonaws.com"),
        Some("https://s3.us-east-2.amazonaws.com"),
        None,
    ];

    // (virtual host style enabled, expected endpoint)
    let styles = [
        (false, "https://s3.us-east-2.amazonaws.com/test"),
        (true, "https://test.s3.us-east-2.amazonaws.com"),
    ];

    for (virtual_host_style, expected) in styles {
        for endpoint in endpoint_cases {
            let mut b = S3Builder::default().bucket("test");
            if virtual_host_style {
                b = b.enable_virtual_host_style();
            }
            if let Some(endpoint) = endpoint {
                b = b.endpoint(endpoint);
            }

            assert_eq!(b.build_endpoint("us-east-2"), expected);
        }
    }
}
1235
1236 #[tokio::test]
1237 async fn test_detect_region() {
1238 let cases = vec![
1239 (
1240 "aws s3 without region in endpoint",
1241 "https://s3.amazonaws.com",
1242 "example",
1243 Some("us-east-1"),
1244 ),
1245 (
1246 "aws s3 with region in endpoint",
1247 "https://s3.us-east-1.amazonaws.com",
1248 "example",
1249 Some("us-east-1"),
1250 ),
1251 (
1252 "oss with public endpoint",
1253 "https://oss-ap-southeast-1.aliyuncs.com",
1254 "example",
1255 Some("oss-ap-southeast-1"),
1256 ),
1257 (
1258 "oss with internal endpoint",
1259 "https://oss-cn-hangzhou-internal.aliyuncs.com",
1260 "example",
1261 Some("oss-cn-hangzhou-internal"),
1262 ),
1263 (
1264 "r2",
1265 "https://abc.xxxxx.r2.cloudflarestorage.com",
1266 "example",
1267 Some("auto"),
1268 ),
1269 (
1270 "invalid service",
1271 "https://opendal.apache.org",
1272 "example",
1273 None,
1274 ),
1275 ];
1276
1277 for (name, endpoint, bucket, expected) in cases {
1278 let region = S3Builder::detect_region(endpoint, bucket).await;
1279 assert_eq!(region.as_deref(), expected, "{}", name);
1280 }
1281 }
1282}