opendal/services/vercel_blob/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bytes::Buf;
19use bytes::Bytes;
20use http::header;
21use http::request;
22use http::Request;
23use http::Response;
24use http::StatusCode;
25use serde::Deserialize;
26use serde::Serialize;
27use serde_json::json;
28use std::fmt::Debug;
29use std::fmt::Formatter;
30use std::sync::Arc;
31
32use self::constants::*;
33use super::error::parse_error;
34use crate::raw::*;
35use crate::*;
36
pub(super) mod constants {
    // Names of the custom HTTP headers this service sets on Vercel Blob requests.

    // https://github.com/vercel/storage/blob/main/packages/blob/src/put.ts#L16
    // `x-content-type` specifies the MIME type of the file being uploaded.
    pub const X_VERCEL_BLOB_CONTENT_TYPE: &str = "x-content-type";
    // `x-add-random-suffix` specifies whether to add a random suffix to the pathname.
    // The default value is 1, which means a random suffix is added.
    // Set it to 0 to disable the random suffix.
    pub const X_VERCEL_BLOB_ADD_RANDOM_SUFFIX: &str = "x-add-random-suffix";
    // https://github.com/vercel/storage/blob/main/packages/blob/src/put-multipart.ts#L84
    // `x-mpu-action` specifies the action to perform on the multipart upload (MPU).
    // Possible values are:
    // - create: create a new MPU.
    // - upload: upload a part to an existing MPU.
    // - complete: complete an existing MPU.
    pub const X_VERCEL_BLOB_MPU_ACTION: &str = "x-mpu-action";
    // `x-mpu-key` carries the absolute blob path the MPU targets
    // (set on the `upload` and `complete` actions).
    pub const X_VERCEL_BLOB_MPU_KEY: &str = "x-mpu-key";
    // `x-mpu-part-number` carries the number of the part being uploaded.
    pub const X_VERCEL_BLOB_MPU_PART_NUMBER: &str = "x-mpu-part-number";
    // `x-mpu-upload-id` carries the upload id of the MPU being continued or completed.
    pub const X_VERCEL_BLOB_MPU_UPLOAD_ID: &str = "x-mpu-upload-id";
}
56
/// Shared state used to build, sign and send requests to the Vercel Blob API.
#[derive(Clone)]
pub struct VercelBlobCore {
    /// Accessor info; its HTTP client is used to send every request.
    pub info: Arc<AccessorInfo>,
    /// The root of this core.
    pub root: String,
    /// Vercel Blob token, sent as a `Bearer` value in the `Authorization` header.
    pub token: String,
}
65
66impl Debug for VercelBlobCore {
67    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("Backend")
69            .field("root", &self.root)
70            .finish_non_exhaustive()
71    }
72}
73
74impl VercelBlobCore {
75    #[inline]
76    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
77        self.info.http_client().send(req).await
78    }
79
80    pub fn sign(&self, req: request::Builder) -> request::Builder {
81        req.header(header::AUTHORIZATION, format!("Bearer {}", self.token))
82    }
83}
84
85impl VercelBlobCore {
86    pub async fn download(
87        &self,
88        path: &str,
89        range: BytesRange,
90        _: &OpRead,
91    ) -> Result<Response<HttpBody>> {
92        let p = build_abs_path(&self.root, path);
93        // Vercel blob use an unguessable random id url to download the file
94        // So we use list to get the url of the file and then use it to download the file
95        let resp = self.list(&p, Some(1)).await?;
96
97        // Use the mtach url to download the file
98        let url = resolve_blob(resp.blobs, p);
99
100        if url.is_empty() {
101            return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
102        }
103
104        let mut req = Request::get(url);
105
106        if !range.is_full() {
107            req = req.header(header::RANGE, range.to_header());
108        }
109
110        // Set body
111        let req = req
112            .extension(Operation::Read)
113            .body(Buffer::new())
114            .map_err(new_request_build_error)?;
115
116        self.info.http_client().fetch(req).await
117    }
118
119    pub async fn upload(
120        &self,
121        path: &str,
122        size: u64,
123        args: &OpWrite,
124        body: Buffer,
125    ) -> Result<Response<Buffer>> {
126        let p = build_abs_path(&self.root, path);
127
128        let url = format!(
129            "https://blob.vercel-storage.com/{}",
130            percent_encode_path(&p)
131        );
132
133        let mut req = Request::put(&url);
134
135        req = req.header(X_VERCEL_BLOB_ADD_RANDOM_SUFFIX, "0");
136
137        req = req.header(header::CONTENT_LENGTH, size.to_string());
138
139        if let Some(mime) = args.content_type() {
140            req = req.header(X_VERCEL_BLOB_CONTENT_TYPE, mime)
141        }
142
143        let req = self.sign(req);
144
145        // Set body
146        let req = req
147            .extension(Operation::Write)
148            .body(body)
149            .map_err(new_request_build_error)?;
150
151        self.send(req).await
152    }
153
154    pub async fn head(&self, path: &str) -> Result<Response<Buffer>> {
155        let p = build_abs_path(&self.root, path);
156
157        let resp = self.list(&p, Some(1)).await?;
158
159        let url = resolve_blob(resp.blobs, p);
160
161        if url.is_empty() {
162            return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
163        }
164
165        let req = Request::get(format!(
166            "https://blob.vercel-storage.com?url={}",
167            percent_encode_path(&url)
168        ));
169
170        let req = self.sign(req);
171
172        // Set body
173        let req = req
174            .extension(Operation::Stat)
175            .body(Buffer::new())
176            .map_err(new_request_build_error)?;
177
178        self.send(req).await
179    }
180
181    pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
182        let from = build_abs_path(&self.root, from);
183
184        let resp = self.list(&from, Some(1)).await?;
185
186        let from_url = resolve_blob(resp.blobs, from);
187
188        if from_url.is_empty() {
189            return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
190        }
191
192        let to = build_abs_path(&self.root, to);
193
194        let to_url = format!(
195            "https://blob.vercel-storage.com/{}?fromUrl={}",
196            percent_encode_path(&to),
197            percent_encode_path(&from_url),
198        );
199
200        let req = Request::put(&to_url);
201
202        let req = self.sign(req);
203
204        // Set body
205        let req = req
206            .extension(Operation::Copy)
207            .body(Buffer::new())
208            .map_err(new_request_build_error)?;
209
210        self.send(req).await
211    }
212
213    pub async fn list(&self, prefix: &str, limit: Option<usize>) -> Result<ListResponse> {
214        let prefix = if prefix == "/" { "" } else { prefix };
215
216        let mut url = format!(
217            "https://blob.vercel-storage.com?prefix={}",
218            percent_encode_path(prefix)
219        );
220
221        if let Some(limit) = limit {
222            url.push_str(&format!("&limit={}", limit))
223        }
224
225        let req = Request::get(&url);
226
227        let req = self.sign(req);
228
229        // Set body
230        let req = req
231            .extension(Operation::List)
232            .body(Buffer::new())
233            .map_err(new_request_build_error)?;
234
235        let resp = self.send(req).await?;
236
237        let status = resp.status();
238
239        if status != StatusCode::OK {
240            return Err(parse_error(resp));
241        }
242
243        let body = resp.into_body();
244
245        let resp: ListResponse =
246            serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
247
248        Ok(resp)
249    }
250
251    pub async fn initiate_multipart_upload(
252        &self,
253        path: &str,
254        args: &OpWrite,
255    ) -> Result<Response<Buffer>> {
256        let p = build_abs_path(&self.root, path);
257
258        let url = format!(
259            "https://blob.vercel-storage.com/mpu/{}",
260            percent_encode_path(&p)
261        );
262
263        let req = Request::post(&url);
264
265        let mut req = self.sign(req);
266
267        req = req.header(X_VERCEL_BLOB_MPU_ACTION, "create");
268        req = req.header(X_VERCEL_BLOB_ADD_RANDOM_SUFFIX, "0");
269
270        if let Some(mime) = args.content_type() {
271            req = req.header(X_VERCEL_BLOB_CONTENT_TYPE, mime);
272        };
273
274        // Set body
275        let req = req
276            .extension(Operation::Write)
277            .body(Buffer::new())
278            .map_err(new_request_build_error)?;
279
280        self.send(req).await
281    }
282
283    pub async fn upload_part(
284        &self,
285        path: &str,
286        upload_id: &str,
287        part_number: usize,
288        size: u64,
289        body: Buffer,
290    ) -> Result<Response<Buffer>> {
291        let p = build_abs_path(&self.root, path);
292
293        let url = format!(
294            "https://blob.vercel-storage.com/mpu/{}",
295            percent_encode_path(&p)
296        );
297
298        let mut req = Request::post(&url);
299
300        req = req.header(header::CONTENT_LENGTH, size);
301        req = req.header(X_VERCEL_BLOB_MPU_ACTION, "upload");
302        req = req.header(X_VERCEL_BLOB_MPU_KEY, p);
303        req = req.header(X_VERCEL_BLOB_MPU_UPLOAD_ID, upload_id);
304        req = req.header(X_VERCEL_BLOB_MPU_PART_NUMBER, part_number);
305
306        let req = self.sign(req);
307
308        // Set body
309        let req = req
310            .extension(Operation::Write)
311            .body(body)
312            .map_err(new_request_build_error)?;
313
314        self.send(req).await
315    }
316
317    pub async fn complete_multipart_upload(
318        &self,
319        path: &str,
320        upload_id: &str,
321        parts: Vec<Part>,
322    ) -> Result<Response<Buffer>> {
323        let p = build_abs_path(&self.root, path);
324
325        let url = format!(
326            "https://blob.vercel-storage.com/mpu/{}",
327            percent_encode_path(&p)
328        );
329
330        let mut req = Request::post(&url);
331
332        req = req.header(X_VERCEL_BLOB_MPU_ACTION, "complete");
333        req = req.header(X_VERCEL_BLOB_MPU_KEY, p);
334        req = req.header(X_VERCEL_BLOB_MPU_UPLOAD_ID, upload_id);
335
336        let req = self.sign(req);
337
338        let parts_json = json!(parts);
339
340        let req = req
341            .header(header::CONTENT_TYPE, "application/json")
342            .extension(Operation::Write)
343            .body(Buffer::from(Bytes::from(parts_json.to_string())))
344            .map_err(new_request_build_error)?;
345
346        self.send(req).await
347    }
348}
349
350pub fn parse_blob(blob: &Blob) -> Result<Metadata> {
351    let mode = if blob.pathname.ends_with('/') {
352        EntryMode::DIR
353    } else {
354        EntryMode::FILE
355    };
356    let mut md = Metadata::new(mode);
357    if let Some(content_type) = blob.content_type.clone() {
358        md.set_content_type(&content_type);
359    }
360    md.set_content_length(blob.size);
361    md.set_last_modified(parse_datetime_from_rfc3339(&blob.uploaded_at)?);
362    md.set_content_disposition(&blob.content_disposition);
363    Ok(md)
364}
365
366fn resolve_blob(blobs: Vec<Blob>, path: String) -> String {
367    for blob in blobs {
368        if blob.pathname == path {
369            return blob.url;
370        }
371    }
372    "".to_string()
373}
374
/// JSON body of a list request, decoded with camelCase field names.
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ListResponse {
    /// Optional cursor (presumably for fetching the next page — confirm against the API).
    pub cursor: Option<String>,
    /// Decoded from `hasMore` (presumably whether more results remain — confirm against the API).
    pub has_more: bool,
    /// The blobs contained in this response.
    pub blobs: Vec<Blob>,
}
382
/// A single blob entry as it appears in a list response (camelCase JSON).
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Blob {
    /// URL of the blob; used to download it and to reference it in other requests.
    pub url: String,
    /// Pathname of the blob; a trailing `/` marks it as a directory.
    pub pathname: String,
    /// Size of the blob, used as its content length.
    pub size: u64,
    /// Upload time as an RFC 3339 string, used as the last-modified time.
    pub uploaded_at: String,
    /// Content disposition of the blob.
    pub content_disposition: String,
    /// Content type of the blob, if any.
    pub content_type: Option<String>,
}
393
/// One uploaded part, serialized (camelCase) into the JSON body that
/// completes a multipart upload.
#[derive(Default, Debug, Clone, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Part {
    /// Number of the part; serialized as `partNumber`.
    pub part_number: usize,
    /// ETag of the part.
    pub etag: String,
}
400
/// Deserializable (camelCase) response body; presumably the reply to the
/// `create` multipart-upload action — it is not decoded in this file, so
/// confirm against the caller.
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InitiateMultipartUploadResponse {
    /// Decoded from `uploadId`.
    pub upload_id: String,
    /// Decoded from `key`.
    pub key: String,
}
407
/// Deserializable (camelCase) response body; presumably the reply to the
/// `upload` multipart-upload action — it is not decoded in this file, so
/// confirm against the caller.
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UploadPartResponse {
    /// Decoded from `etag`.
    pub etag: String,
}