opendal/services/obs/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use http::HeaderMap;
22use http::HeaderValue;
23use http::StatusCode;
24
25use super::core::*;
26use super::error::parse_error;
27use crate::raw::oio::MultipartPart;
28use crate::raw::*;
29use crate::*;
30
31pub type ObsWriters = TwoWays<oio::MultipartWriter<ObsWriter>, oio::AppendWriter<ObsWriter>>;
32
33pub struct ObsWriter {
34    core: Arc<ObsCore>,
35
36    op: OpWrite,
37    path: String,
38}
39
40impl ObsWriter {
41    pub fn new(core: Arc<ObsCore>, path: &str, op: OpWrite) -> Self {
42        ObsWriter {
43            core,
44            path: path.to_string(),
45            op,
46        }
47    }
48
49    fn parse_metadata(headers: &HeaderMap<HeaderValue>) -> Result<Metadata> {
50        let mut meta = Metadata::default();
51        if let Some(etag) = parse_etag(headers)? {
52            meta.set_etag(etag);
53        }
54        if let Some(md5) = parse_content_md5(headers)? {
55            meta.set_content_md5(md5);
56        }
57        if let Some(version) = parse_header_to_str(headers, constants::X_OBS_VERSION_ID)? {
58            meta.set_version(version);
59        }
60
61        Ok(meta)
62    }
63}
64
65impl oio::MultipartWrite for ObsWriter {
66    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
67        let mut req = self
68            .core
69            .obs_put_object_request(&self.path, Some(size), &self.op, body)?;
70
71        self.core.sign(&mut req).await?;
72
73        let resp = self.core.send(req).await?;
74
75        let meta = Self::parse_metadata(resp.headers())?;
76
77        let status = resp.status();
78
79        match status {
80            StatusCode::CREATED | StatusCode::OK => Ok(meta),
81            _ => Err(parse_error(resp)),
82        }
83    }
84
85    async fn initiate_part(&self) -> Result<String> {
86        let resp = self
87            .core
88            .obs_initiate_multipart_upload(&self.path, self.op.content_type())
89            .await?;
90
91        let status = resp.status();
92
93        match status {
94            StatusCode::OK => {
95                let bs = resp.into_body();
96
97                let result: InitiateMultipartUploadResult =
98                    quick_xml::de::from_reader(bytes::Buf::reader(bs))
99                        .map_err(new_xml_deserialize_error)?;
100
101                Ok(result.upload_id)
102            }
103            _ => Err(parse_error(resp)),
104        }
105    }
106
107    async fn write_part(
108        &self,
109        upload_id: &str,
110        part_number: usize,
111        size: u64,
112        body: Buffer,
113    ) -> Result<MultipartPart> {
114        // Obs service requires part number must between [1..=10000]
115        let part_number = part_number + 1;
116
117        let resp = self
118            .core
119            .obs_upload_part_request(&self.path, upload_id, part_number, Some(size), body)
120            .await?;
121
122        let status = resp.status();
123
124        match status {
125            StatusCode::OK => {
126                let etag = parse_etag(resp.headers())?
127                    .ok_or_else(|| {
128                        Error::new(
129                            ErrorKind::Unexpected,
130                            "ETag not present in returning response",
131                        )
132                    })?
133                    .to_string();
134
135                Ok(MultipartPart {
136                    part_number,
137                    etag,
138                    checksum: None,
139                })
140            }
141            _ => Err(parse_error(resp)),
142        }
143    }
144
145    async fn complete_part(&self, upload_id: &str, parts: &[MultipartPart]) -> Result<Metadata> {
146        let parts = parts
147            .iter()
148            .map(|p| CompleteMultipartUploadRequestPart {
149                part_number: p.part_number,
150                etag: p.etag.clone(),
151            })
152            .collect();
153
154        let mut resp = self
155            .core
156            .obs_complete_multipart_upload(&self.path, upload_id, parts)
157            .await?;
158
159        let mut meta = Self::parse_metadata(resp.headers())?;
160
161        let result: CompleteMultipartUploadResult =
162            quick_xml::de::from_reader(resp.body_mut().reader())
163                .map_err(new_xml_deserialize_error)?;
164        meta.set_etag(&result.etag);
165
166        let status = resp.status();
167
168        match status {
169            StatusCode::OK => Ok(meta),
170            _ => Err(parse_error(resp)),
171        }
172    }
173
174    async fn abort_part(&self, upload_id: &str) -> Result<()> {
175        let resp = self
176            .core
177            .obs_abort_multipart_upload(&self.path, upload_id)
178            .await?;
179        match resp.status() {
180            // Obs returns code 204 No Content if abort succeeds.
181            // Reference: https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0103.html
182            StatusCode::NO_CONTENT => Ok(()),
183            _ => Err(parse_error(resp)),
184        }
185    }
186}
187
188impl oio::AppendWrite for ObsWriter {
189    async fn offset(&self) -> Result<u64> {
190        let resp = self
191            .core
192            .obs_head_object(&self.path, &OpStat::default())
193            .await?;
194
195        let status = resp.status();
196        match status {
197            StatusCode::OK => {
198                let content_length = parse_content_length(resp.headers())?.ok_or_else(|| {
199                    Error::new(
200                        ErrorKind::Unexpected,
201                        "Content-Length not present in returning response",
202                    )
203                })?;
204                Ok(content_length)
205            }
206            StatusCode::NOT_FOUND => Ok(0),
207            _ => Err(parse_error(resp)),
208        }
209    }
210
211    async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
212        let mut req = self
213            .core
214            .obs_append_object_request(&self.path, offset, size, &self.op, body)?;
215
216        self.core.sign(&mut req).await?;
217
218        let resp = self.core.send(req).await?;
219
220        let mut meta = Metadata::default();
221        if let Some(md5) = parse_content_md5(resp.headers())? {
222            meta.set_content_md5(md5);
223        }
224        if let Some(version) = parse_header_to_str(resp.headers(), constants::X_OBS_VERSION_ID)? {
225            meta.set_version(version);
226        }
227
228        let status = resp.status();
229
230        match status {
231            StatusCode::OK => Ok(meta),
232            _ => Err(parse_error(resp)),
233        }
234    }
235}