opendal/services/vercel_blob/
writer.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22
23use super::core::InitiateMultipartUploadResponse;
24use super::core::Part;
25use super::core::UploadPartResponse;
26use super::core::VercelBlobCore;
27use super::error::parse_error;
28use crate::raw::*;
29use crate::*;
30
31pub type VercelBlobWriters = oio::MultipartWriter<VercelBlobWriter>;
32
33pub struct VercelBlobWriter {
34 core: Arc<VercelBlobCore>,
35 op: OpWrite,
36 path: String,
37}
38
39impl VercelBlobWriter {
40 pub fn new(core: Arc<VercelBlobCore>, op: OpWrite, path: String) -> Self {
41 VercelBlobWriter { core, op, path }
42 }
43}
44
45impl oio::MultipartWrite for VercelBlobWriter {
46 async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
47 let resp = self.core.upload(&self.path, size, &self.op, body).await?;
48
49 let status = resp.status();
50
51 match status {
52 StatusCode::OK => Ok(Metadata::default()),
53 _ => Err(parse_error(resp)),
54 }
55 }
56
57 async fn initiate_part(&self) -> Result<String> {
58 let resp = self
59 .core
60 .initiate_multipart_upload(&self.path, &self.op)
61 .await?;
62
63 let status = resp.status();
64
65 match status {
66 StatusCode::OK => {
67 let bs = resp.into_body();
68
69 let resp: InitiateMultipartUploadResponse =
70 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
71
72 Ok(resp.upload_id)
73 }
74 _ => Err(parse_error(resp)),
75 }
76 }
77
78 async fn write_part(
79 &self,
80 upload_id: &str,
81 part_number: usize,
82 size: u64,
83 body: Buffer,
84 ) -> Result<oio::MultipartPart> {
85 let part_number = part_number + 1;
86
87 let resp = self
88 .core
89 .upload_part(&self.path, upload_id, part_number, size, body)
90 .await?;
91
92 let status = resp.status();
93
94 match status {
95 StatusCode::OK => {
96 let bs = resp.into_body();
97
98 let resp: UploadPartResponse =
99 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
100
101 Ok(oio::MultipartPart {
102 part_number,
103 etag: resp.etag,
104 checksum: None,
105 })
106 }
107 _ => Err(parse_error(resp)),
108 }
109 }
110
111 async fn complete_part(
112 &self,
113 upload_id: &str,
114 parts: &[oio::MultipartPart],
115 ) -> Result<Metadata> {
116 let parts = parts
117 .iter()
118 .map(|p| Part {
119 part_number: p.part_number,
120 etag: p.etag.clone(),
121 })
122 .collect::<Vec<Part>>();
123
124 let resp = self
125 .core
126 .complete_multipart_upload(&self.path, upload_id, parts)
127 .await?;
128
129 let status = resp.status();
130
131 match status {
132 StatusCode::OK => Ok(Metadata::default()),
133 _ => Err(parse_error(resp)),
134 }
135 }
136
137 async fn abort_part(&self, _upload_id: &str) -> Result<()> {
138 Err(Error::new(
139 ErrorKind::Unsupported,
140 "VercelBlob does not support abort multipart upload",
141 ))
142 }
143}