opendal/services/s3/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use super::core::*;
21use super::error::from_s3_error;
22use super::error::parse_error;
23use super::error::S3Error;
24use crate::raw::*;
25use crate::*;
26use bytes::Buf;
27use constants::{X_AMZ_OBJECT_SIZE, X_AMZ_VERSION_ID};
28use http::StatusCode;
29
30pub type S3Writers = TwoWays<oio::MultipartWriter<S3Writer>, oio::AppendWriter<S3Writer>>;
31
32pub struct S3Writer {
33    core: Arc<S3Core>,
34
35    op: OpWrite,
36    path: String,
37}
38
39impl S3Writer {
40    pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> Self {
41        S3Writer {
42            core,
43            path: path.to_string(),
44            op,
45        }
46    }
47
48    fn parse_header_into_meta(path: &str, headers: &http::HeaderMap) -> Result<Metadata> {
49        let mut meta = Metadata::new(EntryMode::from_path(path));
50        if let Some(etag) = parse_etag(headers)? {
51            meta.set_etag(etag);
52        }
53        if let Some(version) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? {
54            meta.set_version(version);
55        }
56        if let Some(size) = parse_header_to_str(headers, X_AMZ_OBJECT_SIZE)? {
57            if let Ok(value) = size.parse() {
58                meta.set_content_length(value);
59            }
60        }
61        Ok(meta)
62    }
63}
64
65impl oio::MultipartWrite for S3Writer {
66    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
67        let mut req = self
68            .core
69            .s3_put_object_request(&self.path, Some(size), &self.op, body)?;
70
71        self.core.sign(&mut req).await?;
72
73        let resp = self.core.send(req).await?;
74
75        let status = resp.status();
76
77        let meta = S3Writer::parse_header_into_meta(&self.path, resp.headers())?;
78
79        match status {
80            StatusCode::CREATED | StatusCode::OK => Ok(meta),
81            _ => Err(parse_error(resp)),
82        }
83    }
84
85    async fn initiate_part(&self) -> Result<String> {
86        let resp = self
87            .core
88            .s3_initiate_multipart_upload(&self.path, &self.op)
89            .await?;
90
91        let status = resp.status();
92
93        match status {
94            StatusCode::OK => {
95                let bs = resp.into_body();
96
97                let result: InitiateMultipartUploadResult =
98                    quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
99
100                Ok(result.upload_id)
101            }
102            _ => Err(parse_error(resp)),
103        }
104    }
105
106    async fn write_part(
107        &self,
108        upload_id: &str,
109        part_number: usize,
110        size: u64,
111        body: Buffer,
112    ) -> Result<oio::MultipartPart> {
113        // AWS S3 requires part number must between [1..=10000]
114        let part_number = part_number + 1;
115
116        let checksum = self.core.calculate_checksum(&body);
117
118        let mut req = self.core.s3_upload_part_request(
119            &self.path,
120            upload_id,
121            part_number,
122            size,
123            body,
124            checksum.clone(),
125        )?;
126
127        self.core.sign(&mut req).await?;
128
129        let resp = self.core.send(req).await?;
130
131        let status = resp.status();
132
133        match status {
134            StatusCode::OK => {
135                let etag = parse_etag(resp.headers())?
136                    .ok_or_else(|| {
137                        Error::new(
138                            ErrorKind::Unexpected,
139                            "ETag not present in returning response",
140                        )
141                    })?
142                    .to_string();
143
144                Ok(oio::MultipartPart {
145                    part_number,
146                    etag,
147                    checksum,
148                })
149            }
150            _ => Err(parse_error(resp)),
151        }
152    }
153
154    async fn complete_part(
155        &self,
156        upload_id: &str,
157        parts: &[oio::MultipartPart],
158    ) -> Result<Metadata> {
159        let parts = parts
160            .iter()
161            .map(|p| match &self.core.checksum_algorithm {
162                None => CompleteMultipartUploadRequestPart {
163                    part_number: p.part_number,
164                    etag: p.etag.clone(),
165                    ..Default::default()
166                },
167                Some(checksum_algorithm) => match checksum_algorithm {
168                    ChecksumAlgorithm::Crc32c => CompleteMultipartUploadRequestPart {
169                        part_number: p.part_number,
170                        etag: p.etag.clone(),
171                        checksum_crc32c: p.checksum.clone(),
172                    },
173                },
174            })
175            .collect();
176
177        let resp = self
178            .core
179            .s3_complete_multipart_upload(&self.path, upload_id, parts)
180            .await?;
181
182        let status = resp.status();
183
184        let mut meta = S3Writer::parse_header_into_meta(&self.path, resp.headers())?;
185
186        match status {
187            StatusCode::OK => {
188                // still check if there is any error because S3 might return error for status code 200
189                // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#API_CompleteMultipartUpload_Example_4
190                let (parts, body) = resp.into_parts();
191
192                let ret: CompleteMultipartUploadResult =
193                    quick_xml::de::from_reader(body.reader()).map_err(new_xml_deserialize_error)?;
194                if !ret.code.is_empty() {
195                    return Err(from_s3_error(
196                        S3Error {
197                            code: ret.code,
198                            message: ret.message,
199                            resource: "".to_string(),
200                            request_id: ret.request_id,
201                        },
202                        parts,
203                    ));
204                }
205                meta.set_etag(&ret.etag);
206
207                Ok(meta)
208            }
209            _ => Err(parse_error(resp)),
210        }
211    }
212
213    async fn abort_part(&self, upload_id: &str) -> Result<()> {
214        let resp = self
215            .core
216            .s3_abort_multipart_upload(&self.path, upload_id)
217            .await?;
218        match resp.status() {
219            // s3 returns code 204 if abort succeeds.
220            StatusCode::NO_CONTENT => Ok(()),
221            _ => Err(parse_error(resp)),
222        }
223    }
224}
225
226impl oio::AppendWrite for S3Writer {
227    async fn offset(&self) -> Result<u64> {
228        let resp = self
229            .core
230            .s3_head_object(&self.path, OpStat::default())
231            .await?;
232
233        let status = resp.status();
234
235        match status {
236            StatusCode::OK => Ok(parse_content_length(resp.headers())?.unwrap_or_default()),
237            StatusCode::NOT_FOUND => Ok(0),
238            _ => Err(parse_error(resp)),
239        }
240    }
241
242    async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
243        let mut req = self
244            .core
245            .s3_append_object_request(&self.path, offset, size, &self.op, body)?;
246
247        self.core.sign(&mut req).await?;
248
249        let resp = self.core.send(req).await?;
250
251        let status = resp.status();
252
253        let meta = S3Writer::parse_header_into_meta(&self.path, resp.headers())?;
254
255        match status {
256            StatusCode::CREATED | StatusCode::OK => Ok(meta),
257            _ => Err(parse_error(resp)),
258        }
259    }
260}