opendal/services/vercel_blob/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use bytes::Bytes;
24use http::header;
25use http::request;
26use http::Request;
27use http::Response;
28use http::StatusCode;
29use serde::Deserialize;
30use serde::Serialize;
31use serde_json::json;
32
33use self::constants::*;
34use super::error::parse_error;
35use crate::raw::*;
36use crate::*;
37
/// Names of the custom HTTP headers this service sets on Vercel Blob requests.
pub(super) mod constants {
    // https://github.com/vercel/storage/blob/main/packages/blob/src/put.ts#L16
    // x-content-type specifies the MIME type of the file being uploaded.
    pub const X_VERCEL_BLOB_CONTENT_TYPE: &str = "x-content-type";
    // x-add-random-suffix specifies whether to add a random suffix to the pathname.
    // Default value is 1, which means to add a random suffix.
    // Set it to 0 to disable the random suffix.
    pub const X_VERCEL_BLOB_ADD_RANDOM_SUFFIX: &str = "x-add-random-suffix";
    // https://github.com/vercel/storage/blob/main/packages/blob/src/put-multipart.ts#L84
    // x-mpu-action specifies the action to perform on the MPU (multipart upload).
    // Possible values are:
    // - create: create a new MPU.
    // - upload: upload a part to an existing MPU.
    // - complete: complete an existing MPU.
    pub const X_VERCEL_BLOB_MPU_ACTION: &str = "x-mpu-action";
    // x-mpu-key is set to the absolute path of the blob on the "upload" and
    // "complete" requests built in this file.
    pub const X_VERCEL_BLOB_MPU_KEY: &str = "x-mpu-key";
    // x-mpu-part-number is set to the part number on "upload" requests.
    pub const X_VERCEL_BLOB_MPU_PART_NUMBER: &str = "x-mpu-part-number";
    // x-mpu-upload-id is set to the upload id on "upload" and "complete" requests.
    pub const X_VERCEL_BLOB_MPU_UPLOAD_ID: &str = "x-mpu-upload-id";
}
57
/// Shared state used to build, sign and send requests to Vercel Blob.
#[derive(Clone)]
pub struct VercelBlobCore {
    /// Accessor info; its HTTP client is the one every request is sent with.
    pub info: Arc<AccessorInfo>,
    /// The root of this core.
    ///
    /// Every operation path is combined with it through `build_abs_path`.
    pub root: String,
    /// Vercel Blob token.
    ///
    /// Sent as a `Bearer` value in the `Authorization` header by `sign`.
    pub token: String,
}
66
67impl Debug for VercelBlobCore {
68    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct("Backend")
70            .field("root", &self.root)
71            .finish_non_exhaustive()
72    }
73}
74
75impl VercelBlobCore {
76    #[inline]
77    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
78        self.info.http_client().send(req).await
79    }
80
81    pub fn sign(&self, req: request::Builder) -> request::Builder {
82        req.header(header::AUTHORIZATION, format!("Bearer {}", self.token))
83    }
84}
85
86impl VercelBlobCore {
87    pub async fn download(
88        &self,
89        path: &str,
90        range: BytesRange,
91        _: &OpRead,
92    ) -> Result<Response<HttpBody>> {
93        let p = build_abs_path(&self.root, path);
94        // Vercel blob use an unguessable random id url to download the file
95        // So we use list to get the url of the file and then use it to download the file
96        let resp = self.list(&p, Some(1)).await?;
97
98        // Use the mtach url to download the file
99        let url = resolve_blob(resp.blobs, p);
100
101        if url.is_empty() {
102            return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
103        }
104
105        let mut req = Request::get(url);
106
107        if !range.is_full() {
108            req = req.header(header::RANGE, range.to_header());
109        }
110
111        // Set body
112        let req = req
113            .extension(Operation::Read)
114            .body(Buffer::new())
115            .map_err(new_request_build_error)?;
116
117        self.info.http_client().fetch(req).await
118    }
119
120    pub async fn upload(
121        &self,
122        path: &str,
123        size: u64,
124        args: &OpWrite,
125        body: Buffer,
126    ) -> Result<Response<Buffer>> {
127        let p = build_abs_path(&self.root, path);
128
129        let url = format!(
130            "https://blob.vercel-storage.com/{}",
131            percent_encode_path(&p)
132        );
133
134        let mut req = Request::put(&url);
135
136        req = req.header(X_VERCEL_BLOB_ADD_RANDOM_SUFFIX, "0");
137
138        req = req.header(header::CONTENT_LENGTH, size.to_string());
139
140        if let Some(mime) = args.content_type() {
141            req = req.header(X_VERCEL_BLOB_CONTENT_TYPE, mime)
142        }
143
144        let req = self.sign(req);
145
146        // Set body
147        let req = req
148            .extension(Operation::Write)
149            .body(body)
150            .map_err(new_request_build_error)?;
151
152        self.send(req).await
153    }
154
155    pub async fn head(&self, path: &str) -> Result<Response<Buffer>> {
156        let p = build_abs_path(&self.root, path);
157
158        let resp = self.list(&p, Some(1)).await?;
159
160        let url = resolve_blob(resp.blobs, p);
161
162        if url.is_empty() {
163            return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
164        }
165
166        let req = Request::get(format!(
167            "https://blob.vercel-storage.com?url={}",
168            percent_encode_path(&url)
169        ));
170
171        let req = self.sign(req);
172
173        // Set body
174        let req = req
175            .extension(Operation::Stat)
176            .body(Buffer::new())
177            .map_err(new_request_build_error)?;
178
179        self.send(req).await
180    }
181
182    pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
183        let from = build_abs_path(&self.root, from);
184
185        let resp = self.list(&from, Some(1)).await?;
186
187        let from_url = resolve_blob(resp.blobs, from);
188
189        if from_url.is_empty() {
190            return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
191        }
192
193        let to = build_abs_path(&self.root, to);
194
195        let to_url = format!(
196            "https://blob.vercel-storage.com/{}?fromUrl={}",
197            percent_encode_path(&to),
198            percent_encode_path(&from_url),
199        );
200
201        let req = Request::put(&to_url);
202
203        let req = self.sign(req);
204
205        // Set body
206        let req = req
207            .extension(Operation::Copy)
208            .body(Buffer::new())
209            .map_err(new_request_build_error)?;
210
211        self.send(req).await
212    }
213
214    pub async fn list(&self, prefix: &str, limit: Option<usize>) -> Result<ListResponse> {
215        let prefix = if prefix == "/" { "" } else { prefix };
216
217        let mut url = format!(
218            "https://blob.vercel-storage.com?prefix={}",
219            percent_encode_path(prefix)
220        );
221
222        if let Some(limit) = limit {
223            url.push_str(&format!("&limit={limit}"))
224        }
225
226        let req = Request::get(&url);
227
228        let req = self.sign(req);
229
230        // Set body
231        let req = req
232            .extension(Operation::List)
233            .body(Buffer::new())
234            .map_err(new_request_build_error)?;
235
236        let resp = self.send(req).await?;
237
238        let status = resp.status();
239
240        if status != StatusCode::OK {
241            return Err(parse_error(resp));
242        }
243
244        let body = resp.into_body();
245
246        let resp: ListResponse =
247            serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
248
249        Ok(resp)
250    }
251
252    pub async fn vercel_delete_blob(&self, path: &str) -> Result<()> {
253        let p = build_abs_path(&self.root, path);
254
255        let resp = self.list(&p, Some(1)).await?;
256
257        let url = resolve_blob(resp.blobs, p);
258
259        if url.is_empty() {
260            return Ok(());
261        }
262
263        let req = Request::post("https://blob.vercel-storage.com/delete");
264
265        let req = self.sign(req);
266
267        let req_body = &json!({
268            "urls": vec![url]
269        });
270
271        // Set body
272        let req = req
273            .extension(Operation::Delete)
274            .header(header::CONTENT_TYPE, "application/json")
275            .body(Buffer::from(Bytes::from(req_body.to_string())))
276            .map_err(new_request_build_error)?;
277
278        let resp = self.send(req).await?;
279
280        let status = resp.status();
281
282        match status {
283            StatusCode::OK => Ok(()),
284            _ => Err(parse_error(resp)),
285        }
286    }
287
288    pub async fn initiate_multipart_upload(
289        &self,
290        path: &str,
291        args: &OpWrite,
292    ) -> Result<Response<Buffer>> {
293        let p = build_abs_path(&self.root, path);
294
295        let url = format!(
296            "https://blob.vercel-storage.com/mpu/{}",
297            percent_encode_path(&p)
298        );
299
300        let req = Request::post(&url);
301
302        let mut req = self.sign(req);
303
304        req = req.header(X_VERCEL_BLOB_MPU_ACTION, "create");
305        req = req.header(X_VERCEL_BLOB_ADD_RANDOM_SUFFIX, "0");
306
307        if let Some(mime) = args.content_type() {
308            req = req.header(X_VERCEL_BLOB_CONTENT_TYPE, mime);
309        };
310
311        // Set body
312        let req = req
313            .extension(Operation::Write)
314            .body(Buffer::new())
315            .map_err(new_request_build_error)?;
316
317        self.send(req).await
318    }
319
320    pub async fn upload_part(
321        &self,
322        path: &str,
323        upload_id: &str,
324        part_number: usize,
325        size: u64,
326        body: Buffer,
327    ) -> Result<Response<Buffer>> {
328        let p = build_abs_path(&self.root, path);
329
330        let url = format!(
331            "https://blob.vercel-storage.com/mpu/{}",
332            percent_encode_path(&p)
333        );
334
335        let mut req = Request::post(&url);
336
337        req = req.header(header::CONTENT_LENGTH, size);
338        req = req.header(X_VERCEL_BLOB_MPU_ACTION, "upload");
339        req = req.header(X_VERCEL_BLOB_MPU_KEY, p);
340        req = req.header(X_VERCEL_BLOB_MPU_UPLOAD_ID, upload_id);
341        req = req.header(X_VERCEL_BLOB_MPU_PART_NUMBER, part_number);
342
343        let req = self.sign(req);
344
345        // Set body
346        let req = req
347            .extension(Operation::Write)
348            .body(body)
349            .map_err(new_request_build_error)?;
350
351        self.send(req).await
352    }
353
354    pub async fn complete_multipart_upload(
355        &self,
356        path: &str,
357        upload_id: &str,
358        parts: Vec<Part>,
359    ) -> Result<Response<Buffer>> {
360        let p = build_abs_path(&self.root, path);
361
362        let url = format!(
363            "https://blob.vercel-storage.com/mpu/{}",
364            percent_encode_path(&p)
365        );
366
367        let mut req = Request::post(&url);
368
369        req = req.header(X_VERCEL_BLOB_MPU_ACTION, "complete");
370        req = req.header(X_VERCEL_BLOB_MPU_KEY, p);
371        req = req.header(X_VERCEL_BLOB_MPU_UPLOAD_ID, upload_id);
372
373        let req = self.sign(req);
374
375        let parts_json = json!(parts);
376
377        let req = req
378            .header(header::CONTENT_TYPE, "application/json")
379            .extension(Operation::Write)
380            .body(Buffer::from(Bytes::from(parts_json.to_string())))
381            .map_err(new_request_build_error)?;
382
383        self.send(req).await
384    }
385}
386
387pub fn parse_blob(blob: &Blob) -> Result<Metadata> {
388    let mode = if blob.pathname.ends_with('/') {
389        EntryMode::DIR
390    } else {
391        EntryMode::FILE
392    };
393    let mut md = Metadata::new(mode);
394    if let Some(content_type) = blob.content_type.clone() {
395        md.set_content_type(&content_type);
396    }
397    md.set_content_length(blob.size);
398    md.set_last_modified(parse_datetime_from_rfc3339(&blob.uploaded_at)?);
399    md.set_content_disposition(&blob.content_disposition);
400    Ok(md)
401}
402
403fn resolve_blob(blobs: Vec<Blob>, path: String) -> String {
404    for blob in blobs {
405        if blob.pathname == path {
406            return blob.url;
407        }
408    }
409    "".to_string()
410}
411
/// JSON body of a list response, as deserialized by `VercelBlobCore::list`.
///
/// Field names are mapped from camelCase (`hasMore`).
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ListResponse {
    /// Optional cursor; presumably the token for requesting the next page —
    /// it is not consumed in this file.
    pub cursor: Option<String>,
    /// Presumably signals that further entries exist beyond this response —
    /// not consumed in this file.
    pub has_more: bool,
    /// The blobs returned for the requested prefix.
    pub blobs: Vec<Blob>,
}
419
/// One entry of a list response.
///
/// Field names are mapped from camelCase (`uploadedAt`, `contentDisposition`,
/// `contentType`).
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Blob {
    /// URL of the blob; used to download, stat, copy and delete it.
    pub url: String,
    /// Pathname of the blob; compared against the absolute path to find a
    /// blob, and a trailing `/` marks a directory in `parse_blob`.
    pub pathname: String,
    /// Reported as the content length by `parse_blob`.
    pub size: u64,
    /// Upload time; `parse_blob` parses it as an RFC 3339 timestamp.
    pub uploaded_at: String,
    /// Content disposition, copied into the metadata by `parse_blob`.
    pub content_disposition: String,
    /// Content type, if the response carries one.
    pub content_type: Option<String>,
}
430
/// One uploaded part of a multipart upload.
///
/// A list of parts is serialized as the JSON body of
/// `complete_multipart_upload`, with camelCase keys (`partNumber`, `etag`).
#[derive(Default, Debug, Clone, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Part {
    /// Number of the part within the upload.
    pub part_number: usize,
    /// ETag of the part; presumably the value returned when the part was
    /// uploaded (see `UploadPartResponse`) — verify against the caller.
    pub etag: String,
}
437
/// Deserializable JSON shape with camelCase keys `uploadId` and `key`.
///
/// Presumably the body returned by `initiate_multipart_upload`; the parsing
/// happens outside this file — verify against the caller.
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InitiateMultipartUploadResponse {
    /// Identifier of the multipart upload.
    pub upload_id: String,
    /// Key associated with the multipart upload.
    pub key: String,
}
444
/// Deserializable JSON shape with a single `etag` key.
///
/// Presumably the body returned by `upload_part`; the parsing happens
/// outside this file — verify against the caller.
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UploadPartResponse {
    /// ETag of the uploaded part.
    pub etag: String,
}