opendal/services/cos/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use http::{HeaderMap, HeaderValue, StatusCode};
22
23use super::core::*;
24use super::error::parse_error;
25use crate::raw::*;
26use crate::*;
27
28pub type CosWriters = TwoWays<oio::MultipartWriter<CosWriter>, oio::AppendWriter<CosWriter>>;
29
30pub struct CosWriter {
31    core: Arc<CosCore>,
32
33    op: OpWrite,
34    path: String,
35}
36
37impl CosWriter {
38    pub fn new(core: Arc<CosCore>, path: &str, op: OpWrite) -> Self {
39        CosWriter {
40            core,
41            path: path.to_string(),
42            op,
43        }
44    }
45
46    fn parse_metadata(headers: &HeaderMap<HeaderValue>) -> Result<Metadata> {
47        let mut meta = Metadata::default();
48        if let Some(etag) = parse_etag(headers)? {
49            meta.set_etag(etag);
50        }
51        if let Some(md5) = parse_content_md5(headers)? {
52            meta.set_content_md5(md5);
53        }
54        if let Some(version) = parse_header_to_str(headers, constants::X_COS_VERSION_ID)? {
55            if version != "null" {
56                meta.set_version(version);
57            }
58        }
59
60        Ok(meta)
61    }
62}
63
64impl oio::MultipartWrite for CosWriter {
65    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
66        let mut req = self
67            .core
68            .cos_put_object_request(&self.path, Some(size), &self.op, body)?;
69
70        self.core.sign(&mut req).await?;
71
72        let resp = self.core.send(req).await?;
73
74        let meta = Self::parse_metadata(resp.headers())?;
75
76        let status = resp.status();
77
78        match status {
79            StatusCode::CREATED | StatusCode::OK => Ok(meta),
80            _ => Err(parse_error(resp)),
81        }
82    }
83
84    async fn initiate_part(&self) -> Result<String> {
85        let resp = self
86            .core
87            .cos_initiate_multipart_upload(&self.path, &self.op)
88            .await?;
89
90        let status = resp.status();
91
92        match status {
93            StatusCode::OK => {
94                let bs = resp.into_body();
95
96                let result: InitiateMultipartUploadResult =
97                    quick_xml::de::from_reader(bytes::Buf::reader(bs))
98                        .map_err(new_xml_deserialize_error)?;
99
100                Ok(result.upload_id)
101            }
102            _ => Err(parse_error(resp)),
103        }
104    }
105
106    async fn write_part(
107        &self,
108        upload_id: &str,
109        part_number: usize,
110        size: u64,
111        body: Buffer,
112    ) -> Result<oio::MultipartPart> {
113        // COS requires part number must between [1..=10000]
114        let part_number = part_number + 1;
115
116        let resp = self
117            .core
118            .cos_upload_part_request(&self.path, upload_id, part_number, size, body)
119            .await?;
120
121        let status = resp.status();
122
123        match status {
124            StatusCode::OK => {
125                let etag = parse_etag(resp.headers())?
126                    .ok_or_else(|| {
127                        Error::new(
128                            ErrorKind::Unexpected,
129                            "ETag not present in returning response",
130                        )
131                    })?
132                    .to_string();
133
134                Ok(oio::MultipartPart {
135                    part_number,
136                    etag,
137                    checksum: None,
138                })
139            }
140            _ => Err(parse_error(resp)),
141        }
142    }
143
144    async fn complete_part(
145        &self,
146        upload_id: &str,
147        parts: &[oio::MultipartPart],
148    ) -> Result<Metadata> {
149        let parts = parts
150            .iter()
151            .map(|p| CompleteMultipartUploadRequestPart {
152                part_number: p.part_number,
153                etag: p.etag.clone(),
154            })
155            .collect();
156
157        let mut resp = self
158            .core
159            .cos_complete_multipart_upload(&self.path, upload_id, parts)
160            .await?;
161
162        let mut meta = Self::parse_metadata(resp.headers())?;
163
164        let result: CompleteMultipartUploadResult =
165            quick_xml::de::from_reader(resp.body_mut().reader())
166                .map_err(new_xml_deserialize_error)?;
167        meta.set_etag(&result.etag);
168
169        let status = resp.status();
170
171        match status {
172            StatusCode::OK => Ok(meta),
173            _ => Err(parse_error(resp)),
174        }
175    }
176
177    async fn abort_part(&self, upload_id: &str) -> Result<()> {
178        let resp = self
179            .core
180            .cos_abort_multipart_upload(&self.path, upload_id)
181            .await?;
182        match resp.status() {
183            // cos returns code 204 if abort succeeds.
184            // Reference: https://www.tencentcloud.com/document/product/436/7740
185            StatusCode::NO_CONTENT => Ok(()),
186            _ => Err(parse_error(resp)),
187        }
188    }
189}
190
191impl oio::AppendWrite for CosWriter {
192    async fn offset(&self) -> Result<u64> {
193        let resp = self
194            .core
195            .cos_head_object(&self.path, &OpStat::default())
196            .await?;
197
198        let status = resp.status();
199        match status {
200            StatusCode::OK => {
201                let content_length = parse_content_length(resp.headers())?.ok_or_else(|| {
202                    Error::new(
203                        ErrorKind::Unexpected,
204                        "Content-Length not present in returning response",
205                    )
206                })?;
207                Ok(content_length)
208            }
209            StatusCode::NOT_FOUND => Ok(0),
210            _ => Err(parse_error(resp)),
211        }
212    }
213
214    async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
215        let mut req = self
216            .core
217            .cos_append_object_request(&self.path, offset, size, &self.op, body)?;
218
219        self.core.sign(&mut req).await?;
220
221        let resp = self.core.send(req).await?;
222
223        let meta = Self::parse_metadata(resp.headers())?;
224
225        let status = resp.status();
226
227        match status {
228            StatusCode::OK => Ok(meta),
229            _ => Err(parse_error(resp)),
230        }
231    }
232}