opendal/services/oss/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use http::{HeaderMap, HeaderValue, StatusCode};
21
22use super::core::*;
23use super::error::parse_error;
24use crate::raw::*;
25use crate::*;
26
27pub type OssWriters = TwoWays<oio::MultipartWriter<OssWriter>, oio::AppendWriter<OssWriter>>;
28
29pub struct OssWriter {
30    core: Arc<OssCore>,
31
32    op: OpWrite,
33    path: String,
34}
35
36impl OssWriter {
37    pub fn new(core: Arc<OssCore>, path: &str, op: OpWrite) -> Self {
38        OssWriter {
39            core,
40            path: path.to_string(),
41            op,
42        }
43    }
44
45    fn parse_metadata(headers: &HeaderMap<HeaderValue>) -> Result<Metadata> {
46        let mut meta = Metadata::default();
47        if let Some(etag) = parse_etag(headers)? {
48            meta.set_etag(etag);
49        }
50        if let Some(md5) = parse_content_md5(headers)? {
51            meta.set_content_md5(md5);
52        }
53        if let Some(version) = parse_header_to_str(headers, constants::X_OSS_VERSION_ID)? {
54            meta.set_version(version);
55        }
56
57        Ok(meta)
58    }
59}
60
61impl oio::MultipartWrite for OssWriter {
62    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
63        let mut req =
64            self.core
65                .oss_put_object_request(&self.path, Some(size), &self.op, body, false)?;
66
67        self.core.sign(&mut req).await?;
68
69        let resp = self.core.send(req).await?;
70
71        let meta = Self::parse_metadata(resp.headers())?;
72        let status = resp.status();
73
74        match status {
75            StatusCode::CREATED | StatusCode::OK => Ok(meta),
76            _ => Err(parse_error(resp)),
77        }
78    }
79
80    async fn initiate_part(&self) -> Result<String> {
81        let resp = self
82            .core
83            .oss_initiate_upload(
84                &self.path,
85                self.op.content_type(),
86                self.op.content_disposition(),
87                self.op.cache_control(),
88                false,
89            )
90            .await?;
91
92        let status = resp.status();
93
94        match status {
95            StatusCode::OK => {
96                let bs = resp.into_body();
97
98                let result: InitiateMultipartUploadResult =
99                    quick_xml::de::from_reader(bytes::Buf::reader(bs))
100                        .map_err(new_xml_deserialize_error)?;
101
102                Ok(result.upload_id)
103            }
104            _ => Err(parse_error(resp)),
105        }
106    }
107
108    async fn write_part(
109        &self,
110        upload_id: &str,
111        part_number: usize,
112        size: u64,
113        body: Buffer,
114    ) -> Result<oio::MultipartPart> {
115        // OSS requires part number must between [1..=10000]
116        let part_number = part_number + 1;
117
118        let resp = self
119            .core
120            .oss_upload_part_request(&self.path, upload_id, part_number, false, size, body)
121            .await?;
122
123        let status = resp.status();
124
125        match status {
126            StatusCode::OK => {
127                let etag = parse_etag(resp.headers())?
128                    .ok_or_else(|| {
129                        Error::new(
130                            ErrorKind::Unexpected,
131                            "ETag not present in returning response",
132                        )
133                    })?
134                    .to_string();
135
136                Ok(oio::MultipartPart {
137                    part_number,
138                    etag,
139                    checksum: None,
140                })
141            }
142            _ => Err(parse_error(resp)),
143        }
144    }
145
146    async fn complete_part(
147        &self,
148        upload_id: &str,
149        parts: &[oio::MultipartPart],
150    ) -> Result<Metadata> {
151        let parts = parts
152            .iter()
153            .map(|p| MultipartUploadPart {
154                part_number: p.part_number,
155                etag: p.etag.clone(),
156            })
157            .collect();
158
159        let resp = self
160            .core
161            .oss_complete_multipart_upload_request(&self.path, upload_id, false, parts)
162            .await?;
163
164        let meta = Self::parse_metadata(resp.headers())?;
165        let status = resp.status();
166
167        match status {
168            StatusCode::OK => Ok(meta),
169            _ => Err(parse_error(resp)),
170        }
171    }
172
173    async fn abort_part(&self, upload_id: &str) -> Result<()> {
174        let resp = self
175            .core
176            .oss_abort_multipart_upload(&self.path, upload_id)
177            .await?;
178        match resp.status() {
179            // OSS returns code 204 if abort succeeds.
180            StatusCode::NO_CONTENT => Ok(()),
181            _ => Err(parse_error(resp)),
182        }
183    }
184}
185
186impl oio::AppendWrite for OssWriter {
187    async fn offset(&self) -> Result<u64> {
188        let resp = self
189            .core
190            .oss_head_object(&self.path, &OpStat::new())
191            .await?;
192
193        let status = resp.status();
194        match status {
195            StatusCode::OK => {
196                let content_length = parse_content_length(resp.headers())?.ok_or_else(|| {
197                    Error::new(
198                        ErrorKind::Unexpected,
199                        "Content-Length not present in returning response",
200                    )
201                })?;
202                Ok(content_length)
203            }
204            StatusCode::NOT_FOUND => Ok(0),
205            _ => Err(parse_error(resp)),
206        }
207    }
208
209    async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
210        let mut req = self
211            .core
212            .oss_append_object_request(&self.path, offset, size, &self.op, body)?;
213
214        self.core.sign(&mut req).await?;
215
216        let resp = self.core.send(req).await?;
217
218        let meta = Self::parse_metadata(resp.headers())?;
219        let status = resp.status();
220
221        match status {
222            StatusCode::OK => Ok(meta),
223            _ => Err(parse_error(resp)),
224        }
225    }
226}