opendal/services/vercel_blob/
core.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use bytes::Bytes;
24use http::header;
25use http::request;
26use http::Request;
27use http::Response;
28use http::StatusCode;
29use serde::Deserialize;
30use serde::Serialize;
31use serde_json::json;
32
33use self::constants::*;
34use super::error::parse_error;
35use crate::raw::*;
36use crate::*;
37
38pub(super) mod constants {
39 pub const X_VERCEL_BLOB_CONTENT_TYPE: &str = "x-content-type";
42 pub const X_VERCEL_BLOB_ADD_RANDOM_SUFFIX: &str = "x-add-random-suffix";
46 pub const X_VERCEL_BLOB_MPU_ACTION: &str = "x-mpu-action";
53 pub const X_VERCEL_BLOB_MPU_KEY: &str = "x-mpu-key";
54 pub const X_VERCEL_BLOB_MPU_PART_NUMBER: &str = "x-mpu-part-number";
55 pub const X_VERCEL_BLOB_MPU_UPLOAD_ID: &str = "x-mpu-upload-id";
56}
57
58#[derive(Clone)]
59pub struct VercelBlobCore {
60 pub info: Arc<AccessorInfo>,
61 pub root: String,
63 pub token: String,
65}
66
67impl Debug for VercelBlobCore {
68 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69 f.debug_struct("Backend")
70 .field("root", &self.root)
71 .finish_non_exhaustive()
72 }
73}
74
75impl VercelBlobCore {
76 #[inline]
77 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
78 self.info.http_client().send(req).await
79 }
80
81 pub fn sign(&self, req: request::Builder) -> request::Builder {
82 req.header(header::AUTHORIZATION, format!("Bearer {}", self.token))
83 }
84}
85
86impl VercelBlobCore {
87 pub async fn download(
88 &self,
89 path: &str,
90 range: BytesRange,
91 _: &OpRead,
92 ) -> Result<Response<HttpBody>> {
93 let p = build_abs_path(&self.root, path);
94 let resp = self.list(&p, Some(1)).await?;
97
98 let url = resolve_blob(resp.blobs, p);
100
101 if url.is_empty() {
102 return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
103 }
104
105 let mut req = Request::get(url);
106
107 if !range.is_full() {
108 req = req.header(header::RANGE, range.to_header());
109 }
110
111 let req = req
113 .extension(Operation::Read)
114 .body(Buffer::new())
115 .map_err(new_request_build_error)?;
116
117 self.info.http_client().fetch(req).await
118 }
119
120 pub async fn upload(
121 &self,
122 path: &str,
123 size: u64,
124 args: &OpWrite,
125 body: Buffer,
126 ) -> Result<Response<Buffer>> {
127 let p = build_abs_path(&self.root, path);
128
129 let url = format!(
130 "https://blob.vercel-storage.com/{}",
131 percent_encode_path(&p)
132 );
133
134 let mut req = Request::put(&url);
135
136 req = req.header(X_VERCEL_BLOB_ADD_RANDOM_SUFFIX, "0");
137
138 req = req.header(header::CONTENT_LENGTH, size.to_string());
139
140 if let Some(mime) = args.content_type() {
141 req = req.header(X_VERCEL_BLOB_CONTENT_TYPE, mime)
142 }
143
144 let req = self.sign(req);
145
146 let req = req
148 .extension(Operation::Write)
149 .body(body)
150 .map_err(new_request_build_error)?;
151
152 self.send(req).await
153 }
154
155 pub async fn head(&self, path: &str) -> Result<Response<Buffer>> {
156 let p = build_abs_path(&self.root, path);
157
158 let resp = self.list(&p, Some(1)).await?;
159
160 let url = resolve_blob(resp.blobs, p);
161
162 if url.is_empty() {
163 return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
164 }
165
166 let req = Request::get(format!(
167 "https://blob.vercel-storage.com?url={}",
168 percent_encode_path(&url)
169 ));
170
171 let req = self.sign(req);
172
173 let req = req
175 .extension(Operation::Stat)
176 .body(Buffer::new())
177 .map_err(new_request_build_error)?;
178
179 self.send(req).await
180 }
181
182 pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
183 let from = build_abs_path(&self.root, from);
184
185 let resp = self.list(&from, Some(1)).await?;
186
187 let from_url = resolve_blob(resp.blobs, from);
188
189 if from_url.is_empty() {
190 return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
191 }
192
193 let to = build_abs_path(&self.root, to);
194
195 let to_url = format!(
196 "https://blob.vercel-storage.com/{}?fromUrl={}",
197 percent_encode_path(&to),
198 percent_encode_path(&from_url),
199 );
200
201 let req = Request::put(&to_url);
202
203 let req = self.sign(req);
204
205 let req = req
207 .extension(Operation::Copy)
208 .body(Buffer::new())
209 .map_err(new_request_build_error)?;
210
211 self.send(req).await
212 }
213
214 pub async fn list(&self, prefix: &str, limit: Option<usize>) -> Result<ListResponse> {
215 let prefix = if prefix == "/" { "" } else { prefix };
216
217 let mut url = format!(
218 "https://blob.vercel-storage.com?prefix={}",
219 percent_encode_path(prefix)
220 );
221
222 if let Some(limit) = limit {
223 url.push_str(&format!("&limit={limit}"))
224 }
225
226 let req = Request::get(&url);
227
228 let req = self.sign(req);
229
230 let req = req
232 .extension(Operation::List)
233 .body(Buffer::new())
234 .map_err(new_request_build_error)?;
235
236 let resp = self.send(req).await?;
237
238 let status = resp.status();
239
240 if status != StatusCode::OK {
241 return Err(parse_error(resp));
242 }
243
244 let body = resp.into_body();
245
246 let resp: ListResponse =
247 serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
248
249 Ok(resp)
250 }
251
252 pub async fn vercel_delete_blob(&self, path: &str) -> Result<()> {
253 let p = build_abs_path(&self.root, path);
254
255 let resp = self.list(&p, Some(1)).await?;
256
257 let url = resolve_blob(resp.blobs, p);
258
259 if url.is_empty() {
260 return Ok(());
261 }
262
263 let req = Request::post("https://blob.vercel-storage.com/delete");
264
265 let req = self.sign(req);
266
267 let req_body = &json!({
268 "urls": vec![url]
269 });
270
271 let req = req
273 .extension(Operation::Delete)
274 .header(header::CONTENT_TYPE, "application/json")
275 .body(Buffer::from(Bytes::from(req_body.to_string())))
276 .map_err(new_request_build_error)?;
277
278 let resp = self.send(req).await?;
279
280 let status = resp.status();
281
282 match status {
283 StatusCode::OK => Ok(()),
284 _ => Err(parse_error(resp)),
285 }
286 }
287
288 pub async fn initiate_multipart_upload(
289 &self,
290 path: &str,
291 args: &OpWrite,
292 ) -> Result<Response<Buffer>> {
293 let p = build_abs_path(&self.root, path);
294
295 let url = format!(
296 "https://blob.vercel-storage.com/mpu/{}",
297 percent_encode_path(&p)
298 );
299
300 let req = Request::post(&url);
301
302 let mut req = self.sign(req);
303
304 req = req.header(X_VERCEL_BLOB_MPU_ACTION, "create");
305 req = req.header(X_VERCEL_BLOB_ADD_RANDOM_SUFFIX, "0");
306
307 if let Some(mime) = args.content_type() {
308 req = req.header(X_VERCEL_BLOB_CONTENT_TYPE, mime);
309 };
310
311 let req = req
313 .extension(Operation::Write)
314 .body(Buffer::new())
315 .map_err(new_request_build_error)?;
316
317 self.send(req).await
318 }
319
320 pub async fn upload_part(
321 &self,
322 path: &str,
323 upload_id: &str,
324 part_number: usize,
325 size: u64,
326 body: Buffer,
327 ) -> Result<Response<Buffer>> {
328 let p = build_abs_path(&self.root, path);
329
330 let url = format!(
331 "https://blob.vercel-storage.com/mpu/{}",
332 percent_encode_path(&p)
333 );
334
335 let mut req = Request::post(&url);
336
337 req = req.header(header::CONTENT_LENGTH, size);
338 req = req.header(X_VERCEL_BLOB_MPU_ACTION, "upload");
339 req = req.header(X_VERCEL_BLOB_MPU_KEY, p);
340 req = req.header(X_VERCEL_BLOB_MPU_UPLOAD_ID, upload_id);
341 req = req.header(X_VERCEL_BLOB_MPU_PART_NUMBER, part_number);
342
343 let req = self.sign(req);
344
345 let req = req
347 .extension(Operation::Write)
348 .body(body)
349 .map_err(new_request_build_error)?;
350
351 self.send(req).await
352 }
353
354 pub async fn complete_multipart_upload(
355 &self,
356 path: &str,
357 upload_id: &str,
358 parts: Vec<Part>,
359 ) -> Result<Response<Buffer>> {
360 let p = build_abs_path(&self.root, path);
361
362 let url = format!(
363 "https://blob.vercel-storage.com/mpu/{}",
364 percent_encode_path(&p)
365 );
366
367 let mut req = Request::post(&url);
368
369 req = req.header(X_VERCEL_BLOB_MPU_ACTION, "complete");
370 req = req.header(X_VERCEL_BLOB_MPU_KEY, p);
371 req = req.header(X_VERCEL_BLOB_MPU_UPLOAD_ID, upload_id);
372
373 let req = self.sign(req);
374
375 let parts_json = json!(parts);
376
377 let req = req
378 .header(header::CONTENT_TYPE, "application/json")
379 .extension(Operation::Write)
380 .body(Buffer::from(Bytes::from(parts_json.to_string())))
381 .map_err(new_request_build_error)?;
382
383 self.send(req).await
384 }
385}
386
387pub fn parse_blob(blob: &Blob) -> Result<Metadata> {
388 let mode = if blob.pathname.ends_with('/') {
389 EntryMode::DIR
390 } else {
391 EntryMode::FILE
392 };
393 let mut md = Metadata::new(mode);
394 if let Some(content_type) = blob.content_type.clone() {
395 md.set_content_type(&content_type);
396 }
397 md.set_content_length(blob.size);
398 md.set_last_modified(parse_datetime_from_rfc3339(&blob.uploaded_at)?);
399 md.set_content_disposition(&blob.content_disposition);
400 Ok(md)
401}
402
403fn resolve_blob(blobs: Vec<Blob>, path: String) -> String {
404 for blob in blobs {
405 if blob.pathname == path {
406 return blob.url;
407 }
408 }
409 "".to_string()
410}
411
412#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
413#[serde(rename_all = "camelCase")]
414pub struct ListResponse {
415 pub cursor: Option<String>,
416 pub has_more: bool,
417 pub blobs: Vec<Blob>,
418}
419
420#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
421#[serde(rename_all = "camelCase")]
422pub struct Blob {
423 pub url: String,
424 pub pathname: String,
425 pub size: u64,
426 pub uploaded_at: String,
427 pub content_disposition: String,
428 pub content_type: Option<String>,
429}
430
431#[derive(Default, Debug, Clone, PartialEq, Serialize)]
432#[serde(rename_all = "camelCase")]
433pub struct Part {
434 pub part_number: usize,
435 pub etag: String,
436}
437
438#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
439#[serde(rename_all = "camelCase")]
440pub struct InitiateMultipartUploadResponse {
441 pub upload_id: String,
442 pub key: String,
443}
444
445#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
446#[serde(rename_all = "camelCase")]
447pub struct UploadPartResponse {
448 pub etag: String,
449}