opendal/services/cos/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use http::HeaderMap;
22use http::HeaderValue;
23use http::StatusCode;
24
25use super::core::*;
26use super::error::parse_error;
27use crate::raw::*;
28use crate::*;
29
30pub type CosWriters = TwoWays<oio::MultipartWriter<CosWriter>, oio::AppendWriter<CosWriter>>;
31
32pub struct CosWriter {
33    core: Arc<CosCore>,
34
35    op: OpWrite,
36    path: String,
37}
38
39impl CosWriter {
40    pub fn new(core: Arc<CosCore>, path: &str, op: OpWrite) -> Self {
41        CosWriter {
42            core,
43            path: path.to_string(),
44            op,
45        }
46    }
47
48    fn parse_metadata(headers: &HeaderMap<HeaderValue>) -> Result<Metadata> {
49        let mut meta = Metadata::default();
50        if let Some(etag) = parse_etag(headers)? {
51            meta.set_etag(etag);
52        }
53        if let Some(md5) = parse_content_md5(headers)? {
54            meta.set_content_md5(md5);
55        }
56        if let Some(version) = parse_header_to_str(headers, constants::X_COS_VERSION_ID)? {
57            if version != "null" {
58                meta.set_version(version);
59            }
60        }
61
62        Ok(meta)
63    }
64}
65
66impl oio::MultipartWrite for CosWriter {
67    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
68        let mut req = self
69            .core
70            .cos_put_object_request(&self.path, Some(size), &self.op, body)?;
71
72        self.core.sign(&mut req).await?;
73
74        let resp = self.core.send(req).await?;
75
76        let meta = Self::parse_metadata(resp.headers())?;
77
78        let status = resp.status();
79
80        match status {
81            StatusCode::CREATED | StatusCode::OK => Ok(meta),
82            _ => Err(parse_error(resp)),
83        }
84    }
85
86    async fn initiate_part(&self) -> Result<String> {
87        let resp = self
88            .core
89            .cos_initiate_multipart_upload(&self.path, &self.op)
90            .await?;
91
92        let status = resp.status();
93
94        match status {
95            StatusCode::OK => {
96                let bs = resp.into_body();
97
98                let result: InitiateMultipartUploadResult =
99                    quick_xml::de::from_reader(bytes::Buf::reader(bs))
100                        .map_err(new_xml_deserialize_error)?;
101
102                Ok(result.upload_id)
103            }
104            _ => Err(parse_error(resp)),
105        }
106    }
107
108    async fn write_part(
109        &self,
110        upload_id: &str,
111        part_number: usize,
112        size: u64,
113        body: Buffer,
114    ) -> Result<oio::MultipartPart> {
115        // COS requires part number must between [1..=10000]
116        let part_number = part_number + 1;
117
118        let resp = self
119            .core
120            .cos_upload_part_request(&self.path, upload_id, part_number, size, body)
121            .await?;
122
123        let status = resp.status();
124
125        match status {
126            StatusCode::OK => {
127                let etag = parse_etag(resp.headers())?
128                    .ok_or_else(|| {
129                        Error::new(
130                            ErrorKind::Unexpected,
131                            "ETag not present in returning response",
132                        )
133                    })?
134                    .to_string();
135
136                Ok(oio::MultipartPart {
137                    part_number,
138                    etag,
139                    checksum: None,
140                })
141            }
142            _ => Err(parse_error(resp)),
143        }
144    }
145
146    async fn complete_part(
147        &self,
148        upload_id: &str,
149        parts: &[oio::MultipartPart],
150    ) -> Result<Metadata> {
151        let parts = parts
152            .iter()
153            .map(|p| CompleteMultipartUploadRequestPart {
154                part_number: p.part_number,
155                etag: p.etag.clone(),
156            })
157            .collect();
158
159        let mut resp = self
160            .core
161            .cos_complete_multipart_upload(&self.path, upload_id, parts)
162            .await?;
163
164        let mut meta = Self::parse_metadata(resp.headers())?;
165
166        let result: CompleteMultipartUploadResult =
167            quick_xml::de::from_reader(resp.body_mut().reader())
168                .map_err(new_xml_deserialize_error)?;
169        meta.set_etag(&result.etag);
170
171        let status = resp.status();
172
173        match status {
174            StatusCode::OK => Ok(meta),
175            _ => Err(parse_error(resp)),
176        }
177    }
178
179    async fn abort_part(&self, upload_id: &str) -> Result<()> {
180        let resp = self
181            .core
182            .cos_abort_multipart_upload(&self.path, upload_id)
183            .await?;
184        match resp.status() {
185            // cos returns code 204 if abort succeeds.
186            // Reference: https://www.tencentcloud.com/document/product/436/7740
187            StatusCode::NO_CONTENT => Ok(()),
188            _ => Err(parse_error(resp)),
189        }
190    }
191}
192
193impl oio::AppendWrite for CosWriter {
194    async fn offset(&self) -> Result<u64> {
195        let resp = self
196            .core
197            .cos_head_object(&self.path, &OpStat::default())
198            .await?;
199
200        let status = resp.status();
201        match status {
202            StatusCode::OK => {
203                let content_length = parse_content_length(resp.headers())?.ok_or_else(|| {
204                    Error::new(
205                        ErrorKind::Unexpected,
206                        "Content-Length not present in returning response",
207                    )
208                })?;
209                Ok(content_length)
210            }
211            StatusCode::NOT_FOUND => Ok(0),
212            _ => Err(parse_error(resp)),
213        }
214    }
215
216    async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
217        let mut req = self
218            .core
219            .cos_append_object_request(&self.path, offset, size, &self.op, body)?;
220
221        self.core.sign(&mut req).await?;
222
223        let resp = self.core.send(req).await?;
224
225        let meta = Self::parse_metadata(resp.headers())?;
226
227        let status = resp.status();
228
229        match status {
230            StatusCode::OK => Ok(meta),
231            _ => Err(parse_error(resp)),
232        }
233    }
234}