opendal/services/s3/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt;
19use std::fmt::Debug;
20use std::fmt::Display;
21use std::fmt::Formatter;
22use std::fmt::Write;
23use std::sync::atomic::AtomicBool;
24use std::sync::{atomic, Arc};
25use std::time::Duration;
26
27use base64::prelude::BASE64_STANDARD;
28use base64::Engine;
29use bytes::Bytes;
30use constants::X_AMZ_META_PREFIX;
31use http::header::CACHE_CONTROL;
32use http::header::CONTENT_DISPOSITION;
33use http::header::CONTENT_ENCODING;
34use http::header::CONTENT_LENGTH;
35use http::header::CONTENT_TYPE;
36use http::header::HOST;
37use http::header::IF_MATCH;
38use http::header::IF_NONE_MATCH;
39use http::header::{HeaderName, IF_MODIFIED_SINCE, IF_UNMODIFIED_SINCE};
40use http::HeaderValue;
41use http::Request;
42use http::Response;
43use reqsign::AwsCredential;
44use reqsign::AwsCredentialLoad;
45use reqsign::AwsV4Signer;
46use serde::Deserialize;
47use serde::Serialize;
48
49use crate::raw::*;
50use crate::*;
51
52pub mod constants {
53    pub const X_AMZ_COPY_SOURCE: &str = "x-amz-copy-source";
54
55    pub const X_AMZ_SERVER_SIDE_ENCRYPTION: &str = "x-amz-server-side-encryption";
56    pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: &str =
57        "x-amz-server-side-encryption-customer-algorithm";
58    pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str =
59        "x-amz-server-side-encryption-customer-key";
60    pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str =
61        "x-amz-server-side-encryption-customer-key-md5";
62    pub const X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID: &str =
63        "x-amz-server-side-encryption-aws-kms-key-id";
64    pub const X_AMZ_STORAGE_CLASS: &str = "x-amz-storage-class";
65
66    pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: &str =
67        "x-amz-copy-source-server-side-encryption-customer-algorithm";
68    pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str =
69        "x-amz-copy-source-server-side-encryption-customer-key";
70    pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str =
71        "x-amz-copy-source-server-side-encryption-customer-key-md5";
72
73    pub const X_AMZ_WRITE_OFFSET_BYTES: &str = "x-amz-write-offset-bytes";
74
75    pub const X_AMZ_META_PREFIX: &str = "x-amz-meta-";
76
77    pub const X_AMZ_VERSION_ID: &str = "x-amz-version-id";
78    pub const X_AMZ_OBJECT_SIZE: &str = "x-amz-object-size";
79
80    pub const RESPONSE_CONTENT_DISPOSITION: &str = "response-content-disposition";
81    pub const RESPONSE_CONTENT_TYPE: &str = "response-content-type";
82    pub const RESPONSE_CACHE_CONTROL: &str = "response-cache-control";
83
84    pub const S3_QUERY_VERSION_ID: &str = "versionId";
85}
86
87pub struct S3Core {
88    pub info: Arc<AccessorInfo>,
89
90    pub bucket: String,
91    pub endpoint: String,
92    pub root: String,
93    pub server_side_encryption: Option<HeaderValue>,
94    pub server_side_encryption_aws_kms_key_id: Option<HeaderValue>,
95    pub server_side_encryption_customer_algorithm: Option<HeaderValue>,
96    pub server_side_encryption_customer_key: Option<HeaderValue>,
97    pub server_side_encryption_customer_key_md5: Option<HeaderValue>,
98    pub default_storage_class: Option<HeaderValue>,
99    pub allow_anonymous: bool,
100
101    pub signer: AwsV4Signer,
102    pub loader: Box<dyn AwsCredentialLoad>,
103    pub credential_loaded: AtomicBool,
104    pub checksum_algorithm: Option<ChecksumAlgorithm>,
105}
106
107impl Debug for S3Core {
108    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
109        f.debug_struct("S3Core")
110            .field("bucket", &self.bucket)
111            .field("endpoint", &self.endpoint)
112            .field("root", &self.root)
113            .finish_non_exhaustive()
114    }
115}
116
117impl S3Core {
118    /// If credential is not found, we will not sign the request.
119    async fn load_credential(&self) -> Result<Option<AwsCredential>> {
120        let cred = self
121            .loader
122            .load_credential(GLOBAL_REQWEST_CLIENT.clone())
123            .await
124            .map_err(new_request_credential_error)?;
125
126        if let Some(cred) = cred {
127            // Update credential_loaded to true if we have load credential successfully.
128            self.credential_loaded
129                .store(true, atomic::Ordering::Relaxed);
130            return Ok(Some(cred));
131        }
132
133        // If we have load credential before but failed to load this time, we should
134        // return error instead.
135        if self.credential_loaded.load(atomic::Ordering::Relaxed) {
136            return Err(Error::new(
137                ErrorKind::PermissionDenied,
138                "credential was previously loaded successfully but has failed this time",
139            )
140            .set_temporary());
141        }
142
143        // Credential is empty and users allow anonymous access, we will not sign the request.
144        if self.allow_anonymous {
145            return Ok(None);
146        }
147
148        Err(Error::new(
149            ErrorKind::PermissionDenied,
150            "no valid credential found and anonymous access is not allowed",
151        ))
152    }
153
154    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
155        let cred = if let Some(cred) = self.load_credential().await? {
156            cred
157        } else {
158            return Ok(());
159        };
160
161        self.signer
162            .sign(req, &cred)
163            .map_err(new_request_sign_error)?;
164
165        // Always remove host header, let users' client to set it based on HTTP
166        // version.
167        //
168        // As discussed in <https://github.com/seanmonstar/reqwest/issues/1809>,
169        // google server could send RST_STREAM of PROTOCOL_ERROR if our request
170        // contains host header.
171        req.headers_mut().remove(HOST);
172
173        Ok(())
174    }
175
176    pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
177        let cred = if let Some(cred) = self.load_credential().await? {
178            cred
179        } else {
180            return Ok(());
181        };
182
183        self.signer
184            .sign_query(req, duration, &cred)
185            .map_err(new_request_sign_error)?;
186
187        // Always remove host header, let users' client to set it based on HTTP
188        // version.
189        //
190        // As discussed in <https://github.com/seanmonstar/reqwest/issues/1809>,
191        // google server could send RST_STREAM of PROTOCOL_ERROR if our request
192        // contains host header.
193        req.headers_mut().remove(HOST);
194
195        Ok(())
196    }
197
198    #[inline]
199    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
200        self.info.http_client().send(req).await
201    }
202
203    /// # Note
204    ///
205    /// header like X_AMZ_SERVER_SIDE_ENCRYPTION doesn't need to set while
206    /// get or stat.
207    pub fn insert_sse_headers(
208        &self,
209        mut req: http::request::Builder,
210        is_write: bool,
211    ) -> http::request::Builder {
212        if is_write {
213            if let Some(v) = &self.server_side_encryption {
214                let mut v = v.clone();
215                v.set_sensitive(true);
216
217                req = req.header(
218                    HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION),
219                    v,
220                )
221            }
222            if let Some(v) = &self.server_side_encryption_aws_kms_key_id {
223                let mut v = v.clone();
224                v.set_sensitive(true);
225
226                req = req.header(
227                    HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID),
228                    v,
229                )
230            }
231        }
232
233        if let Some(v) = &self.server_side_encryption_customer_algorithm {
234            let mut v = v.clone();
235            v.set_sensitive(true);
236
237            req = req.header(
238                HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM),
239                v,
240            )
241        }
242        if let Some(v) = &self.server_side_encryption_customer_key {
243            let mut v = v.clone();
244            v.set_sensitive(true);
245
246            req = req.header(
247                HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY),
248                v,
249            )
250        }
251        if let Some(v) = &self.server_side_encryption_customer_key_md5 {
252            let mut v = v.clone();
253            v.set_sensitive(true);
254
255            req = req.header(
256                HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5),
257                v,
258            )
259        }
260
261        req
262    }
263    pub fn calculate_checksum(&self, body: &Buffer) -> Option<String> {
264        match self.checksum_algorithm {
265            None => None,
266            Some(ChecksumAlgorithm::Crc32c) => {
267                let mut crc = 0u32;
268                body.clone()
269                    .for_each(|b| crc = crc32c::crc32c_append(crc, &b));
270                Some(BASE64_STANDARD.encode(crc.to_be_bytes()))
271            }
272        }
273    }
274    pub fn insert_checksum_header(
275        &self,
276        mut req: http::request::Builder,
277        checksum: &str,
278    ) -> http::request::Builder {
279        if let Some(checksum_algorithm) = self.checksum_algorithm.as_ref() {
280            req = req.header(checksum_algorithm.to_header_name(), checksum);
281        }
282        req
283    }
284
285    pub fn insert_checksum_type_header(
286        &self,
287        mut req: http::request::Builder,
288    ) -> http::request::Builder {
289        if let Some(checksum_algorithm) = self.checksum_algorithm.as_ref() {
290            req = req.header("x-amz-checksum-algorithm", checksum_algorithm.to_string());
291        }
292        req
293    }
294
295    pub fn insert_metadata_headers(
296        &self,
297        mut req: http::request::Builder,
298        size: Option<u64>,
299        args: &OpWrite,
300    ) -> http::request::Builder {
301        if let Some(size) = size {
302            req = req.header(CONTENT_LENGTH, size.to_string())
303        }
304
305        if let Some(mime) = args.content_type() {
306            req = req.header(CONTENT_TYPE, mime)
307        }
308
309        if let Some(pos) = args.content_disposition() {
310            req = req.header(CONTENT_DISPOSITION, pos)
311        }
312
313        if let Some(encoding) = args.content_encoding() {
314            req = req.header(CONTENT_ENCODING, encoding);
315        }
316
317        if let Some(cache_control) = args.cache_control() {
318            req = req.header(CACHE_CONTROL, cache_control)
319        }
320
321        if let Some(if_match) = args.if_match() {
322            req = req.header(IF_MATCH, if_match);
323        }
324
325        if args.if_not_exists() {
326            req = req.header(IF_NONE_MATCH, "*");
327        }
328
329        // Set storage class header
330        if let Some(v) = &self.default_storage_class {
331            req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
332        }
333
334        // Set user metadata headers.
335        if let Some(user_metadata) = args.user_metadata() {
336            for (key, value) in user_metadata {
337                req = req.header(format!("{X_AMZ_META_PREFIX}{key}"), value)
338            }
339        }
340        req
341    }
342}
343
344impl S3Core {
345    pub fn s3_head_object_request(&self, path: &str, args: OpStat) -> Result<Request<Buffer>> {
346        let p = build_abs_path(&self.root, path);
347
348        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
349
350        // Add query arguments to the URL based on response overrides
351        let mut query_args = Vec::new();
352        if let Some(override_content_disposition) = args.override_content_disposition() {
353            query_args.push(format!(
354                "{}={}",
355                constants::RESPONSE_CONTENT_DISPOSITION,
356                percent_encode_path(override_content_disposition)
357            ))
358        }
359        if let Some(override_content_type) = args.override_content_type() {
360            query_args.push(format!(
361                "{}={}",
362                constants::RESPONSE_CONTENT_TYPE,
363                percent_encode_path(override_content_type)
364            ))
365        }
366        if let Some(override_cache_control) = args.override_cache_control() {
367            query_args.push(format!(
368                "{}={}",
369                constants::RESPONSE_CACHE_CONTROL,
370                percent_encode_path(override_cache_control)
371            ))
372        }
373        if let Some(version) = args.version() {
374            query_args.push(format!(
375                "{}={}",
376                constants::S3_QUERY_VERSION_ID,
377                percent_decode_path(version)
378            ))
379        }
380        if !query_args.is_empty() {
381            url.push_str(&format!("?{}", query_args.join("&")));
382        }
383
384        let mut req = Request::head(&url);
385
386        req = self.insert_sse_headers(req, false);
387
388        if let Some(if_none_match) = args.if_none_match() {
389            req = req.header(IF_NONE_MATCH, if_none_match);
390        }
391        if let Some(if_match) = args.if_match() {
392            req = req.header(IF_MATCH, if_match);
393        }
394
395        if let Some(if_modified_since) = args.if_modified_since() {
396            req = req.header(
397                IF_MODIFIED_SINCE,
398                format_datetime_into_http_date(if_modified_since),
399            );
400        }
401        if let Some(if_unmodified_since) = args.if_unmodified_since() {
402            req = req.header(
403                IF_UNMODIFIED_SINCE,
404                format_datetime_into_http_date(if_unmodified_since),
405            );
406        }
407
408        // Inject operation to the request.
409        let req = req.extension(Operation::Stat);
410
411        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
412
413        Ok(req)
414    }
415
416    pub fn s3_get_object_request(
417        &self,
418        path: &str,
419        range: BytesRange,
420        args: &OpRead,
421    ) -> Result<Request<Buffer>> {
422        let p = build_abs_path(&self.root, path);
423
424        // Construct headers to add to the request
425        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
426
427        // Add query arguments to the URL based on response overrides
428        let mut query_args = Vec::new();
429        if let Some(override_content_disposition) = args.override_content_disposition() {
430            query_args.push(format!(
431                "{}={}",
432                constants::RESPONSE_CONTENT_DISPOSITION,
433                percent_encode_path(override_content_disposition)
434            ))
435        }
436        if let Some(override_content_type) = args.override_content_type() {
437            query_args.push(format!(
438                "{}={}",
439                constants::RESPONSE_CONTENT_TYPE,
440                percent_encode_path(override_content_type)
441            ))
442        }
443        if let Some(override_cache_control) = args.override_cache_control() {
444            query_args.push(format!(
445                "{}={}",
446                constants::RESPONSE_CACHE_CONTROL,
447                percent_encode_path(override_cache_control)
448            ))
449        }
450        if let Some(version) = args.version() {
451            query_args.push(format!(
452                "{}={}",
453                constants::S3_QUERY_VERSION_ID,
454                percent_decode_path(version)
455            ))
456        }
457        if !query_args.is_empty() {
458            url.push_str(&format!("?{}", query_args.join("&")));
459        }
460
461        let mut req = Request::get(&url);
462
463        if !range.is_full() {
464            req = req.header(http::header::RANGE, range.to_header());
465        }
466
467        if let Some(if_none_match) = args.if_none_match() {
468            req = req.header(IF_NONE_MATCH, if_none_match);
469        }
470
471        if let Some(if_match) = args.if_match() {
472            req = req.header(IF_MATCH, if_match);
473        }
474
475        if let Some(if_modified_since) = args.if_modified_since() {
476            req = req.header(
477                IF_MODIFIED_SINCE,
478                format_datetime_into_http_date(if_modified_since),
479            );
480        }
481
482        if let Some(if_unmodified_since) = args.if_unmodified_since() {
483            req = req.header(
484                IF_UNMODIFIED_SINCE,
485                format_datetime_into_http_date(if_unmodified_since),
486            );
487        }
488
489        // Set SSE headers.
490        // TODO: how will this work with presign?
491        req = self.insert_sse_headers(req, false);
492
493        // Inject operation to the request.
494        let req = req.extension(Operation::Read);
495
496        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
497
498        Ok(req)
499    }
500
501    pub async fn s3_get_object(
502        &self,
503        path: &str,
504        range: BytesRange,
505        args: &OpRead,
506    ) -> Result<Response<HttpBody>> {
507        let mut req = self.s3_get_object_request(path, range, args)?;
508
509        self.sign(&mut req).await?;
510
511        self.info.http_client().fetch(req).await
512    }
513
514    pub fn s3_put_object_request(
515        &self,
516        path: &str,
517        size: Option<u64>,
518        args: &OpWrite,
519        body: Buffer,
520    ) -> Result<Request<Buffer>> {
521        let p = build_abs_path(&self.root, path);
522
523        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
524
525        let mut req = Request::put(&url);
526
527        req = self.insert_metadata_headers(req, size, args);
528
529        // Set SSE headers.
530        req = self.insert_sse_headers(req, true);
531
532        // Calculate Checksum.
533        if let Some(checksum) = self.calculate_checksum(&body) {
534            // Set Checksum header.
535            req = self.insert_checksum_header(req, &checksum);
536        }
537
538        // Inject operation to the request.
539        let req = req.extension(Operation::Write);
540
541        // Set body
542        let req = req.body(body).map_err(new_request_build_error)?;
543
544        Ok(req)
545    }
546
547    pub fn s3_append_object_request(
548        &self,
549        path: &str,
550        position: u64,
551        size: u64,
552        args: &OpWrite,
553        body: Buffer,
554    ) -> Result<Request<Buffer>> {
555        let p = build_abs_path(&self.root, path);
556        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
557        let mut req = Request::put(&url);
558
559        // Only include full metadata headers when creating a new object via append (position == 0)
560        // For existing objects or subsequent appends, only include content-length
561        if position == 0 {
562            req = self.insert_metadata_headers(req, Some(size), args);
563        } else {
564            req = req.header(CONTENT_LENGTH, size.to_string());
565        }
566
567        req = req.header(constants::X_AMZ_WRITE_OFFSET_BYTES, position.to_string());
568
569        // Set SSE headers.
570        req = self.insert_sse_headers(req, true);
571
572        // Inject operation to the request.
573        let req = req.extension(Operation::Write);
574
575        // Set body
576        let req = req.body(body).map_err(new_request_build_error)?;
577
578        Ok(req)
579    }
580
581    pub async fn s3_head_object(&self, path: &str, args: OpStat) -> Result<Response<Buffer>> {
582        let mut req = self.s3_head_object_request(path, args)?;
583
584        self.sign(&mut req).await?;
585
586        self.send(req).await
587    }
588
589    pub async fn s3_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
590        let p = build_abs_path(&self.root, path);
591
592        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
593
594        let mut query_args = Vec::new();
595
596        if let Some(version) = args.version() {
597            query_args.push(format!(
598                "{}={}",
599                constants::S3_QUERY_VERSION_ID,
600                percent_encode_path(version)
601            ))
602        }
603
604        if !query_args.is_empty() {
605            url.push_str(&format!("?{}", query_args.join("&")));
606        }
607
608        let mut req = Request::delete(&url)
609            // Inject operation to the request.
610            .extension(Operation::Delete)
611            .body(Buffer::new())
612            .map_err(new_request_build_error)?;
613
614        self.sign(&mut req).await?;
615
616        self.send(req).await
617    }
618
619    pub async fn s3_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
620        let from = build_abs_path(&self.root, from);
621        let to = build_abs_path(&self.root, to);
622
623        let source = format!("{}/{}", self.bucket, percent_encode_path(&from));
624        let target = format!("{}/{}", self.endpoint, percent_encode_path(&to));
625
626        let mut req = Request::put(&target);
627
628        // Set SSE headers.
629        req = self.insert_sse_headers(req, true);
630
631        if let Some(v) = &self.server_side_encryption_customer_algorithm {
632            let mut v = v.clone();
633            v.set_sensitive(true);
634
635            req = req.header(
636                HeaderName::from_static(
637                    constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
638                ),
639                v,
640            )
641        }
642
643        if let Some(v) = &self.server_side_encryption_customer_key {
644            let mut v = v.clone();
645            v.set_sensitive(true);
646
647            req = req.header(
648                HeaderName::from_static(
649                    constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
650                ),
651                v,
652            )
653        }
654
655        if let Some(v) = &self.server_side_encryption_customer_key_md5 {
656            let mut v = v.clone();
657            v.set_sensitive(true);
658
659            req = req.header(
660                HeaderName::from_static(
661                    constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
662                ),
663                v,
664            )
665        }
666
667        let mut req = req
668            // Inject operation to the request.
669            .extension(Operation::Copy)
670            .header(constants::X_AMZ_COPY_SOURCE, &source)
671            .body(Buffer::new())
672            .map_err(new_request_build_error)?;
673
674        self.sign(&mut req).await?;
675
676        self.send(req).await
677    }
678
679    pub async fn s3_list_objects(
680        &self,
681        path: &str,
682        continuation_token: &str,
683        delimiter: &str,
684        limit: Option<usize>,
685        start_after: Option<String>,
686    ) -> Result<Response<Buffer>> {
687        let p = build_abs_path(&self.root, path);
688
689        let mut url = format!("{}?list-type=2", self.endpoint);
690        if !p.is_empty() {
691            write!(url, "&prefix={}", percent_encode_path(&p))
692                .expect("write into string must succeed");
693        }
694        if !delimiter.is_empty() {
695            write!(url, "&delimiter={delimiter}").expect("write into string must succeed");
696        }
697        if let Some(limit) = limit {
698            write!(url, "&max-keys={limit}").expect("write into string must succeed");
699        }
700        if let Some(start_after) = start_after {
701            write!(url, "&start-after={}", percent_encode_path(&start_after))
702                .expect("write into string must succeed");
703        }
704        if !continuation_token.is_empty() {
705            // AWS S3 could return continuation-token that contains `=`
706            // which could lead `reqsign` parse query wrongly.
707            // URL encode continuation-token before starting signing so that
708            // our signer will not be confused.
709            write!(
710                url,
711                "&continuation-token={}",
712                percent_encode_path(continuation_token)
713            )
714            .expect("write into string must succeed");
715        }
716
717        let mut req = Request::get(&url)
718            // Inject operation to the request.
719            .extension(Operation::List)
720            .body(Buffer::new())
721            .map_err(new_request_build_error)?;
722
723        self.sign(&mut req).await?;
724
725        self.send(req).await
726    }
727
728    pub async fn s3_initiate_multipart_upload(
729        &self,
730        path: &str,
731        args: &OpWrite,
732    ) -> Result<Response<Buffer>> {
733        let p = build_abs_path(&self.root, path);
734
735        let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
736
737        let mut req = Request::post(&url);
738
739        if let Some(mime) = args.content_type() {
740            req = req.header(CONTENT_TYPE, mime)
741        }
742
743        if let Some(content_disposition) = args.content_disposition() {
744            req = req.header(CONTENT_DISPOSITION, content_disposition)
745        }
746
747        if let Some(cache_control) = args.cache_control() {
748            req = req.header(CACHE_CONTROL, cache_control)
749        }
750
751        // Set storage class header
752        if let Some(v) = &self.default_storage_class {
753            req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
754        }
755
756        // Set user metadata headers.
757        if let Some(user_metadata) = args.user_metadata() {
758            for (key, value) in user_metadata {
759                req = req.header(format!("{X_AMZ_META_PREFIX}{key}"), value)
760            }
761        }
762
763        // Set SSE headers.
764        let req = self.insert_sse_headers(req, true);
765
766        // Set SSE headers.
767        let req = self.insert_checksum_type_header(req);
768
769        // Inject operation to the request.
770        let req = req.extension(Operation::Write);
771
772        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
773
774        self.sign(&mut req).await?;
775
776        self.send(req).await
777    }
778
779    pub fn s3_upload_part_request(
780        &self,
781        path: &str,
782        upload_id: &str,
783        part_number: usize,
784        size: u64,
785        body: Buffer,
786        checksum: Option<String>,
787    ) -> Result<Request<Buffer>> {
788        let p = build_abs_path(&self.root, path);
789
790        let url = format!(
791            "{}/{}?partNumber={}&uploadId={}",
792            self.endpoint,
793            percent_encode_path(&p),
794            part_number,
795            percent_encode_path(upload_id)
796        );
797
798        let mut req = Request::put(&url);
799
800        req = req.header(CONTENT_LENGTH, size);
801
802        // Set SSE headers.
803        req = self.insert_sse_headers(req, true);
804
805        if let Some(checksum) = checksum {
806            // Set Checksum header.
807            req = self.insert_checksum_header(req, &checksum);
808        }
809
810        // Inject operation to the request.
811        let req = req.extension(Operation::Write);
812
813        // Set body
814        let req = req.body(body).map_err(new_request_build_error)?;
815
816        Ok(req)
817    }
818
819    pub async fn s3_complete_multipart_upload(
820        &self,
821        path: &str,
822        upload_id: &str,
823        parts: Vec<CompleteMultipartUploadRequestPart>,
824    ) -> Result<Response<Buffer>> {
825        let p = build_abs_path(&self.root, path);
826
827        let url = format!(
828            "{}/{}?uploadId={}",
829            self.endpoint,
830            percent_encode_path(&p),
831            percent_encode_path(upload_id)
832        );
833
834        let req = Request::post(&url);
835
836        // Set SSE headers.
837        let req = self.insert_sse_headers(req, true);
838
839        let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
840            .map_err(new_xml_deserialize_error)?;
841        // Make sure content length has been set to avoid post with chunked encoding.
842        let req = req.header(CONTENT_LENGTH, content.len());
843        // Set content-type to `application/xml` to avoid mixed with form post.
844        let req = req.header(CONTENT_TYPE, "application/xml");
845
846        // Inject operation to the request.
847        let req = req.extension(Operation::Write);
848
849        let mut req = req
850            .body(Buffer::from(Bytes::from(content)))
851            .map_err(new_request_build_error)?;
852
853        self.sign(&mut req).await?;
854
855        self.send(req).await
856    }
857
858    /// Abort an on-going multipart upload.
859    pub async fn s3_abort_multipart_upload(
860        &self,
861        path: &str,
862        upload_id: &str,
863    ) -> Result<Response<Buffer>> {
864        let p = build_abs_path(&self.root, path);
865
866        let url = format!(
867            "{}/{}?uploadId={}",
868            self.endpoint,
869            percent_encode_path(&p),
870            percent_encode_path(upload_id)
871        );
872
873        let mut req = Request::delete(&url)
874            // Inject operation to the request.
875            .extension(Operation::Write)
876            .body(Buffer::new())
877            .map_err(new_request_build_error)?;
878        self.sign(&mut req).await?;
879        self.send(req).await
880    }
881
882    pub async fn s3_delete_objects(
883        &self,
884        paths: Vec<(String, OpDelete)>,
885    ) -> Result<Response<Buffer>> {
886        let url = format!("{}/?delete", self.endpoint);
887
888        let req = Request::post(&url);
889
890        let content = quick_xml::se::to_string(&DeleteObjectsRequest {
891            object: paths
892                .into_iter()
893                .map(|(path, op)| DeleteObjectsRequestObject {
894                    key: build_abs_path(&self.root, &path),
895                    version_id: op.version().map(|v| v.to_owned()),
896                })
897                .collect(),
898        })
899        .map_err(new_xml_deserialize_error)?;
900
901        // Make sure content length has been set to avoid post with chunked encoding.
902        let req = req.header(CONTENT_LENGTH, content.len());
903        // Set content-type to `application/xml` to avoid mixed with form post.
904        let req = req.header(CONTENT_TYPE, "application/xml");
905        // Set content-md5 as required by API.
906        let req = req.header("CONTENT-MD5", format_content_md5(content.as_bytes()));
907
908        // Inject operation to the request.
909        let req = req.extension(Operation::Delete);
910
911        let mut req = req
912            .body(Buffer::from(Bytes::from(content)))
913            .map_err(new_request_build_error)?;
914
915        self.sign(&mut req).await?;
916
917        self.send(req).await
918    }
919
920    pub async fn s3_list_object_versions(
921        &self,
922        prefix: &str,
923        delimiter: &str,
924        limit: Option<usize>,
925        key_marker: &str,
926        version_id_marker: &str,
927    ) -> Result<Response<Buffer>> {
928        let p = build_abs_path(&self.root, prefix);
929
930        let mut url = format!("{}?versions", self.endpoint);
931        if !p.is_empty() {
932            write!(url, "&prefix={}", percent_encode_path(p.as_str()))
933                .expect("write into string must succeed");
934        }
935        if !delimiter.is_empty() {
936            write!(url, "&delimiter={}", delimiter).expect("write into string must succeed");
937        }
938
939        if let Some(limit) = limit {
940            write!(url, "&max-keys={}", limit).expect("write into string must succeed");
941        }
942        if !key_marker.is_empty() {
943            write!(url, "&key-marker={}", percent_encode_path(key_marker))
944                .expect("write into string must succeed");
945        }
946        if !version_id_marker.is_empty() {
947            write!(
948                url,
949                "&version-id-marker={}",
950                percent_encode_path(version_id_marker)
951            )
952            .expect("write into string must succeed");
953        }
954
955        let mut req = Request::get(&url)
956            // Inject operation to the request.
957            .extension(Operation::List)
958            .body(Buffer::new())
959            .map_err(new_request_build_error)?;
960
961        self.sign(&mut req).await?;
962
963        self.send(req).await
964    }
965}
966
967/// Result of CreateMultipartUpload
968#[derive(Default, Debug, Deserialize)]
969#[serde(default, rename_all = "PascalCase")]
970pub struct InitiateMultipartUploadResult {
971    pub upload_id: String,
972}
973
974/// Request of CompleteMultipartUploadRequest
975#[derive(Default, Debug, Serialize)]
976#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
977pub struct CompleteMultipartUploadRequest {
978    pub part: Vec<CompleteMultipartUploadRequestPart>,
979}
980
981#[derive(Clone, Default, Debug, Serialize)]
982#[serde(default, rename_all = "PascalCase")]
983pub struct CompleteMultipartUploadRequestPart {
984    #[serde(rename = "PartNumber")]
985    pub part_number: usize,
986    /// # TODO
987    ///
988    /// quick-xml will do escape on `"` which leads to our serialized output is
989    /// not the same as aws s3's example.
990    ///
991    /// Ideally, we could use `serialize_with` to address this (buf failed)
992    ///
993    /// ```ignore
994    /// #[derive(Default, Debug, Serialize)]
995    /// #[serde(default, rename_all = "PascalCase")]
996    /// struct CompleteMultipartUploadRequestPart {
997    ///     #[serde(rename = "PartNumber")]
998    ///     part_number: usize,
999    ///     #[serde(rename = "ETag", serialize_with = "partial_escape")]
1000    ///     etag: String,
1001    /// }
1002    ///
1003    /// fn partial_escape<S>(s: &str, ser: S) -> Result<S::Ok, S::Error>
1004    /// where
1005    ///     S: serde::Serializer,
1006    /// {
1007    ///     ser.serialize_str(&String::from_utf8_lossy(
1008    ///         &quick_xml::escape::partial_escape(s.as_bytes()),
1009    ///     ))
1010    /// }
1011    /// ```
1012    ///
1013    /// ref: <https://github.com/tafia/quick-xml/issues/362>
1014    #[serde(rename = "ETag")]
1015    pub etag: String,
1016    #[serde(rename = "ChecksumCRC32C", skip_serializing_if = "Option::is_none")]
1017    pub checksum_crc32c: Option<String>,
1018}
1019
1020/// Output of `CompleteMultipartUpload` operation
1021#[derive(Debug, Default, Deserialize)]
1022#[serde[default, rename_all = "PascalCase"]]
1023pub struct CompleteMultipartUploadResult {
1024    pub bucket: String,
1025    pub key: String,
1026    pub location: String,
1027    #[serde(rename = "ETag")]
1028    pub etag: String,
1029    pub code: String,
1030    pub message: String,
1031    pub request_id: String,
1032}
1033
1034/// Request of DeleteObjects.
1035#[derive(Default, Debug, Serialize)]
1036#[serde(default, rename = "Delete", rename_all = "PascalCase")]
1037pub struct DeleteObjectsRequest {
1038    pub object: Vec<DeleteObjectsRequestObject>,
1039}
1040
1041#[derive(Default, Debug, Serialize)]
1042#[serde(rename_all = "PascalCase")]
1043pub struct DeleteObjectsRequestObject {
1044    pub key: String,
1045    #[serde(skip_serializing_if = "Option::is_none")]
1046    pub version_id: Option<String>,
1047}
1048
1049/// Result of DeleteObjects.
1050#[derive(Default, Debug, Deserialize)]
1051#[serde(default, rename = "DeleteResult", rename_all = "PascalCase")]
1052pub struct DeleteObjectsResult {
1053    pub deleted: Vec<DeleteObjectsResultDeleted>,
1054    pub error: Vec<DeleteObjectsResultError>,
1055}
1056
1057#[derive(Default, Debug, Deserialize)]
1058#[serde(rename_all = "PascalCase")]
1059pub struct DeleteObjectsResultDeleted {
1060    pub key: String,
1061    pub version_id: Option<String>,
1062}
1063
1064#[derive(Default, Debug, Deserialize)]
1065#[serde(default, rename_all = "PascalCase")]
1066pub struct DeleteObjectsResultError {
1067    pub code: String,
1068    pub key: String,
1069    pub message: String,
1070    pub version_id: Option<String>,
1071}
1072
1073/// Output of ListBucket/ListObjects.
1074///
1075/// ## Note
1076///
1077/// Use `Option` in `is_truncated` and `next_continuation_token` to make
1078/// the behavior more clear so that we can be compatible to more s3 services.
1079///
1080/// And enable `serde(default)` so that we can keep going even when some field
1081/// is not exist.
1082#[derive(Default, Debug, Deserialize)]
1083#[serde(default, rename_all = "PascalCase")]
1084pub struct ListObjectsOutput {
1085    pub is_truncated: Option<bool>,
1086    pub next_continuation_token: Option<String>,
1087    pub common_prefixes: Vec<OutputCommonPrefix>,
1088    pub contents: Vec<ListObjectsOutputContent>,
1089}
1090
1091#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
1092#[serde(rename_all = "PascalCase")]
1093pub struct ListObjectsOutputContent {
1094    pub key: String,
1095    pub size: u64,
1096    pub last_modified: String,
1097    #[serde(rename = "ETag")]
1098    pub etag: Option<String>,
1099}
1100
1101#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
1102#[serde(rename_all = "PascalCase")]
1103pub struct OutputCommonPrefix {
1104    pub prefix: String,
1105}
1106
1107/// Output of ListObjectVersions
1108#[derive(Default, Debug, Deserialize)]
1109#[serde(default, rename_all = "PascalCase")]
1110pub struct ListObjectVersionsOutput {
1111    pub is_truncated: Option<bool>,
1112    pub next_key_marker: Option<String>,
1113    pub next_version_id_marker: Option<String>,
1114    pub common_prefixes: Vec<OutputCommonPrefix>,
1115    pub version: Vec<ListObjectVersionsOutputVersion>,
1116    pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
1117}
1118
1119#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
1120#[serde(rename_all = "PascalCase")]
1121pub struct ListObjectVersionsOutputVersion {
1122    pub key: String,
1123    pub version_id: String,
1124    pub is_latest: bool,
1125    pub size: u64,
1126    pub last_modified: String,
1127    #[serde(rename = "ETag")]
1128    pub etag: Option<String>,
1129}
1130
1131#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
1132#[serde(rename_all = "PascalCase")]
1133pub struct ListObjectVersionsOutputDeleteMarker {
1134    pub key: String,
1135    pub version_id: String,
1136    pub is_latest: bool,
1137    pub last_modified: String,
1138}
1139
1140pub enum ChecksumAlgorithm {
1141    Crc32c,
1142}
1143impl ChecksumAlgorithm {
1144    pub fn to_header_name(&self) -> HeaderName {
1145        match self {
1146            Self::Crc32c => HeaderName::from_static("x-amz-checksum-crc32c"),
1147        }
1148    }
1149}
1150impl Display for ChecksumAlgorithm {
1151    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1152        write!(
1153            f,
1154            "{}",
1155            match self {
1156                Self::Crc32c => "CRC32C",
1157            }
1158        )
1159    }
1160}
1161
1162#[cfg(test)]
1163mod tests {
1164    use bytes::Buf;
1165    use bytes::Bytes;
1166
1167    use super::*;
1168
1169    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html#API_CreateMultipartUpload_Examples
1170    #[test]
1171    fn test_deserialize_initiate_multipart_upload_result() {
1172        let bs = Bytes::from(
1173            r#"<?xml version="1.0" encoding="UTF-8"?>
1174            <InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1175              <Bucket>example-bucket</Bucket>
1176              <Key>example-object</Key>
1177              <UploadId>VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA</UploadId>
1178            </InitiateMultipartUploadResult>"#,
1179        );
1180
1181        let out: InitiateMultipartUploadResult =
1182            quick_xml::de::from_reader(bs.reader()).expect("must success");
1183
1184        assert_eq!(
1185            out.upload_id,
1186            "VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA"
1187        )
1188    }
1189
1190    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#API_CompleteMultipartUpload_Examples
1191    #[test]
1192    fn test_serialize_complete_multipart_upload_request() {
1193        let req = CompleteMultipartUploadRequest {
1194            part: vec![
1195                CompleteMultipartUploadRequestPart {
1196                    part_number: 1,
1197                    etag: "\"a54357aff0632cce46d942af68356b38\"".to_string(),
1198                    ..Default::default()
1199                },
1200                CompleteMultipartUploadRequestPart {
1201                    part_number: 2,
1202                    etag: "\"0c78aef83f66abc1fa1e8477f296d394\"".to_string(),
1203                    ..Default::default()
1204                },
1205                CompleteMultipartUploadRequestPart {
1206                    part_number: 3,
1207                    etag: "\"acbd18db4cc2f85cedef654fccc4a4d8\"".to_string(),
1208                    ..Default::default()
1209                },
1210            ],
1211        };
1212
1213        let actual = quick_xml::se::to_string(&req).expect("must succeed");
1214
1215        pretty_assertions::assert_eq!(
1216            actual,
1217            r#"<CompleteMultipartUpload>
1218             <Part>
1219                <PartNumber>1</PartNumber>
1220               <ETag>"a54357aff0632cce46d942af68356b38"</ETag>
1221             </Part>
1222             <Part>
1223                <PartNumber>2</PartNumber>
1224               <ETag>"0c78aef83f66abc1fa1e8477f296d394"</ETag>
1225             </Part>
1226             <Part>
1227               <PartNumber>3</PartNumber>
1228               <ETag>"acbd18db4cc2f85cedef654fccc4a4d8"</ETag>
1229             </Part>
1230            </CompleteMultipartUpload>"#
1231                // Cleanup space and new line
1232                .replace([' ', '\n'], "")
1233        )
1234    }
1235
1236    /// this example is from: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
1237    #[test]
1238    fn test_deserialize_complete_multipart_upload_result() {
1239        let bs = Bytes::from(
1240            r#"<?xml version="1.0" encoding="UTF-8"?>
1241            <CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1242             <Location>http://Example-Bucket.s3.region.amazonaws.com/Example-Object</Location>
1243             <Bucket>Example-Bucket</Bucket>
1244             <Key>Example-Object</Key>
1245             <ETag>"3858f62230ac3c915f300c664312c11f-9"</ETag>
1246            </CompleteMultipartUploadResult>"#,
1247        );
1248
1249        let out: CompleteMultipartUploadResult =
1250            quick_xml::de::from_reader(bs.reader()).expect("must success");
1251
1252        assert_eq!(out.bucket, "Example-Bucket");
1253        assert_eq!(out.key, "Example-Object");
1254        assert_eq!(
1255            out.location,
1256            "http://Example-Bucket.s3.region.amazonaws.com/Example-Object"
1257        );
1258        assert_eq!(out.etag, "\"3858f62230ac3c915f300c664312c11f-9\"");
1259    }
1260
1261    #[test]
1262    fn test_deserialize_complete_multipart_upload_result_when_return_error() {
1263        let bs = Bytes::from(
1264            r#"<?xml version="1.0" encoding="UTF-8"?>
1265
1266                <Error>
1267                <Code>InternalError</Code>
1268                <Message>We encountered an internal error. Please try again.</Message>
1269                <RequestId>656c76696e6727732072657175657374</RequestId>
1270                <HostId>Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==</HostId>
1271                </Error>"#,
1272        );
1273
1274        let out: CompleteMultipartUploadResult =
1275            quick_xml::de::from_reader(bs.reader()).expect("must success");
1276
1277        assert_eq!(out.code, "InternalError");
1278        assert_eq!(
1279            out.message,
1280            "We encountered an internal error. Please try again."
1281        );
1282        assert_eq!(out.request_id, "656c76696e6727732072657175657374");
1283    }
1284
1285    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Examples
1286    #[test]
1287    fn test_serialize_delete_objects_request() {
1288        let req = DeleteObjectsRequest {
1289            object: vec![
1290                DeleteObjectsRequestObject {
1291                    key: "sample1.txt".to_string(),
1292                    version_id: None,
1293                },
1294                DeleteObjectsRequestObject {
1295                    key: "sample2.txt".to_string(),
1296                    version_id: Some("11111".to_owned()),
1297                },
1298            ],
1299        };
1300
1301        let actual = quick_xml::se::to_string(&req).expect("must succeed");
1302
1303        pretty_assertions::assert_eq!(
1304            actual,
1305            r#"<Delete>
1306             <Object>
1307             <Key>sample1.txt</Key>
1308             </Object>
1309             <Object>
1310               <Key>sample2.txt</Key>
1311               <VersionId>11111</VersionId>
1312             </Object>
1313             </Delete>"#
1314                // Cleanup space and new line
1315                .replace([' ', '\n'], "")
1316        )
1317    }
1318
1319    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Examples
1320    #[test]
1321    fn test_deserialize_delete_objects_result() {
1322        let bs = Bytes::from(
1323            r#"<?xml version="1.0" encoding="UTF-8"?>
1324            <DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1325             <Deleted>
1326               <Key>sample1.txt</Key>
1327             </Deleted>
1328             <Error>
1329              <Key>sample2.txt</Key>
1330              <Code>AccessDenied</Code>
1331              <Message>Access Denied</Message>
1332             </Error>
1333            </DeleteResult>"#,
1334        );
1335
1336        let out: DeleteObjectsResult =
1337            quick_xml::de::from_reader(bs.reader()).expect("must success");
1338
1339        assert_eq!(out.deleted.len(), 1);
1340        assert_eq!(out.deleted[0].key, "sample1.txt");
1341        assert_eq!(out.error.len(), 1);
1342        assert_eq!(out.error[0].key, "sample2.txt");
1343        assert_eq!(out.error[0].code, "AccessDenied");
1344        assert_eq!(out.error[0].message, "Access Denied");
1345    }
1346
1347    #[test]
1348    fn test_deserialize_delete_objects_with_version_id() {
1349        let bs = Bytes::from(
1350            r#"<?xml version="1.0" encoding="UTF-8"?>
1351                  <DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1352                    <Deleted>
1353                      <Key>SampleDocument.txt</Key>
1354                      <VersionId>OYcLXagmS.WaD..oyH4KRguB95_YhLs7</VersionId>
1355                    </Deleted>
1356                  </DeleteResult>"#,
1357        );
1358
1359        let out: DeleteObjectsResult =
1360            quick_xml::de::from_reader(bs.reader()).expect("must success");
1361
1362        assert_eq!(out.deleted.len(), 1);
1363        assert_eq!(out.deleted[0].key, "SampleDocument.txt");
1364        assert_eq!(
1365            out.deleted[0].version_id,
1366            Some("OYcLXagmS.WaD..oyH4KRguB95_YhLs7".to_owned())
1367        );
1368        assert_eq!(out.error.len(), 0);
1369    }
1370
1371    #[test]
1372    fn test_parse_list_output() {
1373        let bs = bytes::Bytes::from(
1374            r#"<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1375  <Name>example-bucket</Name>
1376  <Prefix>photos/2006/</Prefix>
1377  <KeyCount>3</KeyCount>
1378  <MaxKeys>1000</MaxKeys>
1379  <Delimiter>/</Delimiter>
1380  <IsTruncated>false</IsTruncated>
1381  <Contents>
1382    <Key>photos/2006</Key>
1383    <LastModified>2016-04-30T23:51:29.000Z</LastModified>
1384    <ETag>"d41d8cd98f00b204e9800998ecf8427e"</ETag>
1385    <Size>56</Size>
1386    <StorageClass>STANDARD</StorageClass>
1387  </Contents>
1388  <Contents>
1389    <Key>photos/2007</Key>
1390    <LastModified>2016-04-30T23:51:29.000Z</LastModified>
1391    <ETag>"d41d8cd98f00b204e9800998ecf8427e"</ETag>
1392    <Size>100</Size>
1393    <StorageClass>STANDARD</StorageClass>
1394  </Contents>
1395  <Contents>
1396    <Key>photos/2008</Key>
1397    <LastModified>2016-05-30T23:51:29.000Z</LastModified>
1398    <Size>42</Size>
1399  </Contents>
1400
1401  <CommonPrefixes>
1402    <Prefix>photos/2006/February/</Prefix>
1403  </CommonPrefixes>
1404  <CommonPrefixes>
1405    <Prefix>photos/2006/January/</Prefix>
1406  </CommonPrefixes>
1407</ListBucketResult>"#,
1408        );
1409
1410        let out: ListObjectsOutput = quick_xml::de::from_reader(bs.reader()).expect("must success");
1411
1412        assert!(!out.is_truncated.unwrap());
1413        assert!(out.next_continuation_token.is_none());
1414        assert_eq!(
1415            out.common_prefixes
1416                .iter()
1417                .map(|v| v.prefix.clone())
1418                .collect::<Vec<String>>(),
1419            vec!["photos/2006/February/", "photos/2006/January/"]
1420        );
1421        assert_eq!(
1422            out.contents,
1423            vec![
1424                ListObjectsOutputContent {
1425                    key: "photos/2006".to_string(),
1426                    size: 56,
1427                    etag: Some("\"d41d8cd98f00b204e9800998ecf8427e\"".to_string()),
1428                    last_modified: "2016-04-30T23:51:29.000Z".to_string(),
1429                },
1430                ListObjectsOutputContent {
1431                    key: "photos/2007".to_string(),
1432                    size: 100,
1433                    last_modified: "2016-04-30T23:51:29.000Z".to_string(),
1434                    etag: Some("\"d41d8cd98f00b204e9800998ecf8427e\"".to_string()),
1435                },
1436                ListObjectsOutputContent {
1437                    key: "photos/2008".to_string(),
1438                    size: 42,
1439                    last_modified: "2016-05-30T23:51:29.000Z".to_string(),
1440                    etag: None,
1441                },
1442            ]
1443        )
1444    }
1445
1446    #[test]
1447    fn test_parse_list_object_versions() {
1448        let bs = bytes::Bytes::from(
1449            r#"<?xml version="1.0" encoding="UTF-8"?>
1450                <ListVersionsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1451                <Name>mtp-versioning-fresh</Name>
1452                <Prefix/>
1453                <KeyMarker>key3</KeyMarker>
1454                <VersionIdMarker>null</VersionIdMarker>
1455                <NextKeyMarker>key3</NextKeyMarker>
1456                <NextVersionIdMarker>d-d309mfjFrUmoQ0DBsVqmcMV15OI.</NextVersionIdMarker>
1457                <MaxKeys>3</MaxKeys>
1458                <IsTruncated>true</IsTruncated>
1459                <Version>
1460                    <Key>key3</Key>
1461                    <VersionId>8XECiENpj8pydEDJdd-_VRrvaGKAHOaGMNW7tg6UViI.</VersionId>
1462                    <IsLatest>true</IsLatest>
1463                    <LastModified>2009-12-09T00:18:23.000Z</LastModified>
1464                    <ETag>"396fefef536d5ce46c7537ecf978a360"</ETag>
1465                    <Size>217</Size>
1466                    <Owner>
1467                        <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
1468                    </Owner>
1469                    <StorageClass>STANDARD</StorageClass>
1470                </Version>
1471                <Version>
1472                    <Key>key3</Key>
1473                    <VersionId>d-d309mfjFri40QYukDozqBt3UmoQ0DBsVqmcMV15OI.</VersionId>
1474                    <IsLatest>false</IsLatest>
1475                    <LastModified>2009-12-09T00:18:08.000Z</LastModified>
1476                    <ETag>"396fefef536d5ce46c7537ecf978a360"</ETag>
1477                    <Size>217</Size>
1478                    <Owner>
1479                        <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
1480                    </Owner>
1481                    <StorageClass>STANDARD</StorageClass>
1482                </Version>
1483                <CommonPrefixes>
1484                    <Prefix>photos/</Prefix>
1485                </CommonPrefixes>
1486                <CommonPrefixes>
1487                    <Prefix>videos/</Prefix>
1488                </CommonPrefixes>
1489                 <DeleteMarker>
1490                    <Key>my-third-image.jpg</Key>
1491                    <VersionId>03jpff543dhffds434rfdsFDN943fdsFkdmqnh892</VersionId>
1492                    <IsLatest>true</IsLatest>
1493                    <LastModified>2009-10-15T17:50:30.000Z</LastModified>
1494                    <Owner>
1495                        <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
1496                        <DisplayName>mtd@amazon.com</DisplayName>
1497                    </Owner>
1498                </DeleteMarker>
1499                </ListVersionsResult>"#,
1500        );
1501
1502        let output: ListObjectVersionsOutput =
1503            quick_xml::de::from_reader(bs.reader()).expect("must succeed");
1504
1505        assert!(output.is_truncated.unwrap());
1506        assert_eq!(output.next_key_marker, Some("key3".to_owned()));
1507        assert_eq!(
1508            output.next_version_id_marker,
1509            Some("d-d309mfjFrUmoQ0DBsVqmcMV15OI.".to_owned())
1510        );
1511        assert_eq!(
1512            output.common_prefixes,
1513            vec![
1514                OutputCommonPrefix {
1515                    prefix: "photos/".to_owned()
1516                },
1517                OutputCommonPrefix {
1518                    prefix: "videos/".to_owned()
1519                }
1520            ]
1521        );
1522
1523        assert_eq!(
1524            output.version,
1525            vec![
1526                ListObjectVersionsOutputVersion {
1527                    key: "key3".to_owned(),
1528                    version_id: "8XECiENpj8pydEDJdd-_VRrvaGKAHOaGMNW7tg6UViI.".to_owned(),
1529                    is_latest: true,
1530                    size: 217,
1531                    last_modified: "2009-12-09T00:18:23.000Z".to_owned(),
1532                    etag: Some("\"396fefef536d5ce46c7537ecf978a360\"".to_owned()),
1533                },
1534                ListObjectVersionsOutputVersion {
1535                    key: "key3".to_owned(),
1536                    version_id: "d-d309mfjFri40QYukDozqBt3UmoQ0DBsVqmcMV15OI.".to_owned(),
1537                    is_latest: false,
1538                    size: 217,
1539                    last_modified: "2009-12-09T00:18:08.000Z".to_owned(),
1540                    etag: Some("\"396fefef536d5ce46c7537ecf978a360\"".to_owned()),
1541                }
1542            ]
1543        );
1544
1545        assert_eq!(
1546            output.delete_marker,
1547            vec![ListObjectVersionsOutputDeleteMarker {
1548                key: "my-third-image.jpg".to_owned(),
1549                version_id: "03jpff543dhffds434rfdsFDN943fdsFkdmqnh892".to_owned(),
1550                is_latest: true,
1551                last_modified: "2009-10-15T17:50:30.000Z".to_owned(),
1552            },]
1553        );
1554    }
1555}