opendal/services/s3/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt;
19use std::fmt::Debug;
20use std::fmt::Display;
21use std::fmt::Formatter;
22use std::fmt::Write;
23use std::sync::atomic::AtomicBool;
24use std::sync::{atomic, Arc};
25use std::time::Duration;
26
27use base64::prelude::BASE64_STANDARD;
28use base64::Engine;
29use bytes::Bytes;
30use constants::X_AMZ_META_PREFIX;
31use http::header::CACHE_CONTROL;
32use http::header::CONTENT_DISPOSITION;
33use http::header::CONTENT_ENCODING;
34use http::header::CONTENT_LENGTH;
35use http::header::CONTENT_TYPE;
36use http::header::HOST;
37use http::header::IF_MATCH;
38use http::header::IF_NONE_MATCH;
39use http::header::{HeaderName, IF_MODIFIED_SINCE, IF_UNMODIFIED_SINCE};
40use http::HeaderValue;
41use http::Request;
42use http::Response;
43use reqsign::AwsCredential;
44use reqsign::AwsCredentialLoad;
45use reqsign::AwsV4Signer;
46use serde::Deserialize;
47use serde::Serialize;
48
49use crate::raw::*;
50use crate::*;
51
52pub mod constants {
53    pub const X_AMZ_COPY_SOURCE: &str = "x-amz-copy-source";
54
55    pub const X_AMZ_SERVER_SIDE_ENCRYPTION: &str = "x-amz-server-side-encryption";
56    pub const X_AMZ_SERVER_REQUEST_PAYER: (&str, &str) = ("x-amz-request-payer", "requester");
57    pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: &str =
58        "x-amz-server-side-encryption-customer-algorithm";
59    pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str =
60        "x-amz-server-side-encryption-customer-key";
61    pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str =
62        "x-amz-server-side-encryption-customer-key-md5";
63    pub const X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID: &str =
64        "x-amz-server-side-encryption-aws-kms-key-id";
65    pub const X_AMZ_STORAGE_CLASS: &str = "x-amz-storage-class";
66
67    pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: &str =
68        "x-amz-copy-source-server-side-encryption-customer-algorithm";
69    pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str =
70        "x-amz-copy-source-server-side-encryption-customer-key";
71    pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str =
72        "x-amz-copy-source-server-side-encryption-customer-key-md5";
73
74    pub const X_AMZ_WRITE_OFFSET_BYTES: &str = "x-amz-write-offset-bytes";
75
76    pub const X_AMZ_META_PREFIX: &str = "x-amz-meta-";
77
78    pub const X_AMZ_VERSION_ID: &str = "x-amz-version-id";
79    pub const X_AMZ_OBJECT_SIZE: &str = "x-amz-object-size";
80
81    pub const RESPONSE_CONTENT_DISPOSITION: &str = "response-content-disposition";
82    pub const RESPONSE_CONTENT_TYPE: &str = "response-content-type";
83    pub const RESPONSE_CACHE_CONTROL: &str = "response-cache-control";
84
85    pub const S3_QUERY_VERSION_ID: &str = "versionId";
86}
87
88pub struct S3Core {
89    pub info: Arc<AccessorInfo>,
90
91    pub bucket: String,
92    pub endpoint: String,
93    pub root: String,
94    pub server_side_encryption: Option<HeaderValue>,
95    pub server_side_encryption_aws_kms_key_id: Option<HeaderValue>,
96    pub server_side_encryption_customer_algorithm: Option<HeaderValue>,
97    pub server_side_encryption_customer_key: Option<HeaderValue>,
98    pub server_side_encryption_customer_key_md5: Option<HeaderValue>,
99    pub default_storage_class: Option<HeaderValue>,
100    pub allow_anonymous: bool,
101    pub disable_list_objects_v2: bool,
102    pub enable_request_payer: bool,
103
104    pub signer: AwsV4Signer,
105    pub loader: Box<dyn AwsCredentialLoad>,
106    pub credential_loaded: AtomicBool,
107    pub checksum_algorithm: Option<ChecksumAlgorithm>,
108}
109
110impl Debug for S3Core {
111    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
112        f.debug_struct("S3Core")
113            .field("bucket", &self.bucket)
114            .field("endpoint", &self.endpoint)
115            .field("root", &self.root)
116            .finish_non_exhaustive()
117    }
118}
119
120impl S3Core {
121    /// If credential is not found, we will not sign the request.
122    async fn load_credential(&self) -> Result<Option<AwsCredential>> {
123        let cred = self
124            .loader
125            .load_credential(GLOBAL_REQWEST_CLIENT.clone())
126            .await
127            .map_err(new_request_credential_error)?;
128
129        if let Some(cred) = cred {
130            // Update credential_loaded to true if we have load credential successfully.
131            self.credential_loaded
132                .store(true, atomic::Ordering::Relaxed);
133            return Ok(Some(cred));
134        }
135
136        // If we have load credential before but failed to load this time, we should
137        // return error instead.
138        if self.credential_loaded.load(atomic::Ordering::Relaxed) {
139            return Err(Error::new(
140                ErrorKind::PermissionDenied,
141                "credential was previously loaded successfully but has failed this time",
142            )
143            .set_temporary());
144        }
145
146        // Credential is empty and users allow anonymous access, we will not sign the request.
147        if self.allow_anonymous {
148            return Ok(None);
149        }
150
151        Err(Error::new(
152            ErrorKind::PermissionDenied,
153            "no valid credential found and anonymous access is not allowed",
154        ))
155    }
156
157    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
158        let cred = if let Some(cred) = self.load_credential().await? {
159            cred
160        } else {
161            return Ok(());
162        };
163
164        self.signer
165            .sign(req, &cred)
166            .map_err(new_request_sign_error)?;
167
168        // Always remove host header, let users' client to set it based on HTTP
169        // version.
170        //
171        // As discussed in <https://github.com/seanmonstar/reqwest/issues/1809>,
172        // google server could send RST_STREAM of PROTOCOL_ERROR if our request
173        // contains host header.
174        req.headers_mut().remove(HOST);
175
176        Ok(())
177    }
178
179    pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
180        let cred = if let Some(cred) = self.load_credential().await? {
181            cred
182        } else {
183            return Ok(());
184        };
185
186        self.signer
187            .sign_query(req, duration, &cred)
188            .map_err(new_request_sign_error)?;
189
190        // Always remove host header, let users' client to set it based on HTTP
191        // version.
192        //
193        // As discussed in <https://github.com/seanmonstar/reqwest/issues/1809>,
194        // google server could send RST_STREAM of PROTOCOL_ERROR if our request
195        // contains host header.
196        req.headers_mut().remove(HOST);
197
198        Ok(())
199    }
200
201    #[inline]
202    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
203        self.info.http_client().send(req).await
204    }
205
206    /// # Note
207    ///
208    /// header like X_AMZ_SERVER_SIDE_ENCRYPTION doesn't need to set while
209    /// get or stat.
210    pub fn insert_sse_headers(
211        &self,
212        mut req: http::request::Builder,
213        is_write: bool,
214    ) -> http::request::Builder {
215        if is_write {
216            if let Some(v) = &self.server_side_encryption {
217                let mut v = v.clone();
218                v.set_sensitive(true);
219
220                req = req.header(
221                    HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION),
222                    v,
223                )
224            }
225            if let Some(v) = &self.server_side_encryption_aws_kms_key_id {
226                let mut v = v.clone();
227                v.set_sensitive(true);
228
229                req = req.header(
230                    HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID),
231                    v,
232                )
233            }
234        }
235
236        if let Some(v) = &self.server_side_encryption_customer_algorithm {
237            let mut v = v.clone();
238            v.set_sensitive(true);
239
240            req = req.header(
241                HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM),
242                v,
243            )
244        }
245        if let Some(v) = &self.server_side_encryption_customer_key {
246            let mut v = v.clone();
247            v.set_sensitive(true);
248
249            req = req.header(
250                HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY),
251                v,
252            )
253        }
254        if let Some(v) = &self.server_side_encryption_customer_key_md5 {
255            let mut v = v.clone();
256            v.set_sensitive(true);
257
258            req = req.header(
259                HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5),
260                v,
261            )
262        }
263
264        req
265    }
266    pub fn calculate_checksum(&self, body: &Buffer) -> Option<String> {
267        match self.checksum_algorithm {
268            None => None,
269            Some(ChecksumAlgorithm::Crc32c) => {
270                let mut crc = 0u32;
271                body.clone()
272                    .for_each(|b| crc = crc32c::crc32c_append(crc, &b));
273                Some(BASE64_STANDARD.encode(crc.to_be_bytes()))
274            }
275        }
276    }
277    pub fn insert_checksum_header(
278        &self,
279        mut req: http::request::Builder,
280        checksum: &str,
281    ) -> http::request::Builder {
282        if let Some(checksum_algorithm) = self.checksum_algorithm.as_ref() {
283            req = req.header(checksum_algorithm.to_header_name(), checksum);
284        }
285        req
286    }
287
288    pub fn insert_checksum_type_header(
289        &self,
290        mut req: http::request::Builder,
291    ) -> http::request::Builder {
292        if let Some(checksum_algorithm) = self.checksum_algorithm.as_ref() {
293            req = req.header("x-amz-checksum-algorithm", checksum_algorithm.to_string());
294        }
295        req
296    }
297
298    pub fn insert_metadata_headers(
299        &self,
300        mut req: http::request::Builder,
301        size: Option<u64>,
302        args: &OpWrite,
303    ) -> http::request::Builder {
304        if let Some(size) = size {
305            req = req.header(CONTENT_LENGTH, size.to_string())
306        }
307
308        if let Some(mime) = args.content_type() {
309            req = req.header(CONTENT_TYPE, mime)
310        }
311
312        if let Some(pos) = args.content_disposition() {
313            req = req.header(CONTENT_DISPOSITION, pos)
314        }
315
316        if let Some(encoding) = args.content_encoding() {
317            req = req.header(CONTENT_ENCODING, encoding);
318        }
319
320        if let Some(cache_control) = args.cache_control() {
321            req = req.header(CACHE_CONTROL, cache_control)
322        }
323
324        if let Some(if_match) = args.if_match() {
325            req = req.header(IF_MATCH, if_match);
326        }
327
328        if args.if_not_exists() {
329            req = req.header(IF_NONE_MATCH, "*");
330        }
331
332        // Set storage class header
333        if let Some(v) = &self.default_storage_class {
334            req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
335        }
336
337        // Set user metadata headers.
338        if let Some(user_metadata) = args.user_metadata() {
339            for (key, value) in user_metadata {
340                req = req.header(format!("{X_AMZ_META_PREFIX}{key}"), value)
341            }
342        }
343        req
344    }
345
346    pub fn insert_request_payer_header(
347        &self,
348        mut req: http::request::Builder,
349    ) -> http::request::Builder {
350        if self.enable_request_payer {
351            req = req.header(
352                HeaderName::from_static(constants::X_AMZ_SERVER_REQUEST_PAYER.0),
353                HeaderValue::from_static(constants::X_AMZ_SERVER_REQUEST_PAYER.1),
354            );
355        }
356        req
357    }
358}
359
360impl S3Core {
361    pub fn s3_head_object_request(&self, path: &str, args: OpStat) -> Result<Request<Buffer>> {
362        let p = build_abs_path(&self.root, path);
363
364        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
365
366        // Add query arguments to the URL based on response overrides
367        let mut query_args = Vec::new();
368        if let Some(override_content_disposition) = args.override_content_disposition() {
369            query_args.push(format!(
370                "{}={}",
371                constants::RESPONSE_CONTENT_DISPOSITION,
372                percent_encode_path(override_content_disposition)
373            ))
374        }
375        if let Some(override_content_type) = args.override_content_type() {
376            query_args.push(format!(
377                "{}={}",
378                constants::RESPONSE_CONTENT_TYPE,
379                percent_encode_path(override_content_type)
380            ))
381        }
382        if let Some(override_cache_control) = args.override_cache_control() {
383            query_args.push(format!(
384                "{}={}",
385                constants::RESPONSE_CACHE_CONTROL,
386                percent_encode_path(override_cache_control)
387            ))
388        }
389        if let Some(version) = args.version() {
390            query_args.push(format!(
391                "{}={}",
392                constants::S3_QUERY_VERSION_ID,
393                percent_decode_path(version)
394            ))
395        }
396        if !query_args.is_empty() {
397            url.push_str(&format!("?{}", query_args.join("&")));
398        }
399
400        let mut req = Request::head(&url);
401
402        req = self.insert_sse_headers(req, false);
403
404        if let Some(if_none_match) = args.if_none_match() {
405            req = req.header(IF_NONE_MATCH, if_none_match);
406        }
407        if let Some(if_match) = args.if_match() {
408            req = req.header(IF_MATCH, if_match);
409        }
410
411        if let Some(if_modified_since) = args.if_modified_since() {
412            req = req.header(
413                IF_MODIFIED_SINCE,
414                format_datetime_into_http_date(if_modified_since),
415            );
416        }
417        if let Some(if_unmodified_since) = args.if_unmodified_since() {
418            req = req.header(
419                IF_UNMODIFIED_SINCE,
420                format_datetime_into_http_date(if_unmodified_since),
421            );
422        }
423
424        // Set request payer header if enabled.
425        req = self.insert_request_payer_header(req);
426
427        // Inject operation to the request.
428        req = req.extension(Operation::Stat);
429
430        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
431
432        Ok(req)
433    }
434
435    pub fn s3_get_object_request(
436        &self,
437        path: &str,
438        range: BytesRange,
439        args: &OpRead,
440    ) -> Result<Request<Buffer>> {
441        let p = build_abs_path(&self.root, path);
442
443        // Construct headers to add to the request
444        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
445
446        // Add query arguments to the URL based on response overrides
447        let mut query_args = Vec::new();
448        if let Some(override_content_disposition) = args.override_content_disposition() {
449            query_args.push(format!(
450                "{}={}",
451                constants::RESPONSE_CONTENT_DISPOSITION,
452                percent_encode_path(override_content_disposition)
453            ))
454        }
455        if let Some(override_content_type) = args.override_content_type() {
456            query_args.push(format!(
457                "{}={}",
458                constants::RESPONSE_CONTENT_TYPE,
459                percent_encode_path(override_content_type)
460            ))
461        }
462        if let Some(override_cache_control) = args.override_cache_control() {
463            query_args.push(format!(
464                "{}={}",
465                constants::RESPONSE_CACHE_CONTROL,
466                percent_encode_path(override_cache_control)
467            ))
468        }
469        if let Some(version) = args.version() {
470            query_args.push(format!(
471                "{}={}",
472                constants::S3_QUERY_VERSION_ID,
473                percent_decode_path(version)
474            ))
475        }
476        if !query_args.is_empty() {
477            url.push_str(&format!("?{}", query_args.join("&")));
478        }
479
480        let mut req = Request::get(&url);
481
482        if !range.is_full() {
483            req = req.header(http::header::RANGE, range.to_header());
484        }
485
486        if let Some(if_none_match) = args.if_none_match() {
487            req = req.header(IF_NONE_MATCH, if_none_match);
488        }
489
490        if let Some(if_match) = args.if_match() {
491            req = req.header(IF_MATCH, if_match);
492        }
493
494        if let Some(if_modified_since) = args.if_modified_since() {
495            req = req.header(
496                IF_MODIFIED_SINCE,
497                format_datetime_into_http_date(if_modified_since),
498            );
499        }
500
501        if let Some(if_unmodified_since) = args.if_unmodified_since() {
502            req = req.header(
503                IF_UNMODIFIED_SINCE,
504                format_datetime_into_http_date(if_unmodified_since),
505            );
506        }
507
508        // Set request payer header if enabled.
509        req = self.insert_request_payer_header(req);
510
511        // Set SSE headers.
512        // TODO: how will this work with presign?
513        req = self.insert_sse_headers(req, false);
514
515        // Inject operation to the request.
516        req = req.extension(Operation::Read);
517
518        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
519
520        Ok(req)
521    }
522
523    pub async fn s3_get_object(
524        &self,
525        path: &str,
526        range: BytesRange,
527        args: &OpRead,
528    ) -> Result<Response<HttpBody>> {
529        let mut req = self.s3_get_object_request(path, range, args)?;
530
531        self.sign(&mut req).await?;
532
533        self.info.http_client().fetch(req).await
534    }
535
536    pub fn s3_put_object_request(
537        &self,
538        path: &str,
539        size: Option<u64>,
540        args: &OpWrite,
541        body: Buffer,
542    ) -> Result<Request<Buffer>> {
543        let p = build_abs_path(&self.root, path);
544
545        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
546
547        let mut req = Request::put(&url);
548
549        req = self.insert_metadata_headers(req, size, args);
550
551        // Set request payer header if enabled.
552        req = self.insert_request_payer_header(req);
553
554        // Set SSE headers.
555        req = self.insert_sse_headers(req, true);
556
557        // Calculate Checksum.
558        if let Some(checksum) = self.calculate_checksum(&body) {
559            // Set Checksum header.
560            req = self.insert_checksum_header(req, &checksum);
561        }
562
563        // Inject operation to the request.
564        req = req.extension(Operation::Write);
565
566        // Set body
567        let req = req.body(body).map_err(new_request_build_error)?;
568
569        Ok(req)
570    }
571
572    pub fn s3_append_object_request(
573        &self,
574        path: &str,
575        position: u64,
576        size: u64,
577        args: &OpWrite,
578        body: Buffer,
579    ) -> Result<Request<Buffer>> {
580        let p = build_abs_path(&self.root, path);
581        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
582        let mut req = Request::put(&url);
583
584        // Only include full metadata headers when creating a new object via append (position == 0)
585        // For existing objects or subsequent appends, only include content-length
586        if position == 0 {
587            req = self.insert_metadata_headers(req, Some(size), args);
588        } else {
589            req = req.header(CONTENT_LENGTH, size.to_string());
590        }
591
592        req = req.header(constants::X_AMZ_WRITE_OFFSET_BYTES, position.to_string());
593
594        // Set request payer header if enabled.
595        req = self.insert_request_payer_header(req);
596
597        // Set SSE headers.
598        req = self.insert_sse_headers(req, true);
599
600        // Inject operation to the request.
601        req = req.extension(Operation::Write);
602
603        // Set body
604        let req = req.body(body).map_err(new_request_build_error)?;
605
606        Ok(req)
607    }
608
609    pub async fn s3_head_object(&self, path: &str, args: OpStat) -> Result<Response<Buffer>> {
610        let mut req = self.s3_head_object_request(path, args)?;
611
612        self.sign(&mut req).await?;
613
614        self.send(req).await
615    }
616
617    pub async fn s3_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
618        let p = build_abs_path(&self.root, path);
619
620        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
621
622        let mut query_args = Vec::new();
623
624        if let Some(version) = args.version() {
625            query_args.push(format!(
626                "{}={}",
627                constants::S3_QUERY_VERSION_ID,
628                percent_encode_path(version)
629            ))
630        }
631
632        if !query_args.is_empty() {
633            url.push_str(&format!("?{}", query_args.join("&")));
634        }
635
636        let mut req = Request::delete(&url);
637
638        // Set request payer header if enabled.
639        req = self.insert_request_payer_header(req);
640
641        let mut req = req
642            // Inject operation to the request.
643            .extension(Operation::Delete)
644            .body(Buffer::new())
645            .map_err(new_request_build_error)?;
646
647        self.sign(&mut req).await?;
648
649        self.send(req).await
650    }
651
652    pub async fn s3_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
653        let from = build_abs_path(&self.root, from);
654        let to = build_abs_path(&self.root, to);
655
656        let source = format!("{}/{}", self.bucket, percent_encode_path(&from));
657        let target = format!("{}/{}", self.endpoint, percent_encode_path(&to));
658
659        let mut req = Request::put(&target);
660
661        // Set SSE headers.
662        req = self.insert_sse_headers(req, true);
663
664        if let Some(v) = &self.server_side_encryption_customer_algorithm {
665            let mut v = v.clone();
666            v.set_sensitive(true);
667
668            req = req.header(
669                HeaderName::from_static(
670                    constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
671                ),
672                v,
673            )
674        }
675
676        if let Some(v) = &self.server_side_encryption_customer_key {
677            let mut v = v.clone();
678            v.set_sensitive(true);
679
680            req = req.header(
681                HeaderName::from_static(
682                    constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
683                ),
684                v,
685            )
686        }
687
688        if let Some(v) = &self.server_side_encryption_customer_key_md5 {
689            let mut v = v.clone();
690            v.set_sensitive(true);
691
692            req = req.header(
693                HeaderName::from_static(
694                    constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
695                ),
696                v,
697            )
698        }
699
700        // Set request payer header if enabled.
701        req = self.insert_request_payer_header(req);
702
703        let mut req = req
704            // Inject operation to the request.
705            .extension(Operation::Copy)
706            .header(constants::X_AMZ_COPY_SOURCE, &source)
707            .body(Buffer::new())
708            .map_err(new_request_build_error)?;
709
710        self.sign(&mut req).await?;
711
712        self.send(req).await
713    }
714
715    pub async fn s3_list_objects_v1(
716        &self,
717        path: &str,
718        marker: &str,
719        delimiter: &str,
720        limit: Option<usize>,
721    ) -> Result<Response<Buffer>> {
722        let p = build_abs_path(&self.root, path);
723
724        let mut url = QueryPairsWriter::new(&self.endpoint);
725
726        if !p.is_empty() {
727            url = url.push("prefix", &percent_encode_path(&p));
728        }
729        if !delimiter.is_empty() {
730            url = url.push("delimiter", delimiter);
731        }
732        if let Some(limit) = limit {
733            url = url.push("max-keys", &limit.to_string());
734        }
735        if !marker.is_empty() {
736            url = url.push("marker", &percent_encode_path(marker));
737        }
738
739        let mut req = Request::get(url.finish());
740
741        // Set request payer header if enabled.
742        req = self.insert_request_payer_header(req);
743
744        let mut req = req
745            // Inject operation to the request.
746            .extension(Operation::List)
747            .body(Buffer::new())
748            .map_err(new_request_build_error)?;
749
750        self.sign(&mut req).await?;
751
752        self.send(req).await
753    }
754
755    pub async fn s3_list_objects_v2(
756        &self,
757        path: &str,
758        continuation_token: &str,
759        delimiter: &str,
760        limit: Option<usize>,
761        start_after: Option<String>,
762    ) -> Result<Response<Buffer>> {
763        let p = build_abs_path(&self.root, path);
764
765        let mut url = QueryPairsWriter::new(&self.endpoint);
766        url = url.push("list-type", "2");
767
768        if !p.is_empty() {
769            url = url.push("prefix", &percent_encode_path(&p));
770        }
771        if !delimiter.is_empty() {
772            url = url.push("delimiter", delimiter);
773        }
774        if let Some(limit) = limit {
775            url = url.push("max-keys", &limit.to_string());
776        }
777        if let Some(start_after) = start_after {
778            url = url.push("start-after", &percent_encode_path(&start_after));
779        }
780        if !continuation_token.is_empty() {
781            // AWS S3 could return continuation-token that contains `=`
782            // which could lead `reqsign` parse query wrongly.
783            // URL encode continuation-token before starting signing so that
784            // our signer will not be confused.
785            url = url.push(
786                "continuation-token",
787                &percent_encode_path(continuation_token),
788            );
789        }
790
791        let mut req = Request::get(url.finish());
792
793        // Set request payer header if enabled.
794        req = self.insert_request_payer_header(req);
795
796        let mut req = req
797            // Inject operation to the request.
798            .extension(Operation::List)
799            .body(Buffer::new())
800            .map_err(new_request_build_error)?;
801
802        self.sign(&mut req).await?;
803
804        self.send(req).await
805    }
806
807    pub async fn s3_initiate_multipart_upload(
808        &self,
809        path: &str,
810        args: &OpWrite,
811    ) -> Result<Response<Buffer>> {
812        let p = build_abs_path(&self.root, path);
813
814        let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
815
816        let mut req = Request::post(&url);
817
818        if let Some(mime) = args.content_type() {
819            req = req.header(CONTENT_TYPE, mime)
820        }
821
822        if let Some(content_disposition) = args.content_disposition() {
823            req = req.header(CONTENT_DISPOSITION, content_disposition)
824        }
825
826        if let Some(cache_control) = args.cache_control() {
827            req = req.header(CACHE_CONTROL, cache_control)
828        }
829
830        // Set storage class header
831        if let Some(v) = &self.default_storage_class {
832            req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
833        }
834
835        // Set user metadata headers.
836        if let Some(user_metadata) = args.user_metadata() {
837            for (key, value) in user_metadata {
838                req = req.header(format!("{X_AMZ_META_PREFIX}{key}"), value)
839            }
840        }
841
842        // Set request payer header if enabled.
843        req = self.insert_request_payer_header(req);
844
845        // Set SSE headers.
846        req = self.insert_sse_headers(req, true);
847
848        // Set SSE headers.
849        req = self.insert_checksum_type_header(req);
850
851        // Inject operation to the request.
852        req = req.extension(Operation::Write);
853
854        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
855
856        self.sign(&mut req).await?;
857
858        self.send(req).await
859    }
860
861    pub fn s3_upload_part_request(
862        &self,
863        path: &str,
864        upload_id: &str,
865        part_number: usize,
866        size: u64,
867        body: Buffer,
868        checksum: Option<String>,
869    ) -> Result<Request<Buffer>> {
870        let p = build_abs_path(&self.root, path);
871
872        let url = format!(
873            "{}/{}?partNumber={}&uploadId={}",
874            self.endpoint,
875            percent_encode_path(&p),
876            part_number,
877            percent_encode_path(upload_id)
878        );
879
880        let mut req = Request::put(&url);
881
882        req = req.header(CONTENT_LENGTH, size);
883
884        // Set request payer header if enabled.
885        req = self.insert_request_payer_header(req);
886
887        // Set SSE headers.
888        req = self.insert_sse_headers(req, true);
889
890        if let Some(checksum) = checksum {
891            // Set Checksum header.
892            req = self.insert_checksum_header(req, &checksum);
893        }
894
895        // Inject operation to the request.
896        req = req.extension(Operation::Write);
897
898        // Set body
899        let req = req.body(body).map_err(new_request_build_error)?;
900
901        Ok(req)
902    }
903
904    pub async fn s3_complete_multipart_upload(
905        &self,
906        path: &str,
907        upload_id: &str,
908        parts: Vec<CompleteMultipartUploadRequestPart>,
909    ) -> Result<Response<Buffer>> {
910        let p = build_abs_path(&self.root, path);
911
912        let url = format!(
913            "{}/{}?uploadId={}",
914            self.endpoint,
915            percent_encode_path(&p),
916            percent_encode_path(upload_id)
917        );
918
919        let mut req = Request::post(&url);
920
921        // Set SSE headers.
922        req = self.insert_sse_headers(req, true);
923
924        let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
925            .map_err(new_xml_serialize_error)?;
926        // Make sure content length has been set to avoid post with chunked encoding.
927        req = req.header(CONTENT_LENGTH, content.len());
928        // Set content-type to `application/xml` to avoid mixed with form post.
929        req = req.header(CONTENT_TYPE, "application/xml");
930
931        // Set request payer header if enabled.
932        req = self.insert_request_payer_header(req);
933
934        // Inject operation to the request.
935        req = req.extension(Operation::Write);
936
937        let mut req = req
938            .body(Buffer::from(Bytes::from(content)))
939            .map_err(new_request_build_error)?;
940
941        self.sign(&mut req).await?;
942
943        self.send(req).await
944    }
945
946    /// Abort an on-going multipart upload.
947    pub async fn s3_abort_multipart_upload(
948        &self,
949        path: &str,
950        upload_id: &str,
951    ) -> Result<Response<Buffer>> {
952        let p = build_abs_path(&self.root, path);
953
954        let url = format!(
955            "{}/{}?uploadId={}",
956            self.endpoint,
957            percent_encode_path(&p),
958            percent_encode_path(upload_id)
959        );
960
961        let mut req = Request::delete(&url);
962
963        // Set request payer header if enabled.
964        req = self.insert_request_payer_header(req);
965
966        let mut req = req
967            // Inject operation to the request.
968            .extension(Operation::Write)
969            .body(Buffer::new())
970            .map_err(new_request_build_error)?;
971
972        self.sign(&mut req).await?;
973        self.send(req).await
974    }
975
976    pub async fn s3_delete_objects(
977        &self,
978        paths: Vec<(String, OpDelete)>,
979    ) -> Result<Response<Buffer>> {
980        let url = format!("{}/?delete", self.endpoint);
981
982        let mut req = Request::post(&url);
983
984        let content = quick_xml::se::to_string(&DeleteObjectsRequest {
985            object: paths
986                .into_iter()
987                .map(|(path, op)| DeleteObjectsRequestObject {
988                    key: build_abs_path(&self.root, &path),
989                    version_id: op.version().map(|v| v.to_owned()),
990                })
991                .collect(),
992        })
993        .map_err(new_xml_serialize_error)?;
994
995        // Make sure content length has been set to avoid post with chunked encoding.
996        req = req.header(CONTENT_LENGTH, content.len());
997        // Set content-type to `application/xml` to avoid mixed with form post.
998        req = req.header(CONTENT_TYPE, "application/xml");
999        // Set content-md5 as required by API.
1000        req = req.header("CONTENT-MD5", format_content_md5(content.as_bytes()));
1001
1002        // Set request payer header if enabled.
1003        req = self.insert_request_payer_header(req);
1004
1005        // Inject operation to the request.
1006        req = req.extension(Operation::Delete);
1007
1008        let mut req = req
1009            .body(Buffer::from(Bytes::from(content)))
1010            .map_err(new_request_build_error)?;
1011
1012        self.sign(&mut req).await?;
1013
1014        self.send(req).await
1015    }
1016
1017    pub async fn s3_list_object_versions(
1018        &self,
1019        prefix: &str,
1020        delimiter: &str,
1021        limit: Option<usize>,
1022        key_marker: &str,
1023        version_id_marker: &str,
1024    ) -> Result<Response<Buffer>> {
1025        let p = build_abs_path(&self.root, prefix);
1026
1027        let mut url = format!("{}?versions", self.endpoint);
1028        if !p.is_empty() {
1029            write!(url, "&prefix={}", percent_encode_path(p.as_str()))
1030                .expect("write into string must succeed");
1031        }
1032        if !delimiter.is_empty() {
1033            write!(url, "&delimiter={}", delimiter).expect("write into string must succeed");
1034        }
1035
1036        if let Some(limit) = limit {
1037            write!(url, "&max-keys={}", limit).expect("write into string must succeed");
1038        }
1039        if !key_marker.is_empty() {
1040            write!(url, "&key-marker={}", percent_encode_path(key_marker))
1041                .expect("write into string must succeed");
1042        }
1043        if !version_id_marker.is_empty() {
1044            write!(
1045                url,
1046                "&version-id-marker={}",
1047                percent_encode_path(version_id_marker)
1048            )
1049            .expect("write into string must succeed");
1050        }
1051
1052        let mut req = Request::get(&url);
1053
1054        // Set request payer header if enabled.
1055        req = self.insert_request_payer_header(req);
1056
1057        let mut req = req
1058            // Inject operation to the request.
1059            .extension(Operation::List)
1060            .body(Buffer::new())
1061            .map_err(new_request_build_error)?;
1062
1063        self.sign(&mut req).await?;
1064
1065        self.send(req).await
1066    }
1067}
1068
1069/// Result of CreateMultipartUpload
1070#[derive(Default, Debug, Deserialize)]
1071#[serde(default, rename_all = "PascalCase")]
1072pub struct InitiateMultipartUploadResult {
1073    pub upload_id: String,
1074}
1075
1076/// Request of CompleteMultipartUploadRequest
1077#[derive(Default, Debug, Serialize)]
1078#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
1079pub struct CompleteMultipartUploadRequest {
1080    pub part: Vec<CompleteMultipartUploadRequestPart>,
1081}
1082
1083#[derive(Clone, Default, Debug, Serialize)]
1084#[serde(default, rename_all = "PascalCase")]
1085pub struct CompleteMultipartUploadRequestPart {
1086    #[serde(rename = "PartNumber")]
1087    pub part_number: usize,
1088    /// # TODO
1089    ///
1090    /// quick-xml will do escape on `"` which leads to our serialized output is
1091    /// not the same as aws s3's example.
1092    ///
1093    /// Ideally, we could use `serialize_with` to address this (buf failed)
1094    ///
1095    /// ```ignore
1096    /// #[derive(Default, Debug, Serialize)]
1097    /// #[serde(default, rename_all = "PascalCase")]
1098    /// struct CompleteMultipartUploadRequestPart {
1099    ///     #[serde(rename = "PartNumber")]
1100    ///     part_number: usize,
1101    ///     #[serde(rename = "ETag", serialize_with = "partial_escape")]
1102    ///     etag: String,
1103    /// }
1104    ///
1105    /// fn partial_escape<S>(s: &str, ser: S) -> Result<S::Ok, S::Error>
1106    /// where
1107    ///     S: serde::Serializer,
1108    /// {
1109    ///     ser.serialize_str(&String::from_utf8_lossy(
1110    ///         &quick_xml::escape::partial_escape(s.as_bytes()),
1111    ///     ))
1112    /// }
1113    /// ```
1114    ///
1115    /// ref: <https://github.com/tafia/quick-xml/issues/362>
1116    #[serde(rename = "ETag")]
1117    pub etag: String,
1118    #[serde(rename = "ChecksumCRC32C", skip_serializing_if = "Option::is_none")]
1119    pub checksum_crc32c: Option<String>,
1120}
1121
1122/// Output of `CompleteMultipartUpload` operation
1123#[derive(Debug, Default, Deserialize)]
1124#[serde[default, rename_all = "PascalCase"]]
1125pub struct CompleteMultipartUploadResult {
1126    pub bucket: String,
1127    pub key: String,
1128    pub location: String,
1129    #[serde(rename = "ETag")]
1130    pub etag: String,
1131    pub code: String,
1132    pub message: String,
1133    pub request_id: String,
1134}
1135
1136/// Request of DeleteObjects.
1137#[derive(Default, Debug, Serialize)]
1138#[serde(default, rename = "Delete", rename_all = "PascalCase")]
1139pub struct DeleteObjectsRequest {
1140    pub object: Vec<DeleteObjectsRequestObject>,
1141}
1142
1143#[derive(Default, Debug, Serialize)]
1144#[serde(rename_all = "PascalCase")]
1145pub struct DeleteObjectsRequestObject {
1146    pub key: String,
1147    #[serde(skip_serializing_if = "Option::is_none")]
1148    pub version_id: Option<String>,
1149}
1150
1151/// Result of DeleteObjects.
1152#[derive(Default, Debug, Deserialize)]
1153#[serde(default, rename = "DeleteResult", rename_all = "PascalCase")]
1154pub struct DeleteObjectsResult {
1155    pub deleted: Vec<DeleteObjectsResultDeleted>,
1156    pub error: Vec<DeleteObjectsResultError>,
1157}
1158
1159#[derive(Default, Debug, Deserialize)]
1160#[serde(rename_all = "PascalCase")]
1161pub struct DeleteObjectsResultDeleted {
1162    pub key: String,
1163    pub version_id: Option<String>,
1164}
1165
1166#[derive(Default, Debug, Deserialize)]
1167#[serde(default, rename_all = "PascalCase")]
1168pub struct DeleteObjectsResultError {
1169    pub code: String,
1170    pub key: String,
1171    pub message: String,
1172    pub version_id: Option<String>,
1173}
1174
1175/// Output of ListBucket/ListObjects (a.k.a ListObjectsV1).
1176#[derive(Default, Debug, Deserialize)]
1177#[serde(default, rename_all = "PascalCase")]
1178pub struct ListObjectsOutputV1 {
1179    pub is_truncated: Option<bool>,
1180    /// ## Notes
1181    ///
1182    /// `next_marker` is returned only if we have the delimiter request parameter
1183    /// specified. If the response does not include the NextMarker element and it
1184    /// is truncated, we should use the value of the last Key element in the
1185    /// response as the marker parameter in the subsequent request to get the
1186    /// next set of object keys.
1187    ///
1188    /// If the contents is empty, we should find common_prefixes instead.
1189    pub next_marker: Option<String>,
1190    pub common_prefixes: Vec<OutputCommonPrefix>,
1191    pub contents: Vec<ListObjectsOutputContent>,
1192}
1193
1194/// Output of ListBucketV2/ListObjectsV2.
1195///
1196/// ## Note
1197///
1198/// Use `Option` in `is_truncated` and `next_continuation_token` to make
1199/// the behavior more clear so that we can be compatible to more s3 services.
1200///
1201/// And enable `serde(default)` so that we can keep going even when some field
1202/// is not exist.
1203#[derive(Default, Debug, Deserialize)]
1204#[serde(default, rename_all = "PascalCase")]
1205pub struct ListObjectsOutputV2 {
1206    pub is_truncated: Option<bool>,
1207    pub next_continuation_token: Option<String>,
1208    pub common_prefixes: Vec<OutputCommonPrefix>,
1209    pub contents: Vec<ListObjectsOutputContent>,
1210}
1211
1212#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
1213#[serde(rename_all = "PascalCase")]
1214pub struct ListObjectsOutputContent {
1215    pub key: String,
1216    pub size: u64,
1217    pub last_modified: String,
1218    #[serde(rename = "ETag")]
1219    pub etag: Option<String>,
1220}
1221
1222#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
1223#[serde(rename_all = "PascalCase")]
1224pub struct OutputCommonPrefix {
1225    pub prefix: String,
1226}
1227
1228/// Output of ListObjectVersions
1229#[derive(Default, Debug, Deserialize)]
1230#[serde(default, rename_all = "PascalCase")]
1231pub struct ListObjectVersionsOutput {
1232    pub is_truncated: Option<bool>,
1233    pub next_key_marker: Option<String>,
1234    pub next_version_id_marker: Option<String>,
1235    pub common_prefixes: Vec<OutputCommonPrefix>,
1236    pub version: Vec<ListObjectVersionsOutputVersion>,
1237    pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
1238}
1239
1240#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
1241#[serde(rename_all = "PascalCase")]
1242pub struct ListObjectVersionsOutputVersion {
1243    pub key: String,
1244    pub version_id: String,
1245    pub is_latest: bool,
1246    pub size: u64,
1247    pub last_modified: String,
1248    #[serde(rename = "ETag")]
1249    pub etag: Option<String>,
1250}
1251
1252#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
1253#[serde(rename_all = "PascalCase")]
1254pub struct ListObjectVersionsOutputDeleteMarker {
1255    pub key: String,
1256    pub version_id: String,
1257    pub is_latest: bool,
1258    pub last_modified: String,
1259}
1260
1261pub enum ChecksumAlgorithm {
1262    Crc32c,
1263}
1264impl ChecksumAlgorithm {
1265    pub fn to_header_name(&self) -> HeaderName {
1266        match self {
1267            Self::Crc32c => HeaderName::from_static("x-amz-checksum-crc32c"),
1268        }
1269    }
1270}
1271impl Display for ChecksumAlgorithm {
1272    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1273        write!(
1274            f,
1275            "{}",
1276            match self {
1277                Self::Crc32c => "CRC32C",
1278            }
1279        )
1280    }
1281}
1282
1283#[cfg(test)]
1284mod tests {
1285    use bytes::Buf;
1286    use bytes::Bytes;
1287
1288    use super::*;
1289
1290    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html#API_CreateMultipartUpload_Examples
1291    #[test]
1292    fn test_deserialize_initiate_multipart_upload_result() {
1293        let bs = Bytes::from(
1294            r#"<?xml version="1.0" encoding="UTF-8"?>
1295            <InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1296              <Bucket>example-bucket</Bucket>
1297              <Key>example-object</Key>
1298              <UploadId>VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA</UploadId>
1299            </InitiateMultipartUploadResult>"#,
1300        );
1301
1302        let out: InitiateMultipartUploadResult =
1303            quick_xml::de::from_reader(bs.reader()).expect("must success");
1304
1305        assert_eq!(
1306            out.upload_id,
1307            "VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA"
1308        )
1309    }
1310
1311    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#API_CompleteMultipartUpload_Examples
1312    #[test]
1313    fn test_serialize_complete_multipart_upload_request() {
1314        let req = CompleteMultipartUploadRequest {
1315            part: vec![
1316                CompleteMultipartUploadRequestPart {
1317                    part_number: 1,
1318                    etag: "\"a54357aff0632cce46d942af68356b38\"".to_string(),
1319                    ..Default::default()
1320                },
1321                CompleteMultipartUploadRequestPart {
1322                    part_number: 2,
1323                    etag: "\"0c78aef83f66abc1fa1e8477f296d394\"".to_string(),
1324                    ..Default::default()
1325                },
1326                CompleteMultipartUploadRequestPart {
1327                    part_number: 3,
1328                    etag: "\"acbd18db4cc2f85cedef654fccc4a4d8\"".to_string(),
1329                    ..Default::default()
1330                },
1331            ],
1332        };
1333
1334        let actual = quick_xml::se::to_string(&req).expect("must succeed");
1335
1336        pretty_assertions::assert_eq!(
1337            actual,
1338            r#"<CompleteMultipartUpload>
1339             <Part>
1340                <PartNumber>1</PartNumber>
1341               <ETag>"a54357aff0632cce46d942af68356b38"</ETag>
1342             </Part>
1343             <Part>
1344                <PartNumber>2</PartNumber>
1345               <ETag>"0c78aef83f66abc1fa1e8477f296d394"</ETag>
1346             </Part>
1347             <Part>
1348               <PartNumber>3</PartNumber>
1349               <ETag>"acbd18db4cc2f85cedef654fccc4a4d8"</ETag>
1350             </Part>
1351            </CompleteMultipartUpload>"#
1352                // Cleanup space and new line
1353                .replace([' ', '\n'], "")
1354        )
1355    }
1356
1357    /// this example is from: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
1358    #[test]
1359    fn test_deserialize_complete_multipart_upload_result() {
1360        let bs = Bytes::from(
1361            r#"<?xml version="1.0" encoding="UTF-8"?>
1362            <CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1363             <Location>http://Example-Bucket.s3.region.amazonaws.com/Example-Object</Location>
1364             <Bucket>Example-Bucket</Bucket>
1365             <Key>Example-Object</Key>
1366             <ETag>"3858f62230ac3c915f300c664312c11f-9"</ETag>
1367            </CompleteMultipartUploadResult>"#,
1368        );
1369
1370        let out: CompleteMultipartUploadResult =
1371            quick_xml::de::from_reader(bs.reader()).expect("must success");
1372
1373        assert_eq!(out.bucket, "Example-Bucket");
1374        assert_eq!(out.key, "Example-Object");
1375        assert_eq!(
1376            out.location,
1377            "http://Example-Bucket.s3.region.amazonaws.com/Example-Object"
1378        );
1379        assert_eq!(out.etag, "\"3858f62230ac3c915f300c664312c11f-9\"");
1380    }
1381
1382    #[test]
1383    fn test_deserialize_complete_multipart_upload_result_when_return_error() {
1384        let bs = Bytes::from(
1385            r#"<?xml version="1.0" encoding="UTF-8"?>
1386
1387                <Error>
1388                <Code>InternalError</Code>
1389                <Message>We encountered an internal error. Please try again.</Message>
1390                <RequestId>656c76696e6727732072657175657374</RequestId>
1391                <HostId>Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==</HostId>
1392                </Error>"#,
1393        );
1394
1395        let out: CompleteMultipartUploadResult =
1396            quick_xml::de::from_reader(bs.reader()).expect("must success");
1397
1398        assert_eq!(out.code, "InternalError");
1399        assert_eq!(
1400            out.message,
1401            "We encountered an internal error. Please try again."
1402        );
1403        assert_eq!(out.request_id, "656c76696e6727732072657175657374");
1404    }
1405
1406    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Examples
1407    #[test]
1408    fn test_serialize_delete_objects_request() {
1409        let req = DeleteObjectsRequest {
1410            object: vec![
1411                DeleteObjectsRequestObject {
1412                    key: "sample1.txt".to_string(),
1413                    version_id: None,
1414                },
1415                DeleteObjectsRequestObject {
1416                    key: "sample2.txt".to_string(),
1417                    version_id: Some("11111".to_owned()),
1418                },
1419            ],
1420        };
1421
1422        let actual = quick_xml::se::to_string(&req).expect("must succeed");
1423
1424        pretty_assertions::assert_eq!(
1425            actual,
1426            r#"<Delete>
1427             <Object>
1428             <Key>sample1.txt</Key>
1429             </Object>
1430             <Object>
1431               <Key>sample2.txt</Key>
1432               <VersionId>11111</VersionId>
1433             </Object>
1434             </Delete>"#
1435                // Cleanup space and new line
1436                .replace([' ', '\n'], "")
1437        )
1438    }
1439
1440    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Examples
1441    #[test]
1442    fn test_deserialize_delete_objects_result() {
1443        let bs = Bytes::from(
1444            r#"<?xml version="1.0" encoding="UTF-8"?>
1445            <DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1446             <Deleted>
1447               <Key>sample1.txt</Key>
1448             </Deleted>
1449             <Error>
1450              <Key>sample2.txt</Key>
1451              <Code>AccessDenied</Code>
1452              <Message>Access Denied</Message>
1453             </Error>
1454            </DeleteResult>"#,
1455        );
1456
1457        let out: DeleteObjectsResult =
1458            quick_xml::de::from_reader(bs.reader()).expect("must success");
1459
1460        assert_eq!(out.deleted.len(), 1);
1461        assert_eq!(out.deleted[0].key, "sample1.txt");
1462        assert_eq!(out.error.len(), 1);
1463        assert_eq!(out.error[0].key, "sample2.txt");
1464        assert_eq!(out.error[0].code, "AccessDenied");
1465        assert_eq!(out.error[0].message, "Access Denied");
1466    }
1467
1468    #[test]
1469    fn test_deserialize_delete_objects_with_version_id() {
1470        let bs = Bytes::from(
1471            r#"<?xml version="1.0" encoding="UTF-8"?>
1472                  <DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1473                    <Deleted>
1474                      <Key>SampleDocument.txt</Key>
1475                      <VersionId>OYcLXagmS.WaD..oyH4KRguB95_YhLs7</VersionId>
1476                    </Deleted>
1477                  </DeleteResult>"#,
1478        );
1479
1480        let out: DeleteObjectsResult =
1481            quick_xml::de::from_reader(bs.reader()).expect("must success");
1482
1483        assert_eq!(out.deleted.len(), 1);
1484        assert_eq!(out.deleted[0].key, "SampleDocument.txt");
1485        assert_eq!(
1486            out.deleted[0].version_id,
1487            Some("OYcLXagmS.WaD..oyH4KRguB95_YhLs7".to_owned())
1488        );
1489        assert_eq!(out.error.len(), 0);
1490    }
1491
1492    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html#API_ListObjects_Examples
1493    #[test]
1494    fn test_parse_list_output_v1() {
1495        let bs = bytes::Bytes::from(
1496            r#"<?xml version="1.0" encoding="UTF-8"?>
1497            <ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1498                <Name>bucket</Name>
1499                <Prefix/>
1500                <Marker/>
1501                <MaxKeys>1000</MaxKeys>
1502                <IsTruncated>false</IsTruncated>
1503                <Contents>
1504                    <Key>my-image.jpg</Key>
1505                    <LastModified>2009-10-12T17:50:30.000Z</LastModified>
1506                    <ETag>"fba9dede5f27731c9771645a39863328"</ETag>
1507                    <Size>434234</Size>
1508                    <StorageClass>STANDARD</StorageClass>
1509                    <Owner>
1510                        <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
1511                        <DisplayName>mtd@amazon.com</DisplayName>
1512                    </Owner>
1513                </Contents>
1514                <Contents>
1515                   <Key>my-third-image.jpg</Key>
1516                     <LastModified>2009-10-12T17:50:30.000Z</LastModified>
1517                     <ETag>"1b2cf535f27731c974343645a3985328"</ETag>
1518                     <Size>64994</Size>
1519                     <StorageClass>STANDARD_IA</StorageClass>
1520                     <Owner>
1521                        <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
1522                        <DisplayName>mtd@amazon.com</DisplayName>
1523                    </Owner>
1524                </Contents>
1525            </ListBucketResult>"#,
1526        );
1527
1528        let out: ListObjectsOutputV1 =
1529            quick_xml::de::from_reader(bs.reader()).expect("must success");
1530
1531        assert!(!out.is_truncated.unwrap());
1532        assert!(out.next_marker.is_none());
1533        assert!(out.common_prefixes.is_empty());
1534        assert_eq!(
1535            out.contents,
1536            vec![
1537                ListObjectsOutputContent {
1538                    key: "my-image.jpg".to_string(),
1539                    size: 434234,
1540                    etag: Some("\"fba9dede5f27731c9771645a39863328\"".to_string()),
1541                    last_modified: "2009-10-12T17:50:30.000Z".to_string(),
1542                },
1543                ListObjectsOutputContent {
1544                    key: "my-third-image.jpg".to_string(),
1545                    size: 64994,
1546                    last_modified: "2009-10-12T17:50:30.000Z".to_string(),
1547                    etag: Some("\"1b2cf535f27731c974343645a3985328\"".to_string()),
1548                },
1549            ]
1550        )
1551    }
1552
1553    #[test]
1554    fn test_parse_list_output_v2() {
1555        let bs = bytes::Bytes::from(
1556            r#"<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1557  <Name>example-bucket</Name>
1558  <Prefix>photos/2006/</Prefix>
1559  <KeyCount>3</KeyCount>
1560  <MaxKeys>1000</MaxKeys>
1561  <Delimiter>/</Delimiter>
1562  <IsTruncated>false</IsTruncated>
1563  <Contents>
1564    <Key>photos/2006</Key>
1565    <LastModified>2016-04-30T23:51:29.000Z</LastModified>
1566    <ETag>"d41d8cd98f00b204e9800998ecf8427e"</ETag>
1567    <Size>56</Size>
1568    <StorageClass>STANDARD</StorageClass>
1569  </Contents>
1570  <Contents>
1571    <Key>photos/2007</Key>
1572    <LastModified>2016-04-30T23:51:29.000Z</LastModified>
1573    <ETag>"d41d8cd98f00b204e9800998ecf8427e"</ETag>
1574    <Size>100</Size>
1575    <StorageClass>STANDARD</StorageClass>
1576  </Contents>
1577  <Contents>
1578    <Key>photos/2008</Key>
1579    <LastModified>2016-05-30T23:51:29.000Z</LastModified>
1580    <Size>42</Size>
1581  </Contents>
1582
1583  <CommonPrefixes>
1584    <Prefix>photos/2006/February/</Prefix>
1585  </CommonPrefixes>
1586  <CommonPrefixes>
1587    <Prefix>photos/2006/January/</Prefix>
1588  </CommonPrefixes>
1589</ListBucketResult>"#,
1590        );
1591
1592        let out: ListObjectsOutputV2 =
1593            quick_xml::de::from_reader(bs.reader()).expect("must success");
1594
1595        assert!(!out.is_truncated.unwrap());
1596        assert!(out.next_continuation_token.is_none());
1597        assert_eq!(
1598            out.common_prefixes
1599                .iter()
1600                .map(|v| v.prefix.clone())
1601                .collect::<Vec<String>>(),
1602            vec!["photos/2006/February/", "photos/2006/January/"]
1603        );
1604        assert_eq!(
1605            out.contents,
1606            vec![
1607                ListObjectsOutputContent {
1608                    key: "photos/2006".to_string(),
1609                    size: 56,
1610                    etag: Some("\"d41d8cd98f00b204e9800998ecf8427e\"".to_string()),
1611                    last_modified: "2016-04-30T23:51:29.000Z".to_string(),
1612                },
1613                ListObjectsOutputContent {
1614                    key: "photos/2007".to_string(),
1615                    size: 100,
1616                    last_modified: "2016-04-30T23:51:29.000Z".to_string(),
1617                    etag: Some("\"d41d8cd98f00b204e9800998ecf8427e\"".to_string()),
1618                },
1619                ListObjectsOutputContent {
1620                    key: "photos/2008".to_string(),
1621                    size: 42,
1622                    last_modified: "2016-05-30T23:51:29.000Z".to_string(),
1623                    etag: None,
1624                },
1625            ]
1626        )
1627    }
1628
1629    #[test]
1630    fn test_parse_list_object_versions() {
1631        let bs = bytes::Bytes::from(
1632            r#"<?xml version="1.0" encoding="UTF-8"?>
1633                <ListVersionsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1634                <Name>mtp-versioning-fresh</Name>
1635                <Prefix/>
1636                <KeyMarker>key3</KeyMarker>
1637                <VersionIdMarker>null</VersionIdMarker>
1638                <NextKeyMarker>key3</NextKeyMarker>
1639                <NextVersionIdMarker>d-d309mfjFrUmoQ0DBsVqmcMV15OI.</NextVersionIdMarker>
1640                <MaxKeys>3</MaxKeys>
1641                <IsTruncated>true</IsTruncated>
1642                <Version>
1643                    <Key>key3</Key>
1644                    <VersionId>8XECiENpj8pydEDJdd-_VRrvaGKAHOaGMNW7tg6UViI.</VersionId>
1645                    <IsLatest>true</IsLatest>
1646                    <LastModified>2009-12-09T00:18:23.000Z</LastModified>
1647                    <ETag>"396fefef536d5ce46c7537ecf978a360"</ETag>
1648                    <Size>217</Size>
1649                    <Owner>
1650                        <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
1651                    </Owner>
1652                    <StorageClass>STANDARD</StorageClass>
1653                </Version>
1654                <Version>
1655                    <Key>key3</Key>
1656                    <VersionId>d-d309mfjFri40QYukDozqBt3UmoQ0DBsVqmcMV15OI.</VersionId>
1657                    <IsLatest>false</IsLatest>
1658                    <LastModified>2009-12-09T00:18:08.000Z</LastModified>
1659                    <ETag>"396fefef536d5ce46c7537ecf978a360"</ETag>
1660                    <Size>217</Size>
1661                    <Owner>
1662                        <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
1663                    </Owner>
1664                    <StorageClass>STANDARD</StorageClass>
1665                </Version>
1666                <CommonPrefixes>
1667                    <Prefix>photos/</Prefix>
1668                </CommonPrefixes>
1669                <CommonPrefixes>
1670                    <Prefix>videos/</Prefix>
1671                </CommonPrefixes>
1672                 <DeleteMarker>
1673                    <Key>my-third-image.jpg</Key>
1674                    <VersionId>03jpff543dhffds434rfdsFDN943fdsFkdmqnh892</VersionId>
1675                    <IsLatest>true</IsLatest>
1676                    <LastModified>2009-10-15T17:50:30.000Z</LastModified>
1677                    <Owner>
1678                        <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
1679                        <DisplayName>mtd@amazon.com</DisplayName>
1680                    </Owner>
1681                </DeleteMarker>
1682                </ListVersionsResult>"#,
1683        );
1684
1685        let output: ListObjectVersionsOutput =
1686            quick_xml::de::from_reader(bs.reader()).expect("must succeed");
1687
1688        assert!(output.is_truncated.unwrap());
1689        assert_eq!(output.next_key_marker, Some("key3".to_owned()));
1690        assert_eq!(
1691            output.next_version_id_marker,
1692            Some("d-d309mfjFrUmoQ0DBsVqmcMV15OI.".to_owned())
1693        );
1694        assert_eq!(
1695            output.common_prefixes,
1696            vec![
1697                OutputCommonPrefix {
1698                    prefix: "photos/".to_owned()
1699                },
1700                OutputCommonPrefix {
1701                    prefix: "videos/".to_owned()
1702                }
1703            ]
1704        );
1705
1706        assert_eq!(
1707            output.version,
1708            vec![
1709                ListObjectVersionsOutputVersion {
1710                    key: "key3".to_owned(),
1711                    version_id: "8XECiENpj8pydEDJdd-_VRrvaGKAHOaGMNW7tg6UViI.".to_owned(),
1712                    is_latest: true,
1713                    size: 217,
1714                    last_modified: "2009-12-09T00:18:23.000Z".to_owned(),
1715                    etag: Some("\"396fefef536d5ce46c7537ecf978a360\"".to_owned()),
1716                },
1717                ListObjectVersionsOutputVersion {
1718                    key: "key3".to_owned(),
1719                    version_id: "d-d309mfjFri40QYukDozqBt3UmoQ0DBsVqmcMV15OI.".to_owned(),
1720                    is_latest: false,
1721                    size: 217,
1722                    last_modified: "2009-12-09T00:18:08.000Z".to_owned(),
1723                    etag: Some("\"396fefef536d5ce46c7537ecf978a360\"".to_owned()),
1724                }
1725            ]
1726        );
1727
1728        assert_eq!(
1729            output.delete_marker,
1730            vec![ListObjectVersionsOutputDeleteMarker {
1731                key: "my-third-image.jpg".to_owned(),
1732                version_id: "03jpff543dhffds434rfdsFDN943fdsFkdmqnh892".to_owned(),
1733                is_latest: true,
1734                last_modified: "2009-10-15T17:50:30.000Z".to_owned(),
1735            },]
1736        );
1737    }
1738}