opendal/services/s3/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt;
19use std::fmt::Debug;
20use std::fmt::Display;
21use std::fmt::Formatter;
22use std::fmt::Write;
23use std::sync::atomic;
24use std::sync::atomic::AtomicBool;
25use std::sync::Arc;
26use std::time::Duration;
27
28use base64::prelude::BASE64_STANDARD;
29use base64::Engine;
30use bytes::Bytes;
31use constants::X_AMZ_META_PREFIX;
32use http::header::HeaderName;
33use http::header::CACHE_CONTROL;
34use http::header::CONTENT_DISPOSITION;
35use http::header::CONTENT_ENCODING;
36use http::header::CONTENT_LENGTH;
37use http::header::CONTENT_TYPE;
38use http::header::HOST;
39use http::header::IF_MATCH;
40use http::header::IF_MODIFIED_SINCE;
41use http::header::IF_NONE_MATCH;
42use http::header::IF_UNMODIFIED_SINCE;
43use http::HeaderValue;
44use http::Request;
45use http::Response;
46use reqsign::AwsCredential;
47use reqsign::AwsCredentialLoad;
48use reqsign::AwsV4Signer;
49use serde::Deserialize;
50use serde::Serialize;
51
52use crate::raw::*;
53use crate::*;
54
55pub mod constants {
56    pub const X_AMZ_COPY_SOURCE: &str = "x-amz-copy-source";
57
58    pub const X_AMZ_SERVER_SIDE_ENCRYPTION: &str = "x-amz-server-side-encryption";
59    pub const X_AMZ_SERVER_REQUEST_PAYER: (&str, &str) = ("x-amz-request-payer", "requester");
60    pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: &str =
61        "x-amz-server-side-encryption-customer-algorithm";
62    pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str =
63        "x-amz-server-side-encryption-customer-key";
64    pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str =
65        "x-amz-server-side-encryption-customer-key-md5";
66    pub const X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID: &str =
67        "x-amz-server-side-encryption-aws-kms-key-id";
68    pub const X_AMZ_STORAGE_CLASS: &str = "x-amz-storage-class";
69
70    pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: &str =
71        "x-amz-copy-source-server-side-encryption-customer-algorithm";
72    pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str =
73        "x-amz-copy-source-server-side-encryption-customer-key";
74    pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str =
75        "x-amz-copy-source-server-side-encryption-customer-key-md5";
76
77    pub const X_AMZ_WRITE_OFFSET_BYTES: &str = "x-amz-write-offset-bytes";
78
79    pub const X_AMZ_META_PREFIX: &str = "x-amz-meta-";
80
81    pub const X_AMZ_VERSION_ID: &str = "x-amz-version-id";
82    pub const X_AMZ_OBJECT_SIZE: &str = "x-amz-object-size";
83
84    pub const RESPONSE_CONTENT_DISPOSITION: &str = "response-content-disposition";
85    pub const RESPONSE_CONTENT_TYPE: &str = "response-content-type";
86    pub const RESPONSE_CACHE_CONTROL: &str = "response-cache-control";
87
88    pub const S3_QUERY_VERSION_ID: &str = "versionId";
89}
90
91pub struct S3Core {
92    pub info: Arc<AccessorInfo>,
93
94    pub bucket: String,
95    pub endpoint: String,
96    pub root: String,
97    pub server_side_encryption: Option<HeaderValue>,
98    pub server_side_encryption_aws_kms_key_id: Option<HeaderValue>,
99    pub server_side_encryption_customer_algorithm: Option<HeaderValue>,
100    pub server_side_encryption_customer_key: Option<HeaderValue>,
101    pub server_side_encryption_customer_key_md5: Option<HeaderValue>,
102    pub default_storage_class: Option<HeaderValue>,
103    pub allow_anonymous: bool,
104    pub disable_list_objects_v2: bool,
105    pub enable_request_payer: bool,
106
107    pub signer: AwsV4Signer,
108    pub loader: Box<dyn AwsCredentialLoad>,
109    pub credential_loaded: AtomicBool,
110    pub checksum_algorithm: Option<ChecksumAlgorithm>,
111}
112
113impl Debug for S3Core {
114    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
115        f.debug_struct("S3Core")
116            .field("bucket", &self.bucket)
117            .field("endpoint", &self.endpoint)
118            .field("root", &self.root)
119            .finish_non_exhaustive()
120    }
121}
122
123impl S3Core {
124    /// If credential is not found, we will not sign the request.
125    async fn load_credential(&self) -> Result<Option<AwsCredential>> {
126        let cred = self
127            .loader
128            .load_credential(GLOBAL_REQWEST_CLIENT.clone())
129            .await
130            .map_err(new_request_credential_error)?;
131
132        if let Some(cred) = cred {
133            // Update credential_loaded to true if we have load credential successfully.
134            self.credential_loaded
135                .store(true, atomic::Ordering::Relaxed);
136            return Ok(Some(cred));
137        }
138
139        // If we have load credential before but failed to load this time, we should
140        // return error instead.
141        if self.credential_loaded.load(atomic::Ordering::Relaxed) {
142            return Err(Error::new(
143                ErrorKind::PermissionDenied,
144                "credential was previously loaded successfully but has failed this time",
145            )
146            .set_temporary());
147        }
148
149        // Credential is empty and users allow anonymous access, we will not sign the request.
150        if self.allow_anonymous {
151            return Ok(None);
152        }
153
154        Err(Error::new(
155            ErrorKind::PermissionDenied,
156            "no valid credential found and anonymous access is not allowed",
157        ))
158    }
159
160    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
161        let cred = if let Some(cred) = self.load_credential().await? {
162            cred
163        } else {
164            return Ok(());
165        };
166
167        self.signer
168            .sign(req, &cred)
169            .map_err(new_request_sign_error)?;
170
171        // Always remove host header, let users' client to set it based on HTTP
172        // version.
173        //
174        // As discussed in <https://github.com/seanmonstar/reqwest/issues/1809>,
175        // google server could send RST_STREAM of PROTOCOL_ERROR if our request
176        // contains host header.
177        req.headers_mut().remove(HOST);
178
179        Ok(())
180    }
181
182    pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
183        let cred = if let Some(cred) = self.load_credential().await? {
184            cred
185        } else {
186            return Ok(());
187        };
188
189        self.signer
190            .sign_query(req, duration, &cred)
191            .map_err(new_request_sign_error)?;
192
193        // Always remove host header, let users' client to set it based on HTTP
194        // version.
195        //
196        // As discussed in <https://github.com/seanmonstar/reqwest/issues/1809>,
197        // google server could send RST_STREAM of PROTOCOL_ERROR if our request
198        // contains host header.
199        req.headers_mut().remove(HOST);
200
201        Ok(())
202    }
203
204    #[inline]
205    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
206        self.info.http_client().send(req).await
207    }
208
209    /// # Note
210    ///
211    /// header like X_AMZ_SERVER_SIDE_ENCRYPTION doesn't need to set while
212    /// get or stat.
213    pub fn insert_sse_headers(
214        &self,
215        mut req: http::request::Builder,
216        is_write: bool,
217    ) -> http::request::Builder {
218        if is_write {
219            if let Some(v) = &self.server_side_encryption {
220                let mut v = v.clone();
221                v.set_sensitive(true);
222
223                req = req.header(
224                    HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION),
225                    v,
226                )
227            }
228            if let Some(v) = &self.server_side_encryption_aws_kms_key_id {
229                let mut v = v.clone();
230                v.set_sensitive(true);
231
232                req = req.header(
233                    HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID),
234                    v,
235                )
236            }
237        }
238
239        if let Some(v) = &self.server_side_encryption_customer_algorithm {
240            let mut v = v.clone();
241            v.set_sensitive(true);
242
243            req = req.header(
244                HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM),
245                v,
246            )
247        }
248        if let Some(v) = &self.server_side_encryption_customer_key {
249            let mut v = v.clone();
250            v.set_sensitive(true);
251
252            req = req.header(
253                HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY),
254                v,
255            )
256        }
257        if let Some(v) = &self.server_side_encryption_customer_key_md5 {
258            let mut v = v.clone();
259            v.set_sensitive(true);
260
261            req = req.header(
262                HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5),
263                v,
264            )
265        }
266
267        req
268    }
269    pub fn calculate_checksum(&self, body: &Buffer) -> Option<String> {
270        match self.checksum_algorithm {
271            None => None,
272            Some(ChecksumAlgorithm::Crc32c) => {
273                let mut crc = 0u32;
274                body.clone()
275                    .for_each(|b| crc = crc32c::crc32c_append(crc, &b));
276                Some(BASE64_STANDARD.encode(crc.to_be_bytes()))
277            }
278        }
279    }
280    pub fn insert_checksum_header(
281        &self,
282        mut req: http::request::Builder,
283        checksum: &str,
284    ) -> http::request::Builder {
285        if let Some(checksum_algorithm) = self.checksum_algorithm.as_ref() {
286            req = req.header(checksum_algorithm.to_header_name(), checksum);
287        }
288        req
289    }
290
291    pub fn insert_checksum_type_header(
292        &self,
293        mut req: http::request::Builder,
294    ) -> http::request::Builder {
295        if let Some(checksum_algorithm) = self.checksum_algorithm.as_ref() {
296            req = req.header("x-amz-checksum-algorithm", checksum_algorithm.to_string());
297        }
298        req
299    }
300
301    pub fn insert_metadata_headers(
302        &self,
303        mut req: http::request::Builder,
304        size: Option<u64>,
305        args: &OpWrite,
306    ) -> http::request::Builder {
307        if let Some(size) = size {
308            req = req.header(CONTENT_LENGTH, size.to_string())
309        }
310
311        if let Some(mime) = args.content_type() {
312            req = req.header(CONTENT_TYPE, mime)
313        }
314
315        if let Some(pos) = args.content_disposition() {
316            req = req.header(CONTENT_DISPOSITION, pos)
317        }
318
319        if let Some(encoding) = args.content_encoding() {
320            req = req.header(CONTENT_ENCODING, encoding);
321        }
322
323        if let Some(cache_control) = args.cache_control() {
324            req = req.header(CACHE_CONTROL, cache_control)
325        }
326
327        if let Some(if_match) = args.if_match() {
328            req = req.header(IF_MATCH, if_match);
329        }
330
331        if args.if_not_exists() {
332            req = req.header(IF_NONE_MATCH, "*");
333        }
334
335        // Set storage class header
336        if let Some(v) = &self.default_storage_class {
337            req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
338        }
339
340        // Set user metadata headers.
341        if let Some(user_metadata) = args.user_metadata() {
342            for (key, value) in user_metadata {
343                req = req.header(format!("{X_AMZ_META_PREFIX}{key}"), value)
344            }
345        }
346        req
347    }
348
349    pub fn insert_request_payer_header(
350        &self,
351        mut req: http::request::Builder,
352    ) -> http::request::Builder {
353        if self.enable_request_payer {
354            req = req.header(
355                HeaderName::from_static(constants::X_AMZ_SERVER_REQUEST_PAYER.0),
356                HeaderValue::from_static(constants::X_AMZ_SERVER_REQUEST_PAYER.1),
357            );
358        }
359        req
360    }
361}
362
363impl S3Core {
364    pub fn s3_head_object_request(&self, path: &str, args: OpStat) -> Result<Request<Buffer>> {
365        let p = build_abs_path(&self.root, path);
366
367        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
368
369        // Add query arguments to the URL based on response overrides
370        let mut query_args = Vec::new();
371        if let Some(override_content_disposition) = args.override_content_disposition() {
372            query_args.push(format!(
373                "{}={}",
374                constants::RESPONSE_CONTENT_DISPOSITION,
375                percent_encode_path(override_content_disposition)
376            ))
377        }
378        if let Some(override_content_type) = args.override_content_type() {
379            query_args.push(format!(
380                "{}={}",
381                constants::RESPONSE_CONTENT_TYPE,
382                percent_encode_path(override_content_type)
383            ))
384        }
385        if let Some(override_cache_control) = args.override_cache_control() {
386            query_args.push(format!(
387                "{}={}",
388                constants::RESPONSE_CACHE_CONTROL,
389                percent_encode_path(override_cache_control)
390            ))
391        }
392        if let Some(version) = args.version() {
393            query_args.push(format!(
394                "{}={}",
395                constants::S3_QUERY_VERSION_ID,
396                percent_decode_path(version)
397            ))
398        }
399        if !query_args.is_empty() {
400            url.push_str(&format!("?{}", query_args.join("&")));
401        }
402
403        let mut req = Request::head(&url);
404
405        req = self.insert_sse_headers(req, false);
406
407        if let Some(if_none_match) = args.if_none_match() {
408            req = req.header(IF_NONE_MATCH, if_none_match);
409        }
410        if let Some(if_match) = args.if_match() {
411            req = req.header(IF_MATCH, if_match);
412        }
413
414        if let Some(if_modified_since) = args.if_modified_since() {
415            req = req.header(
416                IF_MODIFIED_SINCE,
417                format_datetime_into_http_date(if_modified_since),
418            );
419        }
420        if let Some(if_unmodified_since) = args.if_unmodified_since() {
421            req = req.header(
422                IF_UNMODIFIED_SINCE,
423                format_datetime_into_http_date(if_unmodified_since),
424            );
425        }
426
427        // Set request payer header if enabled.
428        req = self.insert_request_payer_header(req);
429
430        // Inject operation to the request.
431        req = req.extension(Operation::Stat);
432
433        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
434
435        Ok(req)
436    }
437
438    pub fn s3_get_object_request(
439        &self,
440        path: &str,
441        range: BytesRange,
442        args: &OpRead,
443    ) -> Result<Request<Buffer>> {
444        let p = build_abs_path(&self.root, path);
445
446        // Construct headers to add to the request
447        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
448
449        // Add query arguments to the URL based on response overrides
450        let mut query_args = Vec::new();
451        if let Some(override_content_disposition) = args.override_content_disposition() {
452            query_args.push(format!(
453                "{}={}",
454                constants::RESPONSE_CONTENT_DISPOSITION,
455                percent_encode_path(override_content_disposition)
456            ))
457        }
458        if let Some(override_content_type) = args.override_content_type() {
459            query_args.push(format!(
460                "{}={}",
461                constants::RESPONSE_CONTENT_TYPE,
462                percent_encode_path(override_content_type)
463            ))
464        }
465        if let Some(override_cache_control) = args.override_cache_control() {
466            query_args.push(format!(
467                "{}={}",
468                constants::RESPONSE_CACHE_CONTROL,
469                percent_encode_path(override_cache_control)
470            ))
471        }
472        if let Some(version) = args.version() {
473            query_args.push(format!(
474                "{}={}",
475                constants::S3_QUERY_VERSION_ID,
476                percent_decode_path(version)
477            ))
478        }
479        if !query_args.is_empty() {
480            url.push_str(&format!("?{}", query_args.join("&")));
481        }
482
483        let mut req = Request::get(&url);
484
485        if !range.is_full() {
486            req = req.header(http::header::RANGE, range.to_header());
487        }
488
489        if let Some(if_none_match) = args.if_none_match() {
490            req = req.header(IF_NONE_MATCH, if_none_match);
491        }
492
493        if let Some(if_match) = args.if_match() {
494            req = req.header(IF_MATCH, if_match);
495        }
496
497        if let Some(if_modified_since) = args.if_modified_since() {
498            req = req.header(
499                IF_MODIFIED_SINCE,
500                format_datetime_into_http_date(if_modified_since),
501            );
502        }
503
504        if let Some(if_unmodified_since) = args.if_unmodified_since() {
505            req = req.header(
506                IF_UNMODIFIED_SINCE,
507                format_datetime_into_http_date(if_unmodified_since),
508            );
509        }
510
511        // Set request payer header if enabled.
512        req = self.insert_request_payer_header(req);
513
514        // Set SSE headers.
515        // TODO: how will this work with presign?
516        req = self.insert_sse_headers(req, false);
517
518        // Inject operation to the request.
519        req = req.extension(Operation::Read);
520
521        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
522
523        Ok(req)
524    }
525
526    pub async fn s3_get_object(
527        &self,
528        path: &str,
529        range: BytesRange,
530        args: &OpRead,
531    ) -> Result<Response<HttpBody>> {
532        let mut req = self.s3_get_object_request(path, range, args)?;
533
534        self.sign(&mut req).await?;
535
536        self.info.http_client().fetch(req).await
537    }
538
539    pub fn s3_put_object_request(
540        &self,
541        path: &str,
542        size: Option<u64>,
543        args: &OpWrite,
544        body: Buffer,
545    ) -> Result<Request<Buffer>> {
546        let p = build_abs_path(&self.root, path);
547
548        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
549
550        let mut req = Request::put(&url);
551
552        req = self.insert_metadata_headers(req, size, args);
553
554        // Set request payer header if enabled.
555        req = self.insert_request_payer_header(req);
556
557        // Set SSE headers.
558        req = self.insert_sse_headers(req, true);
559
560        // Calculate Checksum.
561        if let Some(checksum) = self.calculate_checksum(&body) {
562            // Set Checksum header.
563            req = self.insert_checksum_header(req, &checksum);
564        }
565
566        // Inject operation to the request.
567        req = req.extension(Operation::Write);
568
569        // Set body
570        let req = req.body(body).map_err(new_request_build_error)?;
571
572        Ok(req)
573    }
574
575    pub fn s3_append_object_request(
576        &self,
577        path: &str,
578        position: u64,
579        size: u64,
580        args: &OpWrite,
581        body: Buffer,
582    ) -> Result<Request<Buffer>> {
583        let p = build_abs_path(&self.root, path);
584        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
585        let mut req = Request::put(&url);
586
587        // Only include full metadata headers when creating a new object via append (position == 0)
588        // For existing objects or subsequent appends, only include content-length
589        if position == 0 {
590            req = self.insert_metadata_headers(req, Some(size), args);
591        } else {
592            req = req.header(CONTENT_LENGTH, size.to_string());
593        }
594
595        req = req.header(constants::X_AMZ_WRITE_OFFSET_BYTES, position.to_string());
596
597        // Set request payer header if enabled.
598        req = self.insert_request_payer_header(req);
599
600        // Set SSE headers.
601        req = self.insert_sse_headers(req, true);
602
603        // Inject operation to the request.
604        req = req.extension(Operation::Write);
605
606        // Set body
607        let req = req.body(body).map_err(new_request_build_error)?;
608
609        Ok(req)
610    }
611
612    pub async fn s3_head_object(&self, path: &str, args: OpStat) -> Result<Response<Buffer>> {
613        let mut req = self.s3_head_object_request(path, args)?;
614
615        self.sign(&mut req).await?;
616
617        self.send(req).await
618    }
619
620    pub async fn s3_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
621        let p = build_abs_path(&self.root, path);
622
623        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
624
625        let mut query_args = Vec::new();
626
627        if let Some(version) = args.version() {
628            query_args.push(format!(
629                "{}={}",
630                constants::S3_QUERY_VERSION_ID,
631                percent_encode_path(version)
632            ))
633        }
634
635        if !query_args.is_empty() {
636            url.push_str(&format!("?{}", query_args.join("&")));
637        }
638
639        let mut req = Request::delete(&url);
640
641        // Set request payer header if enabled.
642        req = self.insert_request_payer_header(req);
643
644        let mut req = req
645            // Inject operation to the request.
646            .extension(Operation::Delete)
647            .body(Buffer::new())
648            .map_err(new_request_build_error)?;
649
650        self.sign(&mut req).await?;
651
652        self.send(req).await
653    }
654
655    pub async fn s3_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
656        let from = build_abs_path(&self.root, from);
657        let to = build_abs_path(&self.root, to);
658
659        let source = format!("{}/{}", self.bucket, percent_encode_path(&from));
660        let target = format!("{}/{}", self.endpoint, percent_encode_path(&to));
661
662        let mut req = Request::put(&target);
663
664        // Set SSE headers.
665        req = self.insert_sse_headers(req, true);
666
667        if let Some(v) = &self.server_side_encryption_customer_algorithm {
668            let mut v = v.clone();
669            v.set_sensitive(true);
670
671            req = req.header(
672                HeaderName::from_static(
673                    constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
674                ),
675                v,
676            )
677        }
678
679        if let Some(v) = &self.server_side_encryption_customer_key {
680            let mut v = v.clone();
681            v.set_sensitive(true);
682
683            req = req.header(
684                HeaderName::from_static(
685                    constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
686                ),
687                v,
688            )
689        }
690
691        if let Some(v) = &self.server_side_encryption_customer_key_md5 {
692            let mut v = v.clone();
693            v.set_sensitive(true);
694
695            req = req.header(
696                HeaderName::from_static(
697                    constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
698                ),
699                v,
700            )
701        }
702
703        // Set request payer header if enabled.
704        req = self.insert_request_payer_header(req);
705
706        let mut req = req
707            // Inject operation to the request.
708            .extension(Operation::Copy)
709            .header(constants::X_AMZ_COPY_SOURCE, &source)
710            .body(Buffer::new())
711            .map_err(new_request_build_error)?;
712
713        self.sign(&mut req).await?;
714
715        self.send(req).await
716    }
717
718    pub async fn s3_list_objects_v1(
719        &self,
720        path: &str,
721        marker: &str,
722        delimiter: &str,
723        limit: Option<usize>,
724    ) -> Result<Response<Buffer>> {
725        let p = build_abs_path(&self.root, path);
726
727        let mut url = QueryPairsWriter::new(&self.endpoint);
728
729        if !p.is_empty() {
730            url = url.push("prefix", &percent_encode_path(&p));
731        }
732        if !delimiter.is_empty() {
733            url = url.push("delimiter", delimiter);
734        }
735        if let Some(limit) = limit {
736            url = url.push("max-keys", &limit.to_string());
737        }
738        if !marker.is_empty() {
739            url = url.push("marker", &percent_encode_path(marker));
740        }
741
742        let mut req = Request::get(url.finish());
743
744        // Set request payer header if enabled.
745        req = self.insert_request_payer_header(req);
746
747        let mut req = req
748            // Inject operation to the request.
749            .extension(Operation::List)
750            .body(Buffer::new())
751            .map_err(new_request_build_error)?;
752
753        self.sign(&mut req).await?;
754
755        self.send(req).await
756    }
757
758    pub async fn s3_list_objects_v2(
759        &self,
760        path: &str,
761        continuation_token: &str,
762        delimiter: &str,
763        limit: Option<usize>,
764        start_after: Option<String>,
765    ) -> Result<Response<Buffer>> {
766        let p = build_abs_path(&self.root, path);
767
768        let mut url = QueryPairsWriter::new(&self.endpoint);
769        url = url.push("list-type", "2");
770
771        if !p.is_empty() {
772            url = url.push("prefix", &percent_encode_path(&p));
773        }
774        if !delimiter.is_empty() {
775            url = url.push("delimiter", delimiter);
776        }
777        if let Some(limit) = limit {
778            url = url.push("max-keys", &limit.to_string());
779        }
780        if let Some(start_after) = start_after {
781            url = url.push("start-after", &percent_encode_path(&start_after));
782        }
783        if !continuation_token.is_empty() {
784            // AWS S3 could return continuation-token that contains `=`
785            // which could lead `reqsign` parse query wrongly.
786            // URL encode continuation-token before starting signing so that
787            // our signer will not be confused.
788            url = url.push(
789                "continuation-token",
790                &percent_encode_path(continuation_token),
791            );
792        }
793
794        let mut req = Request::get(url.finish());
795
796        // Set request payer header if enabled.
797        req = self.insert_request_payer_header(req);
798
799        let mut req = req
800            // Inject operation to the request.
801            .extension(Operation::List)
802            .body(Buffer::new())
803            .map_err(new_request_build_error)?;
804
805        self.sign(&mut req).await?;
806
807        self.send(req).await
808    }
809
810    pub async fn s3_initiate_multipart_upload(
811        &self,
812        path: &str,
813        args: &OpWrite,
814    ) -> Result<Response<Buffer>> {
815        let p = build_abs_path(&self.root, path);
816
817        let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
818
819        let mut req = Request::post(&url);
820
821        if let Some(mime) = args.content_type() {
822            req = req.header(CONTENT_TYPE, mime)
823        }
824
825        if let Some(content_disposition) = args.content_disposition() {
826            req = req.header(CONTENT_DISPOSITION, content_disposition)
827        }
828
829        if let Some(cache_control) = args.cache_control() {
830            req = req.header(CACHE_CONTROL, cache_control)
831        }
832
833        // Set storage class header
834        if let Some(v) = &self.default_storage_class {
835            req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
836        }
837
838        // Set user metadata headers.
839        if let Some(user_metadata) = args.user_metadata() {
840            for (key, value) in user_metadata {
841                req = req.header(format!("{X_AMZ_META_PREFIX}{key}"), value)
842            }
843        }
844
845        // Set request payer header if enabled.
846        req = self.insert_request_payer_header(req);
847
848        // Set SSE headers.
849        req = self.insert_sse_headers(req, true);
850
851        // Set SSE headers.
852        req = self.insert_checksum_type_header(req);
853
854        // Inject operation to the request.
855        req = req.extension(Operation::Write);
856
857        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
858
859        self.sign(&mut req).await?;
860
861        self.send(req).await
862    }
863
864    pub fn s3_upload_part_request(
865        &self,
866        path: &str,
867        upload_id: &str,
868        part_number: usize,
869        size: u64,
870        body: Buffer,
871        checksum: Option<String>,
872    ) -> Result<Request<Buffer>> {
873        let p = build_abs_path(&self.root, path);
874
875        let url = format!(
876            "{}/{}?partNumber={}&uploadId={}",
877            self.endpoint,
878            percent_encode_path(&p),
879            part_number,
880            percent_encode_path(upload_id)
881        );
882
883        let mut req = Request::put(&url);
884
885        req = req.header(CONTENT_LENGTH, size);
886
887        // Set request payer header if enabled.
888        req = self.insert_request_payer_header(req);
889
890        // Set SSE headers.
891        req = self.insert_sse_headers(req, true);
892
893        if let Some(checksum) = checksum {
894            // Set Checksum header.
895            req = self.insert_checksum_header(req, &checksum);
896        }
897
898        // Inject operation to the request.
899        req = req.extension(Operation::Write);
900
901        // Set body
902        let req = req.body(body).map_err(new_request_build_error)?;
903
904        Ok(req)
905    }
906
907    pub async fn s3_complete_multipart_upload(
908        &self,
909        path: &str,
910        upload_id: &str,
911        parts: Vec<CompleteMultipartUploadRequestPart>,
912    ) -> Result<Response<Buffer>> {
913        let p = build_abs_path(&self.root, path);
914
915        let url = format!(
916            "{}/{}?uploadId={}",
917            self.endpoint,
918            percent_encode_path(&p),
919            percent_encode_path(upload_id)
920        );
921
922        let mut req = Request::post(&url);
923
924        // Set SSE headers.
925        req = self.insert_sse_headers(req, true);
926
927        let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
928            .map_err(new_xml_serialize_error)?;
929        // Make sure content length has been set to avoid post with chunked encoding.
930        req = req.header(CONTENT_LENGTH, content.len());
931        // Set content-type to `application/xml` to avoid mixed with form post.
932        req = req.header(CONTENT_TYPE, "application/xml");
933
934        // Set request payer header if enabled.
935        req = self.insert_request_payer_header(req);
936
937        // Inject operation to the request.
938        req = req.extension(Operation::Write);
939
940        let mut req = req
941            .body(Buffer::from(Bytes::from(content)))
942            .map_err(new_request_build_error)?;
943
944        self.sign(&mut req).await?;
945
946        self.send(req).await
947    }
948
949    /// Abort an on-going multipart upload.
950    pub async fn s3_abort_multipart_upload(
951        &self,
952        path: &str,
953        upload_id: &str,
954    ) -> Result<Response<Buffer>> {
955        let p = build_abs_path(&self.root, path);
956
957        let url = format!(
958            "{}/{}?uploadId={}",
959            self.endpoint,
960            percent_encode_path(&p),
961            percent_encode_path(upload_id)
962        );
963
964        let mut req = Request::delete(&url);
965
966        // Set request payer header if enabled.
967        req = self.insert_request_payer_header(req);
968
969        let mut req = req
970            // Inject operation to the request.
971            .extension(Operation::Write)
972            .body(Buffer::new())
973            .map_err(new_request_build_error)?;
974
975        self.sign(&mut req).await?;
976        self.send(req).await
977    }
978
979    pub async fn s3_delete_objects(
980        &self,
981        paths: Vec<(String, OpDelete)>,
982    ) -> Result<Response<Buffer>> {
983        let url = format!("{}/?delete", self.endpoint);
984
985        let mut req = Request::post(&url);
986
987        let content = quick_xml::se::to_string(&DeleteObjectsRequest {
988            object: paths
989                .into_iter()
990                .map(|(path, op)| DeleteObjectsRequestObject {
991                    key: build_abs_path(&self.root, &path),
992                    version_id: op.version().map(|v| v.to_owned()),
993                })
994                .collect(),
995        })
996        .map_err(new_xml_serialize_error)?;
997
998        // Make sure content length has been set to avoid post with chunked encoding.
999        req = req.header(CONTENT_LENGTH, content.len());
1000        // Set content-type to `application/xml` to avoid mixed with form post.
1001        req = req.header(CONTENT_TYPE, "application/xml");
1002        // Set content-md5 as required by API.
1003        req = req.header("CONTENT-MD5", format_content_md5(content.as_bytes()));
1004
1005        // Set request payer header if enabled.
1006        req = self.insert_request_payer_header(req);
1007
1008        // Inject operation to the request.
1009        req = req.extension(Operation::Delete);
1010
1011        let mut req = req
1012            .body(Buffer::from(Bytes::from(content)))
1013            .map_err(new_request_build_error)?;
1014
1015        self.sign(&mut req).await?;
1016
1017        self.send(req).await
1018    }
1019
1020    pub async fn s3_list_object_versions(
1021        &self,
1022        prefix: &str,
1023        delimiter: &str,
1024        limit: Option<usize>,
1025        key_marker: &str,
1026        version_id_marker: &str,
1027    ) -> Result<Response<Buffer>> {
1028        let p = build_abs_path(&self.root, prefix);
1029
1030        let mut url = format!("{}?versions", self.endpoint);
1031        if !p.is_empty() {
1032            write!(url, "&prefix={}", percent_encode_path(p.as_str()))
1033                .expect("write into string must succeed");
1034        }
1035        if !delimiter.is_empty() {
1036            write!(url, "&delimiter={}", delimiter).expect("write into string must succeed");
1037        }
1038
1039        if let Some(limit) = limit {
1040            write!(url, "&max-keys={}", limit).expect("write into string must succeed");
1041        }
1042        if !key_marker.is_empty() {
1043            write!(url, "&key-marker={}", percent_encode_path(key_marker))
1044                .expect("write into string must succeed");
1045        }
1046        if !version_id_marker.is_empty() {
1047            write!(
1048                url,
1049                "&version-id-marker={}",
1050                percent_encode_path(version_id_marker)
1051            )
1052            .expect("write into string must succeed");
1053        }
1054
1055        let mut req = Request::get(&url);
1056
1057        // Set request payer header if enabled.
1058        req = self.insert_request_payer_header(req);
1059
1060        let mut req = req
1061            // Inject operation to the request.
1062            .extension(Operation::List)
1063            .body(Buffer::new())
1064            .map_err(new_request_build_error)?;
1065
1066        self.sign(&mut req).await?;
1067
1068        self.send(req).await
1069    }
1070}
1071
1072/// Result of CreateMultipartUpload
1073#[derive(Default, Debug, Deserialize)]
1074#[serde(default, rename_all = "PascalCase")]
1075pub struct InitiateMultipartUploadResult {
1076    pub upload_id: String,
1077}
1078
1079/// Request of CompleteMultipartUploadRequest
1080#[derive(Default, Debug, Serialize)]
1081#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
1082pub struct CompleteMultipartUploadRequest {
1083    pub part: Vec<CompleteMultipartUploadRequestPart>,
1084}
1085
1086#[derive(Clone, Default, Debug, Serialize)]
1087#[serde(default, rename_all = "PascalCase")]
1088pub struct CompleteMultipartUploadRequestPart {
1089    #[serde(rename = "PartNumber")]
1090    pub part_number: usize,
1091    /// # TODO
1092    ///
1093    /// quick-xml will do escape on `"` which leads to our serialized output is
1094    /// not the same as aws s3's example.
1095    ///
1096    /// Ideally, we could use `serialize_with` to address this (buf failed)
1097    ///
1098    /// ```ignore
1099    /// #[derive(Default, Debug, Serialize)]
1100    /// #[serde(default, rename_all = "PascalCase")]
1101    /// struct CompleteMultipartUploadRequestPart {
1102    ///     #[serde(rename = "PartNumber")]
1103    ///     part_number: usize,
1104    ///     #[serde(rename = "ETag", serialize_with = "partial_escape")]
1105    ///     etag: String,
1106    /// }
1107    ///
1108    /// fn partial_escape<S>(s: &str, ser: S) -> Result<S::Ok, S::Error>
1109    /// where
1110    ///     S: serde::Serializer,
1111    /// {
1112    ///     ser.serialize_str(&String::from_utf8_lossy(
1113    ///         &quick_xml::escape::partial_escape(s.as_bytes()),
1114    ///     ))
1115    /// }
1116    /// ```
1117    ///
1118    /// ref: <https://github.com/tafia/quick-xml/issues/362>
1119    #[serde(rename = "ETag")]
1120    pub etag: String,
1121    #[serde(rename = "ChecksumCRC32C", skip_serializing_if = "Option::is_none")]
1122    pub checksum_crc32c: Option<String>,
1123}
1124
1125/// Output of `CompleteMultipartUpload` operation
1126#[derive(Debug, Default, Deserialize)]
1127#[serde[default, rename_all = "PascalCase"]]
1128pub struct CompleteMultipartUploadResult {
1129    pub bucket: String,
1130    pub key: String,
1131    pub location: String,
1132    #[serde(rename = "ETag")]
1133    pub etag: String,
1134    pub code: String,
1135    pub message: String,
1136    pub request_id: String,
1137}
1138
1139/// Request of DeleteObjects.
1140#[derive(Default, Debug, Serialize)]
1141#[serde(default, rename = "Delete", rename_all = "PascalCase")]
1142pub struct DeleteObjectsRequest {
1143    pub object: Vec<DeleteObjectsRequestObject>,
1144}
1145
1146#[derive(Default, Debug, Serialize)]
1147#[serde(rename_all = "PascalCase")]
1148pub struct DeleteObjectsRequestObject {
1149    pub key: String,
1150    #[serde(skip_serializing_if = "Option::is_none")]
1151    pub version_id: Option<String>,
1152}
1153
1154/// Result of DeleteObjects.
1155#[derive(Default, Debug, Deserialize)]
1156#[serde(default, rename = "DeleteResult", rename_all = "PascalCase")]
1157pub struct DeleteObjectsResult {
1158    pub deleted: Vec<DeleteObjectsResultDeleted>,
1159    pub error: Vec<DeleteObjectsResultError>,
1160}
1161
1162#[derive(Default, Debug, Deserialize)]
1163#[serde(rename_all = "PascalCase")]
1164pub struct DeleteObjectsResultDeleted {
1165    pub key: String,
1166    pub version_id: Option<String>,
1167}
1168
1169#[derive(Default, Debug, Deserialize)]
1170#[serde(default, rename_all = "PascalCase")]
1171pub struct DeleteObjectsResultError {
1172    pub code: String,
1173    pub key: String,
1174    pub message: String,
1175    pub version_id: Option<String>,
1176}
1177
1178/// Output of ListBucket/ListObjects (a.k.a ListObjectsV1).
1179#[derive(Default, Debug, Deserialize)]
1180#[serde(default, rename_all = "PascalCase")]
1181pub struct ListObjectsOutputV1 {
1182    pub is_truncated: Option<bool>,
1183    /// ## Notes
1184    ///
1185    /// `next_marker` is returned only if we have the delimiter request parameter
1186    /// specified. If the response does not include the NextMarker element and it
1187    /// is truncated, we should use the value of the last Key element in the
1188    /// response as the marker parameter in the subsequent request to get the
1189    /// next set of object keys.
1190    ///
1191    /// If the contents is empty, we should find common_prefixes instead.
1192    pub next_marker: Option<String>,
1193    pub common_prefixes: Vec<OutputCommonPrefix>,
1194    pub contents: Vec<ListObjectsOutputContent>,
1195}
1196
1197/// Output of ListBucketV2/ListObjectsV2.
1198///
1199/// ## Note
1200///
1201/// Use `Option` in `is_truncated` and `next_continuation_token` to make
1202/// the behavior more clear so that we can be compatible to more s3 services.
1203///
1204/// And enable `serde(default)` so that we can keep going even when some field
1205/// is not exist.
1206#[derive(Default, Debug, Deserialize)]
1207#[serde(default, rename_all = "PascalCase")]
1208pub struct ListObjectsOutputV2 {
1209    pub is_truncated: Option<bool>,
1210    pub next_continuation_token: Option<String>,
1211    pub common_prefixes: Vec<OutputCommonPrefix>,
1212    pub contents: Vec<ListObjectsOutputContent>,
1213}
1214
1215#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
1216#[serde(rename_all = "PascalCase")]
1217pub struct ListObjectsOutputContent {
1218    pub key: String,
1219    pub size: u64,
1220    pub last_modified: String,
1221    #[serde(rename = "ETag")]
1222    pub etag: Option<String>,
1223}
1224
1225#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
1226#[serde(rename_all = "PascalCase")]
1227pub struct OutputCommonPrefix {
1228    pub prefix: String,
1229}
1230
1231/// Output of ListObjectVersions
1232#[derive(Default, Debug, Deserialize)]
1233#[serde(default, rename_all = "PascalCase")]
1234pub struct ListObjectVersionsOutput {
1235    pub is_truncated: Option<bool>,
1236    pub next_key_marker: Option<String>,
1237    pub next_version_id_marker: Option<String>,
1238    pub common_prefixes: Vec<OutputCommonPrefix>,
1239    pub version: Vec<ListObjectVersionsOutputVersion>,
1240    pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
1241}
1242
1243#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
1244#[serde(rename_all = "PascalCase")]
1245pub struct ListObjectVersionsOutputVersion {
1246    pub key: String,
1247    pub version_id: String,
1248    pub is_latest: bool,
1249    pub size: u64,
1250    pub last_modified: String,
1251    #[serde(rename = "ETag")]
1252    pub etag: Option<String>,
1253}
1254
1255#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
1256#[serde(rename_all = "PascalCase")]
1257pub struct ListObjectVersionsOutputDeleteMarker {
1258    pub key: String,
1259    pub version_id: String,
1260    pub is_latest: bool,
1261    pub last_modified: String,
1262}
1263
1264pub enum ChecksumAlgorithm {
1265    Crc32c,
1266}
1267impl ChecksumAlgorithm {
1268    pub fn to_header_name(&self) -> HeaderName {
1269        match self {
1270            Self::Crc32c => HeaderName::from_static("x-amz-checksum-crc32c"),
1271        }
1272    }
1273}
1274impl Display for ChecksumAlgorithm {
1275    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1276        write!(
1277            f,
1278            "{}",
1279            match self {
1280                Self::Crc32c => "CRC32C",
1281            }
1282        )
1283    }
1284}
1285
1286#[cfg(test)]
1287mod tests {
1288    use bytes::Buf;
1289    use bytes::Bytes;
1290
1291    use super::*;
1292
1293    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html#API_CreateMultipartUpload_Examples
1294    #[test]
1295    fn test_deserialize_initiate_multipart_upload_result() {
1296        let bs = Bytes::from(
1297            r#"<?xml version="1.0" encoding="UTF-8"?>
1298            <InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1299              <Bucket>example-bucket</Bucket>
1300              <Key>example-object</Key>
1301              <UploadId>VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA</UploadId>
1302            </InitiateMultipartUploadResult>"#,
1303        );
1304
1305        let out: InitiateMultipartUploadResult =
1306            quick_xml::de::from_reader(bs.reader()).expect("must success");
1307
1308        assert_eq!(
1309            out.upload_id,
1310            "VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA"
1311        )
1312    }
1313
1314    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#API_CompleteMultipartUpload_Examples
1315    #[test]
1316    fn test_serialize_complete_multipart_upload_request() {
1317        let req = CompleteMultipartUploadRequest {
1318            part: vec![
1319                CompleteMultipartUploadRequestPart {
1320                    part_number: 1,
1321                    etag: "\"a54357aff0632cce46d942af68356b38\"".to_string(),
1322                    ..Default::default()
1323                },
1324                CompleteMultipartUploadRequestPart {
1325                    part_number: 2,
1326                    etag: "\"0c78aef83f66abc1fa1e8477f296d394\"".to_string(),
1327                    ..Default::default()
1328                },
1329                CompleteMultipartUploadRequestPart {
1330                    part_number: 3,
1331                    etag: "\"acbd18db4cc2f85cedef654fccc4a4d8\"".to_string(),
1332                    ..Default::default()
1333                },
1334            ],
1335        };
1336
1337        let actual = quick_xml::se::to_string(&req).expect("must succeed");
1338
1339        pretty_assertions::assert_eq!(
1340            actual,
1341            r#"<CompleteMultipartUpload>
1342             <Part>
1343                <PartNumber>1</PartNumber>
1344               <ETag>"a54357aff0632cce46d942af68356b38"</ETag>
1345             </Part>
1346             <Part>
1347                <PartNumber>2</PartNumber>
1348               <ETag>"0c78aef83f66abc1fa1e8477f296d394"</ETag>
1349             </Part>
1350             <Part>
1351               <PartNumber>3</PartNumber>
1352               <ETag>"acbd18db4cc2f85cedef654fccc4a4d8"</ETag>
1353             </Part>
1354            </CompleteMultipartUpload>"#
1355                // Cleanup space and new line
1356                .replace([' ', '\n'], "")
1357        )
1358    }
1359
1360    /// this example is from: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
1361    #[test]
1362    fn test_deserialize_complete_multipart_upload_result() {
1363        let bs = Bytes::from(
1364            r#"<?xml version="1.0" encoding="UTF-8"?>
1365            <CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1366             <Location>http://Example-Bucket.s3.region.amazonaws.com/Example-Object</Location>
1367             <Bucket>Example-Bucket</Bucket>
1368             <Key>Example-Object</Key>
1369             <ETag>"3858f62230ac3c915f300c664312c11f-9"</ETag>
1370            </CompleteMultipartUploadResult>"#,
1371        );
1372
1373        let out: CompleteMultipartUploadResult =
1374            quick_xml::de::from_reader(bs.reader()).expect("must success");
1375
1376        assert_eq!(out.bucket, "Example-Bucket");
1377        assert_eq!(out.key, "Example-Object");
1378        assert_eq!(
1379            out.location,
1380            "http://Example-Bucket.s3.region.amazonaws.com/Example-Object"
1381        );
1382        assert_eq!(out.etag, "\"3858f62230ac3c915f300c664312c11f-9\"");
1383    }
1384
1385    #[test]
1386    fn test_deserialize_complete_multipart_upload_result_when_return_error() {
1387        let bs = Bytes::from(
1388            r#"<?xml version="1.0" encoding="UTF-8"?>
1389
1390                <Error>
1391                <Code>InternalError</Code>
1392                <Message>We encountered an internal error. Please try again.</Message>
1393                <RequestId>656c76696e6727732072657175657374</RequestId>
1394                <HostId>Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==</HostId>
1395                </Error>"#,
1396        );
1397
1398        let out: CompleteMultipartUploadResult =
1399            quick_xml::de::from_reader(bs.reader()).expect("must success");
1400
1401        assert_eq!(out.code, "InternalError");
1402        assert_eq!(
1403            out.message,
1404            "We encountered an internal error. Please try again."
1405        );
1406        assert_eq!(out.request_id, "656c76696e6727732072657175657374");
1407    }
1408
1409    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Examples
1410    #[test]
1411    fn test_serialize_delete_objects_request() {
1412        let req = DeleteObjectsRequest {
1413            object: vec![
1414                DeleteObjectsRequestObject {
1415                    key: "sample1.txt".to_string(),
1416                    version_id: None,
1417                },
1418                DeleteObjectsRequestObject {
1419                    key: "sample2.txt".to_string(),
1420                    version_id: Some("11111".to_owned()),
1421                },
1422            ],
1423        };
1424
1425        let actual = quick_xml::se::to_string(&req).expect("must succeed");
1426
1427        pretty_assertions::assert_eq!(
1428            actual,
1429            r#"<Delete>
1430             <Object>
1431             <Key>sample1.txt</Key>
1432             </Object>
1433             <Object>
1434               <Key>sample2.txt</Key>
1435               <VersionId>11111</VersionId>
1436             </Object>
1437             </Delete>"#
1438                // Cleanup space and new line
1439                .replace([' ', '\n'], "")
1440        )
1441    }
1442
1443    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Examples
1444    #[test]
1445    fn test_deserialize_delete_objects_result() {
1446        let bs = Bytes::from(
1447            r#"<?xml version="1.0" encoding="UTF-8"?>
1448            <DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1449             <Deleted>
1450               <Key>sample1.txt</Key>
1451             </Deleted>
1452             <Error>
1453              <Key>sample2.txt</Key>
1454              <Code>AccessDenied</Code>
1455              <Message>Access Denied</Message>
1456             </Error>
1457            </DeleteResult>"#,
1458        );
1459
1460        let out: DeleteObjectsResult =
1461            quick_xml::de::from_reader(bs.reader()).expect("must success");
1462
1463        assert_eq!(out.deleted.len(), 1);
1464        assert_eq!(out.deleted[0].key, "sample1.txt");
1465        assert_eq!(out.error.len(), 1);
1466        assert_eq!(out.error[0].key, "sample2.txt");
1467        assert_eq!(out.error[0].code, "AccessDenied");
1468        assert_eq!(out.error[0].message, "Access Denied");
1469    }
1470
1471    #[test]
1472    fn test_deserialize_delete_objects_with_version_id() {
1473        let bs = Bytes::from(
1474            r#"<?xml version="1.0" encoding="UTF-8"?>
1475                  <DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1476                    <Deleted>
1477                      <Key>SampleDocument.txt</Key>
1478                      <VersionId>OYcLXagmS.WaD..oyH4KRguB95_YhLs7</VersionId>
1479                    </Deleted>
1480                  </DeleteResult>"#,
1481        );
1482
1483        let out: DeleteObjectsResult =
1484            quick_xml::de::from_reader(bs.reader()).expect("must success");
1485
1486        assert_eq!(out.deleted.len(), 1);
1487        assert_eq!(out.deleted[0].key, "SampleDocument.txt");
1488        assert_eq!(
1489            out.deleted[0].version_id,
1490            Some("OYcLXagmS.WaD..oyH4KRguB95_YhLs7".to_owned())
1491        );
1492        assert_eq!(out.error.len(), 0);
1493    }
1494
1495    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html#API_ListObjects_Examples
1496    #[test]
1497    fn test_parse_list_output_v1() {
1498        let bs = bytes::Bytes::from(
1499            r#"<?xml version="1.0" encoding="UTF-8"?>
1500            <ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1501                <Name>bucket</Name>
1502                <Prefix/>
1503                <Marker/>
1504                <MaxKeys>1000</MaxKeys>
1505                <IsTruncated>false</IsTruncated>
1506                <Contents>
1507                    <Key>my-image.jpg</Key>
1508                    <LastModified>2009-10-12T17:50:30.000Z</LastModified>
1509                    <ETag>"fba9dede5f27731c9771645a39863328"</ETag>
1510                    <Size>434234</Size>
1511                    <StorageClass>STANDARD</StorageClass>
1512                    <Owner>
1513                        <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
1514                        <DisplayName>mtd@amazon.com</DisplayName>
1515                    </Owner>
1516                </Contents>
1517                <Contents>
1518                   <Key>my-third-image.jpg</Key>
1519                     <LastModified>2009-10-12T17:50:30.000Z</LastModified>
1520                     <ETag>"1b2cf535f27731c974343645a3985328"</ETag>
1521                     <Size>64994</Size>
1522                     <StorageClass>STANDARD_IA</StorageClass>
1523                     <Owner>
1524                        <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
1525                        <DisplayName>mtd@amazon.com</DisplayName>
1526                    </Owner>
1527                </Contents>
1528            </ListBucketResult>"#,
1529        );
1530
1531        let out: ListObjectsOutputV1 =
1532            quick_xml::de::from_reader(bs.reader()).expect("must success");
1533
1534        assert!(!out.is_truncated.unwrap());
1535        assert!(out.next_marker.is_none());
1536        assert!(out.common_prefixes.is_empty());
1537        assert_eq!(
1538            out.contents,
1539            vec![
1540                ListObjectsOutputContent {
1541                    key: "my-image.jpg".to_string(),
1542                    size: 434234,
1543                    etag: Some("\"fba9dede5f27731c9771645a39863328\"".to_string()),
1544                    last_modified: "2009-10-12T17:50:30.000Z".to_string(),
1545                },
1546                ListObjectsOutputContent {
1547                    key: "my-third-image.jpg".to_string(),
1548                    size: 64994,
1549                    last_modified: "2009-10-12T17:50:30.000Z".to_string(),
1550                    etag: Some("\"1b2cf535f27731c974343645a3985328\"".to_string()),
1551                },
1552            ]
1553        )
1554    }
1555
1556    #[test]
1557    fn test_parse_list_output_v2() {
1558        let bs = bytes::Bytes::from(
1559            r#"<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1560  <Name>example-bucket</Name>
1561  <Prefix>photos/2006/</Prefix>
1562  <KeyCount>3</KeyCount>
1563  <MaxKeys>1000</MaxKeys>
1564  <Delimiter>/</Delimiter>
1565  <IsTruncated>false</IsTruncated>
1566  <Contents>
1567    <Key>photos/2006</Key>
1568    <LastModified>2016-04-30T23:51:29.000Z</LastModified>
1569    <ETag>"d41d8cd98f00b204e9800998ecf8427e"</ETag>
1570    <Size>56</Size>
1571    <StorageClass>STANDARD</StorageClass>
1572  </Contents>
1573  <Contents>
1574    <Key>photos/2007</Key>
1575    <LastModified>2016-04-30T23:51:29.000Z</LastModified>
1576    <ETag>"d41d8cd98f00b204e9800998ecf8427e"</ETag>
1577    <Size>100</Size>
1578    <StorageClass>STANDARD</StorageClass>
1579  </Contents>
1580  <Contents>
1581    <Key>photos/2008</Key>
1582    <LastModified>2016-05-30T23:51:29.000Z</LastModified>
1583    <Size>42</Size>
1584  </Contents>
1585
1586  <CommonPrefixes>
1587    <Prefix>photos/2006/February/</Prefix>
1588  </CommonPrefixes>
1589  <CommonPrefixes>
1590    <Prefix>photos/2006/January/</Prefix>
1591  </CommonPrefixes>
1592</ListBucketResult>"#,
1593        );
1594
1595        let out: ListObjectsOutputV2 =
1596            quick_xml::de::from_reader(bs.reader()).expect("must success");
1597
1598        assert!(!out.is_truncated.unwrap());
1599        assert!(out.next_continuation_token.is_none());
1600        assert_eq!(
1601            out.common_prefixes
1602                .iter()
1603                .map(|v| v.prefix.clone())
1604                .collect::<Vec<String>>(),
1605            vec!["photos/2006/February/", "photos/2006/January/"]
1606        );
1607        assert_eq!(
1608            out.contents,
1609            vec![
1610                ListObjectsOutputContent {
1611                    key: "photos/2006".to_string(),
1612                    size: 56,
1613                    etag: Some("\"d41d8cd98f00b204e9800998ecf8427e\"".to_string()),
1614                    last_modified: "2016-04-30T23:51:29.000Z".to_string(),
1615                },
1616                ListObjectsOutputContent {
1617                    key: "photos/2007".to_string(),
1618                    size: 100,
1619                    last_modified: "2016-04-30T23:51:29.000Z".to_string(),
1620                    etag: Some("\"d41d8cd98f00b204e9800998ecf8427e\"".to_string()),
1621                },
1622                ListObjectsOutputContent {
1623                    key: "photos/2008".to_string(),
1624                    size: 42,
1625                    last_modified: "2016-05-30T23:51:29.000Z".to_string(),
1626                    etag: None,
1627                },
1628            ]
1629        )
1630    }
1631
1632    #[test]
1633    fn test_parse_list_object_versions() {
1634        let bs = bytes::Bytes::from(
1635            r#"<?xml version="1.0" encoding="UTF-8"?>
1636                <ListVersionsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1637                <Name>mtp-versioning-fresh</Name>
1638                <Prefix/>
1639                <KeyMarker>key3</KeyMarker>
1640                <VersionIdMarker>null</VersionIdMarker>
1641                <NextKeyMarker>key3</NextKeyMarker>
1642                <NextVersionIdMarker>d-d309mfjFrUmoQ0DBsVqmcMV15OI.</NextVersionIdMarker>
1643                <MaxKeys>3</MaxKeys>
1644                <IsTruncated>true</IsTruncated>
1645                <Version>
1646                    <Key>key3</Key>
1647                    <VersionId>8XECiENpj8pydEDJdd-_VRrvaGKAHOaGMNW7tg6UViI.</VersionId>
1648                    <IsLatest>true</IsLatest>
1649                    <LastModified>2009-12-09T00:18:23.000Z</LastModified>
1650                    <ETag>"396fefef536d5ce46c7537ecf978a360"</ETag>
1651                    <Size>217</Size>
1652                    <Owner>
1653                        <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
1654                    </Owner>
1655                    <StorageClass>STANDARD</StorageClass>
1656                </Version>
1657                <Version>
1658                    <Key>key3</Key>
1659                    <VersionId>d-d309mfjFri40QYukDozqBt3UmoQ0DBsVqmcMV15OI.</VersionId>
1660                    <IsLatest>false</IsLatest>
1661                    <LastModified>2009-12-09T00:18:08.000Z</LastModified>
1662                    <ETag>"396fefef536d5ce46c7537ecf978a360"</ETag>
1663                    <Size>217</Size>
1664                    <Owner>
1665                        <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
1666                    </Owner>
1667                    <StorageClass>STANDARD</StorageClass>
1668                </Version>
1669                <CommonPrefixes>
1670                    <Prefix>photos/</Prefix>
1671                </CommonPrefixes>
1672                <CommonPrefixes>
1673                    <Prefix>videos/</Prefix>
1674                </CommonPrefixes>
1675                 <DeleteMarker>
1676                    <Key>my-third-image.jpg</Key>
1677                    <VersionId>03jpff543dhffds434rfdsFDN943fdsFkdmqnh892</VersionId>
1678                    <IsLatest>true</IsLatest>
1679                    <LastModified>2009-10-15T17:50:30.000Z</LastModified>
1680                    <Owner>
1681                        <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
1682                        <DisplayName>mtd@amazon.com</DisplayName>
1683                    </Owner>
1684                </DeleteMarker>
1685                </ListVersionsResult>"#,
1686        );
1687
1688        let output: ListObjectVersionsOutput =
1689            quick_xml::de::from_reader(bs.reader()).expect("must succeed");
1690
1691        assert!(output.is_truncated.unwrap());
1692        assert_eq!(output.next_key_marker, Some("key3".to_owned()));
1693        assert_eq!(
1694            output.next_version_id_marker,
1695            Some("d-d309mfjFrUmoQ0DBsVqmcMV15OI.".to_owned())
1696        );
1697        assert_eq!(
1698            output.common_prefixes,
1699            vec![
1700                OutputCommonPrefix {
1701                    prefix: "photos/".to_owned()
1702                },
1703                OutputCommonPrefix {
1704                    prefix: "videos/".to_owned()
1705                }
1706            ]
1707        );
1708
1709        assert_eq!(
1710            output.version,
1711            vec![
1712                ListObjectVersionsOutputVersion {
1713                    key: "key3".to_owned(),
1714                    version_id: "8XECiENpj8pydEDJdd-_VRrvaGKAHOaGMNW7tg6UViI.".to_owned(),
1715                    is_latest: true,
1716                    size: 217,
1717                    last_modified: "2009-12-09T00:18:23.000Z".to_owned(),
1718                    etag: Some("\"396fefef536d5ce46c7537ecf978a360\"".to_owned()),
1719                },
1720                ListObjectVersionsOutputVersion {
1721                    key: "key3".to_owned(),
1722                    version_id: "d-d309mfjFri40QYukDozqBt3UmoQ0DBsVqmcMV15OI.".to_owned(),
1723                    is_latest: false,
1724                    size: 217,
1725                    last_modified: "2009-12-09T00:18:08.000Z".to_owned(),
1726                    etag: Some("\"396fefef536d5ce46c7537ecf978a360\"".to_owned()),
1727                }
1728            ]
1729        );
1730
1731        assert_eq!(
1732            output.delete_marker,
1733            vec![ListObjectVersionsOutputDeleteMarker {
1734                key: "my-third-image.jpg".to_owned(),
1735                version_id: "03jpff543dhffds434rfdsFDN943fdsFkdmqnh892".to_owned(),
1736                is_latest: true,
1737                last_modified: "2009-10-15T17:50:30.000Z".to_owned(),
1738            },]
1739        );
1740    }
1741}