opendal/services/vercel_blob/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use bytes::Bytes;
24use http::header;
25use http::request;
26use http::Request;
27use http::Response;
28use http::StatusCode;
29use serde::Deserialize;
30use serde::Serialize;
31use serde_json::json;
32
33use self::constants::*;
34use super::error::parse_error;
35use crate::raw::*;
36use crate::*;
37
38pub(super) mod constants {
39    // https://github.com/vercel/storage/blob/main/packages/blob/src/put.ts#L16
40    // x-content-type specifies the MIME type of the file being uploaded.
41    pub const X_VERCEL_BLOB_CONTENT_TYPE: &str = "x-content-type";
42    // x-add-random-suffix specifying whether to  add a random suffix to the pathname
43    // Default value is 1, which means to add a random suffix.
44    // Set it to 0 to disable the random suffix.
45    pub const X_VERCEL_BLOB_ADD_RANDOM_SUFFIX: &str = "x-add-random-suffix";
46    // https://github.com/vercel/storage/blob/main/packages/blob/src/put-multipart.ts#L84
47    // x-mpu-action specifies the action to perform on the MPU.
48    // Possible values are:
49    // - create: create a new MPU.
50    // - upload: upload a part to an existing MPU.
51    // - complete: complete an existing MPU.
52    pub const X_VERCEL_BLOB_MPU_ACTION: &str = "x-mpu-action";
53    pub const X_VERCEL_BLOB_MPU_KEY: &str = "x-mpu-key";
54    pub const X_VERCEL_BLOB_MPU_PART_NUMBER: &str = "x-mpu-part-number";
55    pub const X_VERCEL_BLOB_MPU_UPLOAD_ID: &str = "x-mpu-upload-id";
56}
57
58#[derive(Clone)]
59pub struct VercelBlobCore {
60    pub info: Arc<AccessorInfo>,
61    /// The root of this core.
62    pub root: String,
63    /// Vercel Blob token.
64    pub token: String,
65}
66
67impl Debug for VercelBlobCore {
68    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct("Backend")
70            .field("root", &self.root)
71            .finish_non_exhaustive()
72    }
73}
74
75impl VercelBlobCore {
76    #[inline]
77    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
78        self.info.http_client().send(req).await
79    }
80
81    pub fn sign(&self, req: request::Builder) -> request::Builder {
82        req.header(header::AUTHORIZATION, format!("Bearer {}", self.token))
83    }
84}
85
86impl VercelBlobCore {
87    pub async fn download(
88        &self,
89        path: &str,
90        range: BytesRange,
91        _: &OpRead,
92    ) -> Result<Response<HttpBody>> {
93        let p = build_abs_path(&self.root, path);
94        // Vercel blob use an unguessable random id url to download the file
95        // So we use list to get the url of the file and then use it to download the file
96        let resp = self.list(&p, Some(1)).await?;
97
98        // Use the mtach url to download the file
99        let url = resolve_blob(resp.blobs, p);
100
101        if url.is_empty() {
102            return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
103        }
104
105        let mut req = Request::get(url);
106
107        if !range.is_full() {
108            req = req.header(header::RANGE, range.to_header());
109        }
110
111        // Set body
112        let req = req
113            .extension(Operation::Read)
114            .body(Buffer::new())
115            .map_err(new_request_build_error)?;
116
117        self.info.http_client().fetch(req).await
118    }
119
120    pub async fn upload(
121        &self,
122        path: &str,
123        size: u64,
124        args: &OpWrite,
125        body: Buffer,
126    ) -> Result<Response<Buffer>> {
127        let p = build_abs_path(&self.root, path);
128
129        let url = format!(
130            "https://blob.vercel-storage.com/{}",
131            percent_encode_path(&p)
132        );
133
134        let mut req = Request::put(&url);
135
136        req = req.header(X_VERCEL_BLOB_ADD_RANDOM_SUFFIX, "0");
137
138        req = req.header(header::CONTENT_LENGTH, size.to_string());
139
140        if let Some(mime) = args.content_type() {
141            req = req.header(X_VERCEL_BLOB_CONTENT_TYPE, mime)
142        }
143
144        let req = self.sign(req);
145
146        // Set body
147        let req = req
148            .extension(Operation::Write)
149            .body(body)
150            .map_err(new_request_build_error)?;
151
152        self.send(req).await
153    }
154
155    pub async fn head(&self, path: &str) -> Result<Response<Buffer>> {
156        let p = build_abs_path(&self.root, path);
157
158        let resp = self.list(&p, Some(1)).await?;
159
160        let url = resolve_blob(resp.blobs, p);
161
162        if url.is_empty() {
163            return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
164        }
165
166        let req = Request::get(format!(
167            "https://blob.vercel-storage.com?url={}",
168            percent_encode_path(&url)
169        ));
170
171        let req = self.sign(req);
172
173        // Set body
174        let req = req
175            .extension(Operation::Stat)
176            .body(Buffer::new())
177            .map_err(new_request_build_error)?;
178
179        self.send(req).await
180    }
181
182    pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
183        let from = build_abs_path(&self.root, from);
184
185        let resp = self.list(&from, Some(1)).await?;
186
187        let from_url = resolve_blob(resp.blobs, from);
188
189        if from_url.is_empty() {
190            return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
191        }
192
193        let to = build_abs_path(&self.root, to);
194
195        let to_url = format!(
196            "https://blob.vercel-storage.com/{}?fromUrl={}",
197            percent_encode_path(&to),
198            percent_encode_path(&from_url),
199        );
200
201        let req = Request::put(&to_url);
202
203        let req = self.sign(req);
204
205        // Set body
206        let req = req
207            .extension(Operation::Copy)
208            .body(Buffer::new())
209            .map_err(new_request_build_error)?;
210
211        self.send(req).await
212    }
213
214    pub async fn list(&self, prefix: &str, limit: Option<usize>) -> Result<ListResponse> {
215        let prefix = if prefix == "/" { "" } else { prefix };
216
217        let mut url = format!(
218            "https://blob.vercel-storage.com?prefix={}",
219            percent_encode_path(prefix)
220        );
221
222        if let Some(limit) = limit {
223            url.push_str(&format!("&limit={}", limit))
224        }
225
226        let req = Request::get(&url);
227
228        let req = self.sign(req);
229
230        // Set body
231        let req = req
232            .extension(Operation::List)
233            .body(Buffer::new())
234            .map_err(new_request_build_error)?;
235
236        let resp = self.send(req).await?;
237
238        let status = resp.status();
239
240        if status != StatusCode::OK {
241            return Err(parse_error(resp));
242        }
243
244        let body = resp.into_body();
245
246        let resp: ListResponse =
247            serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
248
249        Ok(resp)
250    }
251
252    pub async fn initiate_multipart_upload(
253        &self,
254        path: &str,
255        args: &OpWrite,
256    ) -> Result<Response<Buffer>> {
257        let p = build_abs_path(&self.root, path);
258
259        let url = format!(
260            "https://blob.vercel-storage.com/mpu/{}",
261            percent_encode_path(&p)
262        );
263
264        let req = Request::post(&url);
265
266        let mut req = self.sign(req);
267
268        req = req.header(X_VERCEL_BLOB_MPU_ACTION, "create");
269        req = req.header(X_VERCEL_BLOB_ADD_RANDOM_SUFFIX, "0");
270
271        if let Some(mime) = args.content_type() {
272            req = req.header(X_VERCEL_BLOB_CONTENT_TYPE, mime);
273        };
274
275        // Set body
276        let req = req
277            .extension(Operation::Write)
278            .body(Buffer::new())
279            .map_err(new_request_build_error)?;
280
281        self.send(req).await
282    }
283
284    pub async fn upload_part(
285        &self,
286        path: &str,
287        upload_id: &str,
288        part_number: usize,
289        size: u64,
290        body: Buffer,
291    ) -> Result<Response<Buffer>> {
292        let p = build_abs_path(&self.root, path);
293
294        let url = format!(
295            "https://blob.vercel-storage.com/mpu/{}",
296            percent_encode_path(&p)
297        );
298
299        let mut req = Request::post(&url);
300
301        req = req.header(header::CONTENT_LENGTH, size);
302        req = req.header(X_VERCEL_BLOB_MPU_ACTION, "upload");
303        req = req.header(X_VERCEL_BLOB_MPU_KEY, p);
304        req = req.header(X_VERCEL_BLOB_MPU_UPLOAD_ID, upload_id);
305        req = req.header(X_VERCEL_BLOB_MPU_PART_NUMBER, part_number);
306
307        let req = self.sign(req);
308
309        // Set body
310        let req = req
311            .extension(Operation::Write)
312            .body(body)
313            .map_err(new_request_build_error)?;
314
315        self.send(req).await
316    }
317
318    pub async fn complete_multipart_upload(
319        &self,
320        path: &str,
321        upload_id: &str,
322        parts: Vec<Part>,
323    ) -> Result<Response<Buffer>> {
324        let p = build_abs_path(&self.root, path);
325
326        let url = format!(
327            "https://blob.vercel-storage.com/mpu/{}",
328            percent_encode_path(&p)
329        );
330
331        let mut req = Request::post(&url);
332
333        req = req.header(X_VERCEL_BLOB_MPU_ACTION, "complete");
334        req = req.header(X_VERCEL_BLOB_MPU_KEY, p);
335        req = req.header(X_VERCEL_BLOB_MPU_UPLOAD_ID, upload_id);
336
337        let req = self.sign(req);
338
339        let parts_json = json!(parts);
340
341        let req = req
342            .header(header::CONTENT_TYPE, "application/json")
343            .extension(Operation::Write)
344            .body(Buffer::from(Bytes::from(parts_json.to_string())))
345            .map_err(new_request_build_error)?;
346
347        self.send(req).await
348    }
349}
350
351pub fn parse_blob(blob: &Blob) -> Result<Metadata> {
352    let mode = if blob.pathname.ends_with('/') {
353        EntryMode::DIR
354    } else {
355        EntryMode::FILE
356    };
357    let mut md = Metadata::new(mode);
358    if let Some(content_type) = blob.content_type.clone() {
359        md.set_content_type(&content_type);
360    }
361    md.set_content_length(blob.size);
362    md.set_last_modified(parse_datetime_from_rfc3339(&blob.uploaded_at)?);
363    md.set_content_disposition(&blob.content_disposition);
364    Ok(md)
365}
366
367fn resolve_blob(blobs: Vec<Blob>, path: String) -> String {
368    for blob in blobs {
369        if blob.pathname == path {
370            return blob.url;
371        }
372    }
373    "".to_string()
374}
375
376#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
377#[serde(rename_all = "camelCase")]
378pub struct ListResponse {
379    pub cursor: Option<String>,
380    pub has_more: bool,
381    pub blobs: Vec<Blob>,
382}
383
384#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
385#[serde(rename_all = "camelCase")]
386pub struct Blob {
387    pub url: String,
388    pub pathname: String,
389    pub size: u64,
390    pub uploaded_at: String,
391    pub content_disposition: String,
392    pub content_type: Option<String>,
393}
394
395#[derive(Default, Debug, Clone, PartialEq, Serialize)]
396#[serde(rename_all = "camelCase")]
397pub struct Part {
398    pub part_number: usize,
399    pub etag: String,
400}
401
402#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
403#[serde(rename_all = "camelCase")]
404pub struct InitiateMultipartUploadResponse {
405    pub upload_id: String,
406    pub key: String,
407}
408
409#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
410#[serde(rename_all = "camelCase")]
411pub struct UploadPartResponse {
412    pub etag: String,
413}