opendal/services/obs/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use http::{HeaderMap, HeaderValue, StatusCode};
22
23use super::core::*;
24use super::error::parse_error;
25use crate::raw::oio::MultipartPart;
26use crate::raw::*;
27use crate::*;
28
29pub type ObsWriters = TwoWays<oio::MultipartWriter<ObsWriter>, oio::AppendWriter<ObsWriter>>;
30
31pub struct ObsWriter {
32    core: Arc<ObsCore>,
33
34    op: OpWrite,
35    path: String,
36}
37
38impl ObsWriter {
39    pub fn new(core: Arc<ObsCore>, path: &str, op: OpWrite) -> Self {
40        ObsWriter {
41            core,
42            path: path.to_string(),
43            op,
44        }
45    }
46
47    fn parse_metadata(headers: &HeaderMap<HeaderValue>) -> Result<Metadata> {
48        let mut meta = Metadata::default();
49        if let Some(etag) = parse_etag(headers)? {
50            meta.set_etag(etag);
51        }
52        if let Some(md5) = parse_content_md5(headers)? {
53            meta.set_content_md5(md5);
54        }
55        if let Some(version) = parse_header_to_str(headers, constants::X_OBS_VERSION_ID)? {
56            meta.set_version(version);
57        }
58
59        Ok(meta)
60    }
61}
62
63impl oio::MultipartWrite for ObsWriter {
64    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
65        let mut req = self
66            .core
67            .obs_put_object_request(&self.path, Some(size), &self.op, body)?;
68
69        self.core.sign(&mut req).await?;
70
71        let resp = self.core.send(req).await?;
72
73        let meta = Self::parse_metadata(resp.headers())?;
74
75        let status = resp.status();
76
77        match status {
78            StatusCode::CREATED | StatusCode::OK => Ok(meta),
79            _ => Err(parse_error(resp)),
80        }
81    }
82
83    async fn initiate_part(&self) -> Result<String> {
84        let resp = self
85            .core
86            .obs_initiate_multipart_upload(&self.path, self.op.content_type())
87            .await?;
88
89        let status = resp.status();
90
91        match status {
92            StatusCode::OK => {
93                let bs = resp.into_body();
94
95                let result: InitiateMultipartUploadResult =
96                    quick_xml::de::from_reader(bytes::Buf::reader(bs))
97                        .map_err(new_xml_deserialize_error)?;
98
99                Ok(result.upload_id)
100            }
101            _ => Err(parse_error(resp)),
102        }
103    }
104
105    async fn write_part(
106        &self,
107        upload_id: &str,
108        part_number: usize,
109        size: u64,
110        body: Buffer,
111    ) -> Result<MultipartPart> {
112        // Obs service requires part number must between [1..=10000]
113        let part_number = part_number + 1;
114
115        let resp = self
116            .core
117            .obs_upload_part_request(&self.path, upload_id, part_number, Some(size), body)
118            .await?;
119
120        let status = resp.status();
121
122        match status {
123            StatusCode::OK => {
124                let etag = parse_etag(resp.headers())?
125                    .ok_or_else(|| {
126                        Error::new(
127                            ErrorKind::Unexpected,
128                            "ETag not present in returning response",
129                        )
130                    })?
131                    .to_string();
132
133                Ok(MultipartPart {
134                    part_number,
135                    etag,
136                    checksum: None,
137                })
138            }
139            _ => Err(parse_error(resp)),
140        }
141    }
142
143    async fn complete_part(&self, upload_id: &str, parts: &[MultipartPart]) -> Result<Metadata> {
144        let parts = parts
145            .iter()
146            .map(|p| CompleteMultipartUploadRequestPart {
147                part_number: p.part_number,
148                etag: p.etag.clone(),
149            })
150            .collect();
151
152        let mut resp = self
153            .core
154            .obs_complete_multipart_upload(&self.path, upload_id, parts)
155            .await?;
156
157        let mut meta = Self::parse_metadata(resp.headers())?;
158
159        let result: CompleteMultipartUploadResult =
160            quick_xml::de::from_reader(resp.body_mut().reader())
161                .map_err(new_xml_deserialize_error)?;
162        meta.set_etag(&result.etag);
163
164        let status = resp.status();
165
166        match status {
167            StatusCode::OK => Ok(meta),
168            _ => Err(parse_error(resp)),
169        }
170    }
171
172    async fn abort_part(&self, upload_id: &str) -> Result<()> {
173        let resp = self
174            .core
175            .obs_abort_multipart_upload(&self.path, upload_id)
176            .await?;
177        match resp.status() {
178            // Obs returns code 204 No Content if abort succeeds.
179            // Reference: https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0103.html
180            StatusCode::NO_CONTENT => Ok(()),
181            _ => Err(parse_error(resp)),
182        }
183    }
184}
185
186impl oio::AppendWrite for ObsWriter {
187    async fn offset(&self) -> Result<u64> {
188        let resp = self
189            .core
190            .obs_head_object(&self.path, &OpStat::default())
191            .await?;
192
193        let status = resp.status();
194        match status {
195            StatusCode::OK => {
196                let content_length = parse_content_length(resp.headers())?.ok_or_else(|| {
197                    Error::new(
198                        ErrorKind::Unexpected,
199                        "Content-Length not present in returning response",
200                    )
201                })?;
202                Ok(content_length)
203            }
204            StatusCode::NOT_FOUND => Ok(0),
205            _ => Err(parse_error(resp)),
206        }
207    }
208
209    async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
210        let mut req = self
211            .core
212            .obs_append_object_request(&self.path, offset, size, &self.op, body)?;
213
214        self.core.sign(&mut req).await?;
215
216        let resp = self.core.send(req).await?;
217
218        let mut meta = Metadata::default();
219        if let Some(md5) = parse_content_md5(resp.headers())? {
220            meta.set_content_md5(md5);
221        }
222        if let Some(version) = parse_header_to_str(resp.headers(), constants::X_OBS_VERSION_ID)? {
223            meta.set_version(version);
224        }
225
226        let status = resp.status();
227
228        match status {
229            StatusCode::OK => Ok(meta),
230            _ => Err(parse_error(resp)),
231        }
232    }
233}