opendal/services/vercel_blob/
core.rs1use bytes::Buf;
19use bytes::Bytes;
20use http::header;
21use http::request;
22use http::Request;
23use http::Response;
24use http::StatusCode;
25use serde::Deserialize;
26use serde::Serialize;
27use serde_json::json;
28use std::fmt::Debug;
29use std::fmt::Formatter;
30use std::sync::Arc;
31
32use self::constants::*;
33use super::error::parse_error;
34use crate::raw::*;
35use crate::*;
36
37pub(super) mod constants {
38 pub const X_VERCEL_BLOB_CONTENT_TYPE: &str = "x-content-type";
41 pub const X_VERCEL_BLOB_ADD_RANDOM_SUFFIX: &str = "x-add-random-suffix";
45 pub const X_VERCEL_BLOB_MPU_ACTION: &str = "x-mpu-action";
52 pub const X_VERCEL_BLOB_MPU_KEY: &str = "x-mpu-key";
53 pub const X_VERCEL_BLOB_MPU_PART_NUMBER: &str = "x-mpu-part-number";
54 pub const X_VERCEL_BLOB_MPU_UPLOAD_ID: &str = "x-mpu-upload-id";
55}
56
57#[derive(Clone)]
58pub struct VercelBlobCore {
59 pub info: Arc<AccessorInfo>,
60 pub root: String,
62 pub token: String,
64}
65
66impl Debug for VercelBlobCore {
67 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct("Backend")
69 .field("root", &self.root)
70 .finish_non_exhaustive()
71 }
72}
73
74impl VercelBlobCore {
75 #[inline]
76 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
77 self.info.http_client().send(req).await
78 }
79
80 pub fn sign(&self, req: request::Builder) -> request::Builder {
81 req.header(header::AUTHORIZATION, format!("Bearer {}", self.token))
82 }
83}
84
85impl VercelBlobCore {
86 pub async fn download(
87 &self,
88 path: &str,
89 range: BytesRange,
90 _: &OpRead,
91 ) -> Result<Response<HttpBody>> {
92 let p = build_abs_path(&self.root, path);
93 let resp = self.list(&p, Some(1)).await?;
96
97 let url = resolve_blob(resp.blobs, p);
99
100 if url.is_empty() {
101 return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
102 }
103
104 let mut req = Request::get(url);
105
106 if !range.is_full() {
107 req = req.header(header::RANGE, range.to_header());
108 }
109
110 let req = req
112 .extension(Operation::Read)
113 .body(Buffer::new())
114 .map_err(new_request_build_error)?;
115
116 self.info.http_client().fetch(req).await
117 }
118
119 pub async fn upload(
120 &self,
121 path: &str,
122 size: u64,
123 args: &OpWrite,
124 body: Buffer,
125 ) -> Result<Response<Buffer>> {
126 let p = build_abs_path(&self.root, path);
127
128 let url = format!(
129 "https://blob.vercel-storage.com/{}",
130 percent_encode_path(&p)
131 );
132
133 let mut req = Request::put(&url);
134
135 req = req.header(X_VERCEL_BLOB_ADD_RANDOM_SUFFIX, "0");
136
137 req = req.header(header::CONTENT_LENGTH, size.to_string());
138
139 if let Some(mime) = args.content_type() {
140 req = req.header(X_VERCEL_BLOB_CONTENT_TYPE, mime)
141 }
142
143 let req = self.sign(req);
144
145 let req = req
147 .extension(Operation::Write)
148 .body(body)
149 .map_err(new_request_build_error)?;
150
151 self.send(req).await
152 }
153
154 pub async fn head(&self, path: &str) -> Result<Response<Buffer>> {
155 let p = build_abs_path(&self.root, path);
156
157 let resp = self.list(&p, Some(1)).await?;
158
159 let url = resolve_blob(resp.blobs, p);
160
161 if url.is_empty() {
162 return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
163 }
164
165 let req = Request::get(format!(
166 "https://blob.vercel-storage.com?url={}",
167 percent_encode_path(&url)
168 ));
169
170 let req = self.sign(req);
171
172 let req = req
174 .extension(Operation::Stat)
175 .body(Buffer::new())
176 .map_err(new_request_build_error)?;
177
178 self.send(req).await
179 }
180
181 pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
182 let from = build_abs_path(&self.root, from);
183
184 let resp = self.list(&from, Some(1)).await?;
185
186 let from_url = resolve_blob(resp.blobs, from);
187
188 if from_url.is_empty() {
189 return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
190 }
191
192 let to = build_abs_path(&self.root, to);
193
194 let to_url = format!(
195 "https://blob.vercel-storage.com/{}?fromUrl={}",
196 percent_encode_path(&to),
197 percent_encode_path(&from_url),
198 );
199
200 let req = Request::put(&to_url);
201
202 let req = self.sign(req);
203
204 let req = req
206 .extension(Operation::Copy)
207 .body(Buffer::new())
208 .map_err(new_request_build_error)?;
209
210 self.send(req).await
211 }
212
213 pub async fn list(&self, prefix: &str, limit: Option<usize>) -> Result<ListResponse> {
214 let prefix = if prefix == "/" { "" } else { prefix };
215
216 let mut url = format!(
217 "https://blob.vercel-storage.com?prefix={}",
218 percent_encode_path(prefix)
219 );
220
221 if let Some(limit) = limit {
222 url.push_str(&format!("&limit={}", limit))
223 }
224
225 let req = Request::get(&url);
226
227 let req = self.sign(req);
228
229 let req = req
231 .extension(Operation::List)
232 .body(Buffer::new())
233 .map_err(new_request_build_error)?;
234
235 let resp = self.send(req).await?;
236
237 let status = resp.status();
238
239 if status != StatusCode::OK {
240 return Err(parse_error(resp));
241 }
242
243 let body = resp.into_body();
244
245 let resp: ListResponse =
246 serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
247
248 Ok(resp)
249 }
250
251 pub async fn initiate_multipart_upload(
252 &self,
253 path: &str,
254 args: &OpWrite,
255 ) -> Result<Response<Buffer>> {
256 let p = build_abs_path(&self.root, path);
257
258 let url = format!(
259 "https://blob.vercel-storage.com/mpu/{}",
260 percent_encode_path(&p)
261 );
262
263 let req = Request::post(&url);
264
265 let mut req = self.sign(req);
266
267 req = req.header(X_VERCEL_BLOB_MPU_ACTION, "create");
268 req = req.header(X_VERCEL_BLOB_ADD_RANDOM_SUFFIX, "0");
269
270 if let Some(mime) = args.content_type() {
271 req = req.header(X_VERCEL_BLOB_CONTENT_TYPE, mime);
272 };
273
274 let req = req
276 .extension(Operation::Write)
277 .body(Buffer::new())
278 .map_err(new_request_build_error)?;
279
280 self.send(req).await
281 }
282
283 pub async fn upload_part(
284 &self,
285 path: &str,
286 upload_id: &str,
287 part_number: usize,
288 size: u64,
289 body: Buffer,
290 ) -> Result<Response<Buffer>> {
291 let p = build_abs_path(&self.root, path);
292
293 let url = format!(
294 "https://blob.vercel-storage.com/mpu/{}",
295 percent_encode_path(&p)
296 );
297
298 let mut req = Request::post(&url);
299
300 req = req.header(header::CONTENT_LENGTH, size);
301 req = req.header(X_VERCEL_BLOB_MPU_ACTION, "upload");
302 req = req.header(X_VERCEL_BLOB_MPU_KEY, p);
303 req = req.header(X_VERCEL_BLOB_MPU_UPLOAD_ID, upload_id);
304 req = req.header(X_VERCEL_BLOB_MPU_PART_NUMBER, part_number);
305
306 let req = self.sign(req);
307
308 let req = req
310 .extension(Operation::Write)
311 .body(body)
312 .map_err(new_request_build_error)?;
313
314 self.send(req).await
315 }
316
317 pub async fn complete_multipart_upload(
318 &self,
319 path: &str,
320 upload_id: &str,
321 parts: Vec<Part>,
322 ) -> Result<Response<Buffer>> {
323 let p = build_abs_path(&self.root, path);
324
325 let url = format!(
326 "https://blob.vercel-storage.com/mpu/{}",
327 percent_encode_path(&p)
328 );
329
330 let mut req = Request::post(&url);
331
332 req = req.header(X_VERCEL_BLOB_MPU_ACTION, "complete");
333 req = req.header(X_VERCEL_BLOB_MPU_KEY, p);
334 req = req.header(X_VERCEL_BLOB_MPU_UPLOAD_ID, upload_id);
335
336 let req = self.sign(req);
337
338 let parts_json = json!(parts);
339
340 let req = req
341 .header(header::CONTENT_TYPE, "application/json")
342 .extension(Operation::Write)
343 .body(Buffer::from(Bytes::from(parts_json.to_string())))
344 .map_err(new_request_build_error)?;
345
346 self.send(req).await
347 }
348}
349
350pub fn parse_blob(blob: &Blob) -> Result<Metadata> {
351 let mode = if blob.pathname.ends_with('/') {
352 EntryMode::DIR
353 } else {
354 EntryMode::FILE
355 };
356 let mut md = Metadata::new(mode);
357 if let Some(content_type) = blob.content_type.clone() {
358 md.set_content_type(&content_type);
359 }
360 md.set_content_length(blob.size);
361 md.set_last_modified(parse_datetime_from_rfc3339(&blob.uploaded_at)?);
362 md.set_content_disposition(&blob.content_disposition);
363 Ok(md)
364}
365
366fn resolve_blob(blobs: Vec<Blob>, path: String) -> String {
367 for blob in blobs {
368 if blob.pathname == path {
369 return blob.url;
370 }
371 }
372 "".to_string()
373}
374
375#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
376#[serde(rename_all = "camelCase")]
377pub struct ListResponse {
378 pub cursor: Option<String>,
379 pub has_more: bool,
380 pub blobs: Vec<Blob>,
381}
382
383#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
384#[serde(rename_all = "camelCase")]
385pub struct Blob {
386 pub url: String,
387 pub pathname: String,
388 pub size: u64,
389 pub uploaded_at: String,
390 pub content_disposition: String,
391 pub content_type: Option<String>,
392}
393
394#[derive(Default, Debug, Clone, PartialEq, Serialize)]
395#[serde(rename_all = "camelCase")]
396pub struct Part {
397 pub part_number: usize,
398 pub etag: String,
399}
400
401#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
402#[serde(rename_all = "camelCase")]
403pub struct InitiateMultipartUploadResponse {
404 pub upload_id: String,
405 pub key: String,
406}
407
408#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
409#[serde(rename_all = "camelCase")]
410pub struct UploadPartResponse {
411 pub etag: String,
412}