opendal_core/services/vercel_blob/
core.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use bytes::Bytes;
23use http::Request;
24use http::Response;
25use http::StatusCode;
26use http::header;
27use http::request;
28use serde::Deserialize;
29use serde::Serialize;
30use serde_json::json;
31
32use self::constants::*;
33use super::error::parse_error;
34use crate::raw::*;
35use crate::*;
36
37pub(super) mod constants {
38 pub const X_VERCEL_BLOB_CONTENT_TYPE: &str = "x-content-type";
41 pub const X_VERCEL_BLOB_ADD_RANDOM_SUFFIX: &str = "x-add-random-suffix";
45 pub const X_VERCEL_BLOB_MPU_ACTION: &str = "x-mpu-action";
52 pub const X_VERCEL_BLOB_MPU_KEY: &str = "x-mpu-key";
53 pub const X_VERCEL_BLOB_MPU_PART_NUMBER: &str = "x-mpu-part-number";
54 pub const X_VERCEL_BLOB_MPU_UPLOAD_ID: &str = "x-mpu-upload-id";
55}
56
57#[derive(Clone)]
58pub struct VercelBlobCore {
59 pub info: Arc<AccessorInfo>,
60 pub root: String,
62 pub token: String,
64}
65
66impl Debug for VercelBlobCore {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct("VercelBlobCore")
69 .field("root", &self.root)
70 .finish_non_exhaustive()
71 }
72}
73
74impl VercelBlobCore {
75 #[inline]
76 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
77 self.info.http_client().send(req).await
78 }
79
80 pub fn sign(&self, req: request::Builder) -> request::Builder {
81 req.header(header::AUTHORIZATION, format!("Bearer {}", self.token))
82 }
83}
84
85impl VercelBlobCore {
86 pub async fn download(
87 &self,
88 path: &str,
89 range: BytesRange,
90 _: &OpRead,
91 ) -> Result<Response<HttpBody>> {
92 let p = build_abs_path(&self.root, path);
93 let resp = self.list(&p, Some(1)).await?;
96
97 let url = resolve_blob(resp.blobs, p);
99
100 if url.is_empty() {
101 return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
102 }
103
104 let mut req = Request::get(url);
105
106 if !range.is_full() {
107 req = req.header(header::RANGE, range.to_header());
108 }
109
110 let req = req
112 .extension(Operation::Read)
113 .body(Buffer::new())
114 .map_err(new_request_build_error)?;
115
116 self.info.http_client().fetch(req).await
117 }
118
119 pub async fn upload(
120 &self,
121 path: &str,
122 size: u64,
123 args: &OpWrite,
124 body: Buffer,
125 ) -> Result<Response<Buffer>> {
126 let p = build_abs_path(&self.root, path);
127
128 let url = format!(
129 "https://blob.vercel-storage.com/{}",
130 percent_encode_path(&p)
131 );
132
133 let mut req = Request::put(&url);
134
135 req = req.header(X_VERCEL_BLOB_ADD_RANDOM_SUFFIX, "0");
136
137 req = req.header(header::CONTENT_LENGTH, size.to_string());
138
139 if let Some(mime) = args.content_type() {
140 req = req.header(X_VERCEL_BLOB_CONTENT_TYPE, mime)
141 }
142
143 let req = self.sign(req);
144
145 let req = req
147 .extension(Operation::Write)
148 .body(body)
149 .map_err(new_request_build_error)?;
150
151 self.send(req).await
152 }
153
154 pub async fn head(&self, path: &str) -> Result<Response<Buffer>> {
155 let p = build_abs_path(&self.root, path);
156
157 let resp = self.list(&p, Some(1)).await?;
158
159 let url = resolve_blob(resp.blobs, p);
160
161 if url.is_empty() {
162 return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
163 }
164
165 let req = Request::get(format!(
166 "https://blob.vercel-storage.com?url={}",
167 percent_encode_path(&url)
168 ));
169
170 let req = self.sign(req);
171
172 let req = req
174 .extension(Operation::Stat)
175 .body(Buffer::new())
176 .map_err(new_request_build_error)?;
177
178 self.send(req).await
179 }
180
181 pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
182 let from = build_abs_path(&self.root, from);
183
184 let resp = self.list(&from, Some(1)).await?;
185
186 let from_url = resolve_blob(resp.blobs, from);
187
188 if from_url.is_empty() {
189 return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
190 }
191
192 let to = build_abs_path(&self.root, to);
193
194 let to_url = format!(
195 "https://blob.vercel-storage.com/{}?fromUrl={}",
196 percent_encode_path(&to),
197 percent_encode_path(&from_url),
198 );
199
200 let req = Request::put(&to_url);
201
202 let req = self.sign(req);
203
204 let req = req
206 .extension(Operation::Copy)
207 .body(Buffer::new())
208 .map_err(new_request_build_error)?;
209
210 self.send(req).await
211 }
212
213 pub async fn list(&self, prefix: &str, limit: Option<usize>) -> Result<ListResponse> {
214 let prefix = if prefix == "/" { "" } else { prefix };
215
216 let mut url = format!(
217 "https://blob.vercel-storage.com?prefix={}",
218 percent_encode_path(prefix)
219 );
220
221 if let Some(limit) = limit {
222 url.push_str(&format!("&limit={limit}"))
223 }
224
225 let req = Request::get(&url);
226
227 let req = self.sign(req);
228
229 let req = req
231 .extension(Operation::List)
232 .body(Buffer::new())
233 .map_err(new_request_build_error)?;
234
235 let resp = self.send(req).await?;
236
237 let status = resp.status();
238
239 if status != StatusCode::OK {
240 return Err(parse_error(resp));
241 }
242
243 let body = resp.into_body();
244
245 let resp: ListResponse =
246 serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
247
248 Ok(resp)
249 }
250
251 pub async fn vercel_delete_blob(&self, path: &str) -> Result<()> {
252 let p = build_abs_path(&self.root, path);
253
254 let resp = self.list(&p, Some(1)).await?;
255
256 let url = resolve_blob(resp.blobs, p);
257
258 if url.is_empty() {
259 return Ok(());
260 }
261
262 let req = Request::post("https://blob.vercel-storage.com/delete");
263
264 let req = self.sign(req);
265
266 let req_body = &json!({
267 "urls": vec![url]
268 });
269
270 let req = req
272 .extension(Operation::Delete)
273 .header(header::CONTENT_TYPE, "application/json")
274 .body(Buffer::from(Bytes::from(req_body.to_string())))
275 .map_err(new_request_build_error)?;
276
277 let resp = self.send(req).await?;
278
279 let status = resp.status();
280
281 match status {
282 StatusCode::OK => Ok(()),
283 _ => Err(parse_error(resp)),
284 }
285 }
286
287 pub async fn initiate_multipart_upload(
288 &self,
289 path: &str,
290 args: &OpWrite,
291 ) -> Result<Response<Buffer>> {
292 let p = build_abs_path(&self.root, path);
293
294 let url = format!(
295 "https://blob.vercel-storage.com/mpu/{}",
296 percent_encode_path(&p)
297 );
298
299 let req = Request::post(&url);
300
301 let mut req = self.sign(req);
302
303 req = req.header(X_VERCEL_BLOB_MPU_ACTION, "create");
304 req = req.header(X_VERCEL_BLOB_ADD_RANDOM_SUFFIX, "0");
305
306 if let Some(mime) = args.content_type() {
307 req = req.header(X_VERCEL_BLOB_CONTENT_TYPE, mime);
308 };
309
310 let req = req
312 .extension(Operation::Write)
313 .body(Buffer::new())
314 .map_err(new_request_build_error)?;
315
316 self.send(req).await
317 }
318
319 pub async fn upload_part(
320 &self,
321 path: &str,
322 upload_id: &str,
323 part_number: usize,
324 size: u64,
325 body: Buffer,
326 ) -> Result<Response<Buffer>> {
327 let p = build_abs_path(&self.root, path);
328
329 let url = format!(
330 "https://blob.vercel-storage.com/mpu/{}",
331 percent_encode_path(&p)
332 );
333
334 let mut req = Request::post(&url);
335
336 req = req.header(header::CONTENT_LENGTH, size);
337 req = req.header(X_VERCEL_BLOB_MPU_ACTION, "upload");
338 req = req.header(X_VERCEL_BLOB_MPU_KEY, p);
339 req = req.header(X_VERCEL_BLOB_MPU_UPLOAD_ID, upload_id);
340 req = req.header(X_VERCEL_BLOB_MPU_PART_NUMBER, part_number);
341
342 let req = self.sign(req);
343
344 let req = req
346 .extension(Operation::Write)
347 .body(body)
348 .map_err(new_request_build_error)?;
349
350 self.send(req).await
351 }
352
353 pub async fn complete_multipart_upload(
354 &self,
355 path: &str,
356 upload_id: &str,
357 parts: Vec<Part>,
358 ) -> Result<Response<Buffer>> {
359 let p = build_abs_path(&self.root, path);
360
361 let url = format!(
362 "https://blob.vercel-storage.com/mpu/{}",
363 percent_encode_path(&p)
364 );
365
366 let mut req = Request::post(&url);
367
368 req = req.header(X_VERCEL_BLOB_MPU_ACTION, "complete");
369 req = req.header(X_VERCEL_BLOB_MPU_KEY, p);
370 req = req.header(X_VERCEL_BLOB_MPU_UPLOAD_ID, upload_id);
371
372 let req = self.sign(req);
373
374 let parts_json = json!(parts);
375
376 let req = req
377 .header(header::CONTENT_TYPE, "application/json")
378 .extension(Operation::Write)
379 .body(Buffer::from(Bytes::from(parts_json.to_string())))
380 .map_err(new_request_build_error)?;
381
382 self.send(req).await
383 }
384}
385
386pub fn parse_blob(blob: &Blob) -> Result<Metadata> {
387 let mode = if blob.pathname.ends_with('/') {
388 EntryMode::DIR
389 } else {
390 EntryMode::FILE
391 };
392 let mut md = Metadata::new(mode);
393 if let Some(content_type) = blob.content_type.clone() {
394 md.set_content_type(&content_type);
395 }
396 md.set_content_length(blob.size);
397 md.set_last_modified(blob.uploaded_at.parse::<Timestamp>()?);
398 md.set_content_disposition(&blob.content_disposition);
399 Ok(md)
400}
401
402fn resolve_blob(blobs: Vec<Blob>, path: String) -> String {
403 for blob in blobs {
404 if blob.pathname == path {
405 return blob.url;
406 }
407 }
408 "".to_string()
409}
410
411#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
412#[serde(rename_all = "camelCase")]
413pub struct ListResponse {
414 pub cursor: Option<String>,
415 pub has_more: bool,
416 pub blobs: Vec<Blob>,
417}
418
419#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
420#[serde(rename_all = "camelCase")]
421pub struct Blob {
422 pub url: String,
423 pub pathname: String,
424 pub size: u64,
425 pub uploaded_at: String,
426 pub content_disposition: String,
427 pub content_type: Option<String>,
428}
429
430#[derive(Default, Debug, Clone, PartialEq, Serialize)]
431#[serde(rename_all = "camelCase")]
432pub struct Part {
433 pub part_number: usize,
434 pub etag: String,
435}
436
437#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
438#[serde(rename_all = "camelCase")]
439pub struct InitiateMultipartUploadResponse {
440 pub upload_id: String,
441 pub key: String,
442}
443
444#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
445#[serde(rename_all = "camelCase")]
446pub struct UploadPartResponse {
447 pub etag: String,
448}