opendal/services/vercel_blob/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22
23use super::core::InitiateMultipartUploadResponse;
24use super::core::Part;
25use super::core::UploadPartResponse;
26use super::core::VercelBlobCore;
27use super::error::parse_error;
28use crate::raw::*;
29use crate::*;
30
31pub type VercelBlobWriters = oio::MultipartWriter<VercelBlobWriter>;
32
33pub struct VercelBlobWriter {
34    core: Arc<VercelBlobCore>,
35    op: OpWrite,
36    path: String,
37}
38
39impl VercelBlobWriter {
40    pub fn new(core: Arc<VercelBlobCore>, op: OpWrite, path: String) -> Self {
41        VercelBlobWriter { core, op, path }
42    }
43}
44
45impl oio::MultipartWrite for VercelBlobWriter {
46    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
47        let req = self
48            .core
49            .get_put_request(&self.path, Some(size), &self.op, body)?;
50
51        let resp = self.core.send(req).await?;
52
53        let status = resp.status();
54
55        match status {
56            StatusCode::OK => Ok(Metadata::default()),
57            _ => Err(parse_error(resp)),
58        }
59    }
60
61    async fn initiate_part(&self) -> Result<String> {
62        let resp = self
63            .core
64            .initiate_multipart_upload(&self.path, &self.op)
65            .await?;
66
67        let status = resp.status();
68
69        match status {
70            StatusCode::OK => {
71                let bs = resp.into_body();
72
73                let resp: InitiateMultipartUploadResponse =
74                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
75
76                Ok(resp.upload_id)
77            }
78            _ => Err(parse_error(resp)),
79        }
80    }
81
82    async fn write_part(
83        &self,
84        upload_id: &str,
85        part_number: usize,
86        size: u64,
87        body: Buffer,
88    ) -> Result<oio::MultipartPart> {
89        let part_number = part_number + 1;
90
91        let resp = self
92            .core
93            .upload_part(&self.path, upload_id, part_number, size, body)
94            .await?;
95
96        let status = resp.status();
97
98        match status {
99            StatusCode::OK => {
100                let bs = resp.into_body();
101
102                let resp: UploadPartResponse =
103                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
104
105                Ok(oio::MultipartPart {
106                    part_number,
107                    etag: resp.etag,
108                    checksum: None,
109                })
110            }
111            _ => Err(parse_error(resp)),
112        }
113    }
114
115    async fn complete_part(
116        &self,
117        upload_id: &str,
118        parts: &[oio::MultipartPart],
119    ) -> Result<Metadata> {
120        let parts = parts
121            .iter()
122            .map(|p| Part {
123                part_number: p.part_number,
124                etag: p.etag.clone(),
125            })
126            .collect::<Vec<Part>>();
127
128        let resp = self
129            .core
130            .complete_multipart_upload(&self.path, upload_id, parts)
131            .await?;
132
133        let status = resp.status();
134
135        match status {
136            StatusCode::OK => Ok(Metadata::default()),
137            _ => Err(parse_error(resp)),
138        }
139    }
140
141    async fn abort_part(&self, _upload_id: &str) -> Result<()> {
142        Err(Error::new(
143            ErrorKind::Unsupported,
144            "VercelBlob does not support abort multipart upload",
145        ))
146    }
147}