opendal/services/oss/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use http::HeaderMap;
21use http::HeaderValue;
22use http::StatusCode;
23
24use super::core::*;
25use super::error::parse_error;
26use crate::raw::*;
27use crate::*;
28
29pub type OssWriters = TwoWays<oio::MultipartWriter<OssWriter>, oio::AppendWriter<OssWriter>>;
30
31pub struct OssWriter {
32    core: Arc<OssCore>,
33
34    op: OpWrite,
35    path: String,
36}
37
38impl OssWriter {
39    pub fn new(core: Arc<OssCore>, path: &str, op: OpWrite) -> Self {
40        OssWriter {
41            core,
42            path: path.to_string(),
43            op,
44        }
45    }
46
47    fn parse_metadata(headers: &HeaderMap<HeaderValue>) -> Result<Metadata> {
48        let mut meta = Metadata::default();
49        if let Some(etag) = parse_etag(headers)? {
50            meta.set_etag(etag);
51        }
52        if let Some(md5) = parse_content_md5(headers)? {
53            meta.set_content_md5(md5);
54        }
55        if let Some(version) = parse_header_to_str(headers, constants::X_OSS_VERSION_ID)? {
56            meta.set_version(version);
57        }
58
59        Ok(meta)
60    }
61}
62
63impl oio::MultipartWrite for OssWriter {
64    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
65        let mut req =
66            self.core
67                .oss_put_object_request(&self.path, Some(size), &self.op, body, false)?;
68
69        self.core.sign(&mut req).await?;
70
71        let resp = self.core.send(req).await?;
72
73        let meta = Self::parse_metadata(resp.headers())?;
74        let status = resp.status();
75
76        match status {
77            StatusCode::CREATED | StatusCode::OK => Ok(meta),
78            _ => Err(parse_error(resp)),
79        }
80    }
81
82    async fn initiate_part(&self) -> Result<String> {
83        let resp = self
84            .core
85            .oss_initiate_upload(
86                &self.path,
87                self.op.content_type(),
88                self.op.content_disposition(),
89                self.op.cache_control(),
90                false,
91            )
92            .await?;
93
94        let status = resp.status();
95
96        match status {
97            StatusCode::OK => {
98                let bs = resp.into_body();
99
100                let result: InitiateMultipartUploadResult =
101                    quick_xml::de::from_reader(bytes::Buf::reader(bs))
102                        .map_err(new_xml_deserialize_error)?;
103
104                Ok(result.upload_id)
105            }
106            _ => Err(parse_error(resp)),
107        }
108    }
109
110    async fn write_part(
111        &self,
112        upload_id: &str,
113        part_number: usize,
114        size: u64,
115        body: Buffer,
116    ) -> Result<oio::MultipartPart> {
117        // OSS requires part number must between [1..=10000]
118        let part_number = part_number + 1;
119
120        let resp = self
121            .core
122            .oss_upload_part_request(&self.path, upload_id, part_number, false, size, body)
123            .await?;
124
125        let status = resp.status();
126
127        match status {
128            StatusCode::OK => {
129                let etag = parse_etag(resp.headers())?
130                    .ok_or_else(|| {
131                        Error::new(
132                            ErrorKind::Unexpected,
133                            "ETag not present in returning response",
134                        )
135                    })?
136                    .to_string();
137
138                Ok(oio::MultipartPart {
139                    part_number,
140                    etag,
141                    checksum: None,
142                })
143            }
144            _ => Err(parse_error(resp)),
145        }
146    }
147
148    async fn complete_part(
149        &self,
150        upload_id: &str,
151        parts: &[oio::MultipartPart],
152    ) -> Result<Metadata> {
153        let parts = parts
154            .iter()
155            .map(|p| MultipartUploadPart {
156                part_number: p.part_number,
157                etag: p.etag.clone(),
158            })
159            .collect();
160
161        let resp = self
162            .core
163            .oss_complete_multipart_upload_request(&self.path, upload_id, false, parts)
164            .await?;
165
166        let meta = Self::parse_metadata(resp.headers())?;
167        let status = resp.status();
168
169        match status {
170            StatusCode::OK => Ok(meta),
171            _ => Err(parse_error(resp)),
172        }
173    }
174
175    async fn abort_part(&self, upload_id: &str) -> Result<()> {
176        let resp = self
177            .core
178            .oss_abort_multipart_upload(&self.path, upload_id)
179            .await?;
180        match resp.status() {
181            // OSS returns code 204 if abort succeeds.
182            StatusCode::NO_CONTENT => Ok(()),
183            _ => Err(parse_error(resp)),
184        }
185    }
186}
187
188impl oio::AppendWrite for OssWriter {
189    async fn offset(&self) -> Result<u64> {
190        let resp = self
191            .core
192            .oss_head_object(&self.path, &OpStat::new())
193            .await?;
194
195        let status = resp.status();
196        match status {
197            StatusCode::OK => {
198                let content_length = parse_content_length(resp.headers())?.ok_or_else(|| {
199                    Error::new(
200                        ErrorKind::Unexpected,
201                        "Content-Length not present in returning response",
202                    )
203                })?;
204                Ok(content_length)
205            }
206            StatusCode::NOT_FOUND => Ok(0),
207            _ => Err(parse_error(resp)),
208        }
209    }
210
211    async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
212        let mut req = self
213            .core
214            .oss_append_object_request(&self.path, offset, size, &self.op, body)?;
215
216        self.core.sign(&mut req).await?;
217
218        let resp = self.core.send(req).await?;
219
220        let meta = Self::parse_metadata(resp.headers())?;
221        let status = resp.status();
222
223        match status {
224            StatusCode::OK => Ok(meta),
225            _ => Err(parse_error(resp)),
226        }
227    }
228}