opendal/services/s3/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use constants::X_AMZ_OBJECT_SIZE;
22use constants::X_AMZ_VERSION_ID;
23use http::StatusCode;
24
25use super::core::*;
26use super::error::from_s3_error;
27use super::error::parse_error;
28use super::error::S3Error;
29use crate::raw::*;
30use crate::*;
31
/// The writer type used for S3: either an `oio::MultipartWriter` or an
/// `oio::AppendWriter`, each wrapping an [`S3Writer`].
pub type S3Writers = TwoWays<oio::MultipartWriter<S3Writer>, oio::AppendWriter<S3Writer>>;
33
/// Writer for a single S3 object path.
///
/// Implements both `oio::MultipartWrite` and `oio::AppendWrite`.
pub struct S3Writer {
    /// Shared S3 core used to build, sign and send requests.
    core: Arc<S3Core>,

    /// Write options handed to the request builders.
    op: OpWrite,
    /// Path of the object this writer targets.
    path: String,
}
40
41impl S3Writer {
42    pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> Self {
43        S3Writer {
44            core,
45            path: path.to_string(),
46            op,
47        }
48    }
49
50    fn parse_header_into_meta(path: &str, headers: &http::HeaderMap) -> Result<Metadata> {
51        let mut meta = Metadata::new(EntryMode::from_path(path));
52        if let Some(etag) = parse_etag(headers)? {
53            meta.set_etag(etag);
54        }
55        if let Some(version) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? {
56            meta.set_version(version);
57        }
58        if let Some(size) = parse_header_to_str(headers, X_AMZ_OBJECT_SIZE)? {
59            if let Ok(value) = size.parse() {
60                meta.set_content_length(value);
61            }
62        }
63        Ok(meta)
64    }
65}
66
67impl oio::MultipartWrite for S3Writer {
68    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
69        let mut req = self
70            .core
71            .s3_put_object_request(&self.path, Some(size), &self.op, body)?;
72
73        self.core.sign(&mut req).await?;
74
75        let resp = self.core.send(req).await?;
76
77        let status = resp.status();
78
79        let meta = S3Writer::parse_header_into_meta(&self.path, resp.headers())?;
80
81        match status {
82            StatusCode::CREATED | StatusCode::OK => Ok(meta),
83            _ => Err(parse_error(resp)),
84        }
85    }
86
87    async fn initiate_part(&self) -> Result<String> {
88        let resp = self
89            .core
90            .s3_initiate_multipart_upload(&self.path, &self.op)
91            .await?;
92
93        let status = resp.status();
94
95        match status {
96            StatusCode::OK => {
97                let bs = resp.into_body();
98
99                let result: InitiateMultipartUploadResult =
100                    quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
101
102                Ok(result.upload_id)
103            }
104            _ => Err(parse_error(resp)),
105        }
106    }
107
108    async fn write_part(
109        &self,
110        upload_id: &str,
111        part_number: usize,
112        size: u64,
113        body: Buffer,
114    ) -> Result<oio::MultipartPart> {
115        // AWS S3 requires part number must between [1..=10000]
116        let part_number = part_number + 1;
117
118        let checksum = self.core.calculate_checksum(&body);
119
120        let mut req = self.core.s3_upload_part_request(
121            &self.path,
122            upload_id,
123            part_number,
124            size,
125            body,
126            checksum.clone(),
127        )?;
128
129        self.core.sign(&mut req).await?;
130
131        let resp = self.core.send(req).await?;
132
133        let status = resp.status();
134
135        match status {
136            StatusCode::OK => {
137                let etag = parse_etag(resp.headers())?
138                    .ok_or_else(|| {
139                        Error::new(
140                            ErrorKind::Unexpected,
141                            "ETag not present in returning response",
142                        )
143                    })?
144                    .to_string();
145
146                Ok(oio::MultipartPart {
147                    part_number,
148                    etag,
149                    checksum,
150                })
151            }
152            _ => Err(parse_error(resp)),
153        }
154    }
155
156    async fn complete_part(
157        &self,
158        upload_id: &str,
159        parts: &[oio::MultipartPart],
160    ) -> Result<Metadata> {
161        let parts = parts
162            .iter()
163            .map(|p| match &self.core.checksum_algorithm {
164                None => CompleteMultipartUploadRequestPart {
165                    part_number: p.part_number,
166                    etag: p.etag.clone(),
167                    ..Default::default()
168                },
169                Some(checksum_algorithm) => match checksum_algorithm {
170                    ChecksumAlgorithm::Crc32c => CompleteMultipartUploadRequestPart {
171                        part_number: p.part_number,
172                        etag: p.etag.clone(),
173                        checksum_crc32c: p.checksum.clone(),
174                    },
175                },
176            })
177            .collect();
178
179        let resp = self
180            .core
181            .s3_complete_multipart_upload(&self.path, upload_id, parts)
182            .await?;
183
184        let status = resp.status();
185
186        let mut meta = S3Writer::parse_header_into_meta(&self.path, resp.headers())?;
187
188        match status {
189            StatusCode::OK => {
190                // still check if there is any error because S3 might return error for status code 200
191                // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#API_CompleteMultipartUpload_Example_4
192                let (parts, body) = resp.into_parts();
193
194                let ret: CompleteMultipartUploadResult =
195                    quick_xml::de::from_reader(body.reader()).map_err(new_xml_deserialize_error)?;
196                if !ret.code.is_empty() {
197                    return Err(from_s3_error(
198                        S3Error {
199                            code: ret.code,
200                            message: ret.message,
201                            resource: "".to_string(),
202                            request_id: ret.request_id,
203                        },
204                        parts,
205                    ));
206                }
207                meta.set_etag(&ret.etag);
208
209                Ok(meta)
210            }
211            _ => Err(parse_error(resp)),
212        }
213    }
214
215    async fn abort_part(&self, upload_id: &str) -> Result<()> {
216        let resp = self
217            .core
218            .s3_abort_multipart_upload(&self.path, upload_id)
219            .await?;
220        match resp.status() {
221            // s3 returns code 204 if abort succeeds.
222            StatusCode::NO_CONTENT => Ok(()),
223            _ => Err(parse_error(resp)),
224        }
225    }
226}
227
228impl oio::AppendWrite for S3Writer {
229    async fn offset(&self) -> Result<u64> {
230        let resp = self
231            .core
232            .s3_head_object(&self.path, OpStat::default())
233            .await?;
234
235        let status = resp.status();
236
237        match status {
238            StatusCode::OK => Ok(parse_content_length(resp.headers())?.unwrap_or_default()),
239            StatusCode::NOT_FOUND => Ok(0),
240            _ => Err(parse_error(resp)),
241        }
242    }
243
244    async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
245        let mut req = self
246            .core
247            .s3_append_object_request(&self.path, offset, size, &self.op, body)?;
248
249        self.core.sign(&mut req).await?;
250
251        let resp = self.core.send(req).await?;
252
253        let status = resp.status();
254
255        let meta = S3Writer::parse_header_into_meta(&self.path, resp.headers())?;
256
257        match status {
258            StatusCode::CREATED | StatusCode::OK => Ok(meta),
259            _ => Err(parse_error(resp)),
260        }
261    }
262}