opendal/services/vercel_blob/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22
23use super::core::InitiateMultipartUploadResponse;
24use super::core::Part;
25use super::core::UploadPartResponse;
26use super::core::VercelBlobCore;
27use super::error::parse_error;
28use crate::raw::*;
29use crate::*;
30
31pub type VercelBlobWriters = oio::MultipartWriter<VercelBlobWriter>;
32
33pub struct VercelBlobWriter {
34    core: Arc<VercelBlobCore>,
35    op: OpWrite,
36    path: String,
37}
38
39impl VercelBlobWriter {
40    pub fn new(core: Arc<VercelBlobCore>, op: OpWrite, path: String) -> Self {
41        VercelBlobWriter { core, op, path }
42    }
43}
44
45impl oio::MultipartWrite for VercelBlobWriter {
46    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
47        let resp = self.core.upload(&self.path, size, &self.op, body).await?;
48
49        let status = resp.status();
50
51        match status {
52            StatusCode::OK => Ok(Metadata::default()),
53            _ => Err(parse_error(resp)),
54        }
55    }
56
57    async fn initiate_part(&self) -> Result<String> {
58        let resp = self
59            .core
60            .initiate_multipart_upload(&self.path, &self.op)
61            .await?;
62
63        let status = resp.status();
64
65        match status {
66            StatusCode::OK => {
67                let bs = resp.into_body();
68
69                let resp: InitiateMultipartUploadResponse =
70                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
71
72                Ok(resp.upload_id)
73            }
74            _ => Err(parse_error(resp)),
75        }
76    }
77
78    async fn write_part(
79        &self,
80        upload_id: &str,
81        part_number: usize,
82        size: u64,
83        body: Buffer,
84    ) -> Result<oio::MultipartPart> {
85        let part_number = part_number + 1;
86
87        let resp = self
88            .core
89            .upload_part(&self.path, upload_id, part_number, size, body)
90            .await?;
91
92        let status = resp.status();
93
94        match status {
95            StatusCode::OK => {
96                let bs = resp.into_body();
97
98                let resp: UploadPartResponse =
99                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
100
101                Ok(oio::MultipartPart {
102                    part_number,
103                    etag: resp.etag,
104                    checksum: None,
105                })
106            }
107            _ => Err(parse_error(resp)),
108        }
109    }
110
111    async fn complete_part(
112        &self,
113        upload_id: &str,
114        parts: &[oio::MultipartPart],
115    ) -> Result<Metadata> {
116        let parts = parts
117            .iter()
118            .map(|p| Part {
119                part_number: p.part_number,
120                etag: p.etag.clone(),
121            })
122            .collect::<Vec<Part>>();
123
124        let resp = self
125            .core
126            .complete_multipart_upload(&self.path, upload_id, parts)
127            .await?;
128
129        let status = resp.status();
130
131        match status {
132            StatusCode::OK => Ok(Metadata::default()),
133            _ => Err(parse_error(resp)),
134        }
135    }
136
137    async fn abort_part(&self, _upload_id: &str) -> Result<()> {
138        Err(Error::new(
139            ErrorKind::Unsupported,
140            "VercelBlob does not support abort multipart upload",
141        ))
142    }
143}