opendal/services/vercel_blob/
writer.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22
23use super::core::InitiateMultipartUploadResponse;
24use super::core::Part;
25use super::core::UploadPartResponse;
26use super::core::VercelBlobCore;
27use super::error::parse_error;
28use crate::raw::*;
29use crate::*;
30
31pub type VercelBlobWriters = oio::MultipartWriter<VercelBlobWriter>;
32
33pub struct VercelBlobWriter {
34 core: Arc<VercelBlobCore>,
35 op: OpWrite,
36 path: String,
37}
38
39impl VercelBlobWriter {
40 pub fn new(core: Arc<VercelBlobCore>, op: OpWrite, path: String) -> Self {
41 VercelBlobWriter { core, op, path }
42 }
43}
44
45impl oio::MultipartWrite for VercelBlobWriter {
46 async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
47 let req = self
48 .core
49 .get_put_request(&self.path, Some(size), &self.op, body)?;
50
51 let resp = self.core.send(req).await?;
52
53 let status = resp.status();
54
55 match status {
56 StatusCode::OK => Ok(Metadata::default()),
57 _ => Err(parse_error(resp)),
58 }
59 }
60
61 async fn initiate_part(&self) -> Result<String> {
62 let resp = self
63 .core
64 .initiate_multipart_upload(&self.path, &self.op)
65 .await?;
66
67 let status = resp.status();
68
69 match status {
70 StatusCode::OK => {
71 let bs = resp.into_body();
72
73 let resp: InitiateMultipartUploadResponse =
74 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
75
76 Ok(resp.upload_id)
77 }
78 _ => Err(parse_error(resp)),
79 }
80 }
81
82 async fn write_part(
83 &self,
84 upload_id: &str,
85 part_number: usize,
86 size: u64,
87 body: Buffer,
88 ) -> Result<oio::MultipartPart> {
89 let part_number = part_number + 1;
90
91 let resp = self
92 .core
93 .upload_part(&self.path, upload_id, part_number, size, body)
94 .await?;
95
96 let status = resp.status();
97
98 match status {
99 StatusCode::OK => {
100 let bs = resp.into_body();
101
102 let resp: UploadPartResponse =
103 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
104
105 Ok(oio::MultipartPart {
106 part_number,
107 etag: resp.etag,
108 checksum: None,
109 })
110 }
111 _ => Err(parse_error(resp)),
112 }
113 }
114
115 async fn complete_part(
116 &self,
117 upload_id: &str,
118 parts: &[oio::MultipartPart],
119 ) -> Result<Metadata> {
120 let parts = parts
121 .iter()
122 .map(|p| Part {
123 part_number: p.part_number,
124 etag: p.etag.clone(),
125 })
126 .collect::<Vec<Part>>();
127
128 let resp = self
129 .core
130 .complete_multipart_upload(&self.path, upload_id, parts)
131 .await?;
132
133 let status = resp.status();
134
135 match status {
136 StatusCode::OK => Ok(Metadata::default()),
137 _ => Err(parse_error(resp)),
138 }
139 }
140
141 async fn abort_part(&self, _upload_id: &str) -> Result<()> {
142 Err(Error::new(
143 ErrorKind::Unsupported,
144 "VercelBlob does not support abort multipart upload",
145 ))
146 }
147}