opendal/services/upyun/
writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use http::StatusCode;
21
22use super::core::constants::X_UPYUN_MULTI_UUID;
23use super::core::UpyunCore;
24use super::error::parse_error;
25use crate::raw::*;
26use crate::*;
27
28pub type UpyunWriters = oio::MultipartWriter<UpyunWriter>;
29
30pub struct UpyunWriter {
31    core: Arc<UpyunCore>,
32    op: OpWrite,
33    path: String,
34}
35
36impl UpyunWriter {
37    pub fn new(core: Arc<UpyunCore>, op: OpWrite, path: String) -> Self {
38        UpyunWriter { core, op, path }
39    }
40}
41
42impl oio::MultipartWrite for UpyunWriter {
43    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
44        let req = self.core.upload(&self.path, Some(size), &self.op, body)?;
45
46        let resp = self.core.send(req).await?;
47
48        let status = resp.status();
49
50        match status {
51            StatusCode::OK => Ok(Metadata::default()),
52            _ => Err(parse_error(resp)),
53        }
54    }
55
56    async fn initiate_part(&self) -> Result<String> {
57        let resp = self
58            .core
59            .initiate_multipart_upload(&self.path, &self.op)
60            .await?;
61
62        let status = resp.status();
63
64        match status {
65            StatusCode::NO_CONTENT => {
66                let id =
67                    parse_header_to_str(resp.headers(), X_UPYUN_MULTI_UUID)?.ok_or(Error::new(
68                        ErrorKind::Unexpected,
69                        format!("{} header is missing", X_UPYUN_MULTI_UUID),
70                    ))?;
71
72                Ok(id.to_string())
73            }
74            _ => Err(parse_error(resp)),
75        }
76    }
77
78    async fn write_part(
79        &self,
80        upload_id: &str,
81        part_number: usize,
82        size: u64,
83        body: Buffer,
84    ) -> Result<oio::MultipartPart> {
85        let req = self
86            .core
87            .upload_part(&self.path, upload_id, part_number, size, body)?;
88
89        let resp = self.core.send(req).await?;
90
91        let status = resp.status();
92
93        match status {
94            StatusCode::NO_CONTENT | StatusCode::CREATED => Ok(oio::MultipartPart {
95                part_number,
96                etag: "".to_string(),
97                checksum: None,
98            }),
99            _ => Err(parse_error(resp)),
100        }
101    }
102
103    async fn complete_part(
104        &self,
105        upload_id: &str,
106        _parts: &[oio::MultipartPart],
107    ) -> Result<Metadata> {
108        let resp = self
109            .core
110            .complete_multipart_upload(&self.path, upload_id)
111            .await?;
112
113        let status = resp.status();
114
115        match status {
116            StatusCode::NO_CONTENT => Ok(Metadata::default()),
117            _ => Err(parse_error(resp)),
118        }
119    }
120
121    async fn abort_part(&self, _upload_id: &str) -> Result<()> {
122        Err(Error::new(
123            ErrorKind::Unsupported,
124            "Upyun does not support abort multipart upload",
125        ))
126    }
127}