opendal/services/b2/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Buf;
24use chrono::DateTime;
25use chrono::Utc;
26use http::header;
27use http::Request;
28use http::Response;
29use http::StatusCode;
30use serde::Deserialize;
31use serde::Serialize;
32use tokio::sync::RwLock;
33
34use self::constants::X_BZ_CONTENT_SHA1;
35use self::constants::X_BZ_FILE_NAME;
36use super::core::constants::X_BZ_PART_NUMBER;
37use super::error::parse_error;
38use crate::raw::*;
39use crate::*;
40
/// Names of the B2-specific HTTP headers set by the upload helpers in this file.
pub(super) mod constants {
    /// Header carrying the percent-encoded file name on `upload_file` requests.
    pub const X_BZ_FILE_NAME: &str = "X-Bz-File-Name";
    /// Header carrying the content SHA1; the upload helpers in this file send `do_not_verify`.
    pub const X_BZ_CONTENT_SHA1: &str = "X-Bz-Content-Sha1";
    /// Header carrying the part number on `upload_part` requests.
    pub const X_BZ_PART_NUMBER: &str = "X-Bz-Part-Number";
}
46
/// Core of [b2](https://www.backblaze.com/cloud-storage) services support.
///
/// Cloning is cheap for the shared parts: `info` and `signer` are reference-counted.
#[derive(Clone)]
pub struct B2Core {
    /// Accessor info; its HTTP client is used to send every request issued by this core.
    pub info: Arc<AccessorInfo>,
    /// Application credentials together with the cached authorization info,
    /// shared behind an async `RwLock`.
    pub signer: Arc<RwLock<B2Signer>>,

    /// The root of this core.
    pub root: String,
    /// The bucket name of this backend.
    pub bucket: String,
    /// The bucket id of this backend.
    pub bucket_id: String,
}
60
61impl Debug for B2Core {
62    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
63        f.debug_struct("Backend")
64            .field("root", &self.root)
65            .field("bucket", &self.bucket)
66            .field("bucket_id", &self.bucket_id)
67            .finish_non_exhaustive()
68    }
69}
70
71impl B2Core {
72    #[inline]
73    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
74        self.info.http_client().send(req).await
75    }
76
77    /// [b2_authorize_account](https://www.backblaze.com/apidocs/b2-authorize-account)
78    pub async fn get_auth_info(&self) -> Result<AuthInfo> {
79        {
80            let signer = self.signer.read().await;
81
82            if !signer.auth_info.authorization_token.is_empty()
83                && signer.auth_info.expires_in > Utc::now()
84            {
85                let auth_info = signer.auth_info.clone();
86                return Ok(auth_info);
87            }
88        }
89
90        {
91            let mut signer = self.signer.write().await;
92            let req = Request::get("https://api.backblazeb2.com/b2api/v2/b2_authorize_account")
93                .header(
94                    header::AUTHORIZATION,
95                    format_authorization_by_basic(
96                        &signer.application_key_id,
97                        &signer.application_key,
98                    )?,
99                )
100                .body(Buffer::new())
101                .map_err(new_request_build_error)?;
102
103            let resp = self.info.http_client().send(req).await?;
104            let status = resp.status();
105
106            match status {
107                StatusCode::OK => {
108                    let resp_body = resp.into_body();
109                    let token: AuthorizeAccountResponse =
110                        serde_json::from_reader(resp_body.reader())
111                            .map_err(new_json_deserialize_error)?;
112                    signer.auth_info = AuthInfo {
113                        authorization_token: token.authorization_token.clone(),
114                        api_url: token.api_url.clone(),
115                        download_url: token.download_url.clone(),
116                        // This authorization token is valid for at most 24 hours.
117                        expires_in: Utc::now()
118                            + chrono::TimeDelta::try_hours(20).expect("20 hours must be valid"),
119                    };
120                }
121                _ => {
122                    return Err(parse_error(resp));
123                }
124            }
125            Ok(signer.auth_info.clone())
126        }
127    }
128}
129
130impl B2Core {
131    pub async fn download_file_by_name(
132        &self,
133        path: &str,
134        range: BytesRange,
135        _args: &OpRead,
136    ) -> Result<Response<HttpBody>> {
137        let path = build_abs_path(&self.root, path);
138
139        let auth_info = self.get_auth_info().await?;
140
141        // Construct headers to add to the request
142        let url = format!(
143            "{}/file/{}/{}",
144            auth_info.download_url,
145            self.bucket,
146            percent_encode_path(&path)
147        );
148
149        let mut req = Request::get(&url);
150
151        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
152
153        if !range.is_full() {
154            req = req.header(header::RANGE, range.to_header());
155        }
156
157        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
158
159        self.info.http_client().fetch(req).await
160    }
161
162    pub(super) async fn get_upload_url(&self) -> Result<GetUploadUrlResponse> {
163        let auth_info = self.get_auth_info().await?;
164
165        let url = format!(
166            "{}/b2api/v2/b2_get_upload_url?bucketId={}",
167            auth_info.api_url, self.bucket_id
168        );
169
170        let mut req = Request::get(&url);
171
172        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
173
174        // Set body
175        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
176
177        let resp = self.send(req).await?;
178        let status = resp.status();
179        match status {
180            StatusCode::OK => {
181                let resp_body = resp.into_body();
182                let resp = serde_json::from_reader(resp_body.reader())
183                    .map_err(new_json_deserialize_error)?;
184                Ok(resp)
185            }
186            _ => Err(parse_error(resp)),
187        }
188    }
189
190    pub async fn get_download_authorization(
191        &self,
192        path: &str,
193        expire: Duration,
194    ) -> Result<GetDownloadAuthorizationResponse> {
195        let path = build_abs_path(&self.root, path);
196
197        let auth_info = self.get_auth_info().await?;
198
199        // Construct headers to add to the request
200        let url = format!(
201            "{}/b2api/v2/b2_get_download_authorization",
202            auth_info.api_url
203        );
204        let mut req = Request::post(&url);
205
206        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
207
208        let body = GetDownloadAuthorizationRequest {
209            bucket_id: self.bucket_id.clone(),
210            file_name_prefix: path,
211            valid_duration_in_seconds: expire.as_secs(),
212        };
213        let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
214        let body = bytes::Bytes::from(body);
215
216        let req = req
217            .body(Buffer::from(body))
218            .map_err(new_request_build_error)?;
219
220        let resp = self.send(req).await?;
221
222        let status = resp.status();
223        match status {
224            StatusCode::OK => {
225                let resp_body = resp.into_body();
226                let resp = serde_json::from_reader(resp_body.reader())
227                    .map_err(new_json_deserialize_error)?;
228                Ok(resp)
229            }
230            _ => Err(parse_error(resp)),
231        }
232    }
233
234    pub async fn upload_file(
235        &self,
236        path: &str,
237        size: Option<u64>,
238        args: &OpWrite,
239        body: Buffer,
240    ) -> Result<Response<Buffer>> {
241        let resp = self.get_upload_url().await?;
242
243        let p = build_abs_path(&self.root, path);
244
245        let mut req = Request::post(resp.upload_url);
246
247        req = req.header(X_BZ_FILE_NAME, percent_encode_path(&p));
248
249        req = req.header(header::AUTHORIZATION, resp.authorization_token);
250
251        req = req.header(X_BZ_CONTENT_SHA1, "do_not_verify");
252
253        if let Some(size) = size {
254            req = req.header(header::CONTENT_LENGTH, size.to_string())
255        }
256
257        if let Some(mime) = args.content_type() {
258            req = req.header(header::CONTENT_TYPE, mime)
259        } else {
260            req = req.header(header::CONTENT_TYPE, "b2/x-auto")
261        }
262
263        if let Some(pos) = args.content_disposition() {
264            req = req.header(header::CONTENT_DISPOSITION, pos)
265        }
266
267        // Set body
268        let req = req.body(body).map_err(new_request_build_error)?;
269
270        self.send(req).await
271    }
272
273    pub async fn start_large_file(&self, path: &str, args: &OpWrite) -> Result<Response<Buffer>> {
274        let p = build_abs_path(&self.root, path);
275
276        let auth_info = self.get_auth_info().await?;
277
278        let url = format!("{}/b2api/v2/b2_start_large_file", auth_info.api_url);
279
280        let mut req = Request::post(&url);
281
282        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
283
284        let mut start_large_file_request = StartLargeFileRequest {
285            bucket_id: self.bucket_id.clone(),
286            file_name: percent_encode_path(&p),
287            content_type: "b2/x-auto".to_owned(),
288        };
289
290        if let Some(mime) = args.content_type() {
291            mime.clone_into(&mut start_large_file_request.content_type)
292        }
293
294        let body =
295            serde_json::to_vec(&start_large_file_request).map_err(new_json_serialize_error)?;
296        let body = bytes::Bytes::from(body);
297
298        let req = req
299            .body(Buffer::from(body))
300            .map_err(new_request_build_error)?;
301
302        self.send(req).await
303    }
304
305    pub async fn get_upload_part_url(&self, file_id: &str) -> Result<GetUploadPartUrlResponse> {
306        let auth_info = self.get_auth_info().await?;
307
308        let url = format!(
309            "{}/b2api/v2/b2_get_upload_part_url?fileId={}",
310            auth_info.api_url, file_id
311        );
312
313        let mut req = Request::get(&url);
314
315        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
316
317        // Set body
318        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
319
320        let resp = self.send(req).await?;
321
322        let status = resp.status();
323        match status {
324            StatusCode::OK => {
325                let resp_body = resp.into_body();
326                let resp = serde_json::from_reader(resp_body.reader())
327                    .map_err(new_json_deserialize_error)?;
328                Ok(resp)
329            }
330            _ => Err(parse_error(resp)),
331        }
332    }
333
334    pub async fn upload_part(
335        &self,
336        file_id: &str,
337        part_number: usize,
338        size: u64,
339        body: Buffer,
340    ) -> Result<Response<Buffer>> {
341        let resp = self.get_upload_part_url(file_id).await?;
342
343        let mut req = Request::post(resp.upload_url);
344
345        req = req.header(X_BZ_PART_NUMBER, part_number.to_string());
346
347        req = req.header(header::CONTENT_LENGTH, size.to_string());
348
349        req = req.header(header::AUTHORIZATION, resp.authorization_token);
350
351        req = req.header(X_BZ_CONTENT_SHA1, "do_not_verify");
352
353        // Set body
354        let req = req.body(body).map_err(new_request_build_error)?;
355
356        self.send(req).await
357    }
358
359    pub async fn finish_large_file(
360        &self,
361        file_id: &str,
362        part_sha1_array: Vec<String>,
363    ) -> Result<Response<Buffer>> {
364        let auth_info = self.get_auth_info().await?;
365
366        let url = format!("{}/b2api/v2/b2_finish_large_file", auth_info.api_url);
367
368        let mut req = Request::post(&url);
369
370        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
371
372        let body = serde_json::to_vec(&FinishLargeFileRequest {
373            file_id: file_id.to_owned(),
374            part_sha1_array,
375        })
376        .map_err(new_json_serialize_error)?;
377        let body = bytes::Bytes::from(body);
378
379        // Set body
380        let req = req
381            .body(Buffer::from(body))
382            .map_err(new_request_build_error)?;
383
384        self.send(req).await
385    }
386
387    pub async fn cancel_large_file(&self, file_id: &str) -> Result<Response<Buffer>> {
388        let auth_info = self.get_auth_info().await?;
389
390        let url = format!("{}/b2api/v2/b2_cancel_large_file", auth_info.api_url);
391
392        let mut req = Request::post(&url);
393
394        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
395
396        let body = serde_json::to_vec(&CancelLargeFileRequest {
397            file_id: file_id.to_owned(),
398        })
399        .map_err(new_json_serialize_error)?;
400        let body = bytes::Bytes::from(body);
401
402        // Set body
403        let req = req
404            .body(Buffer::from(body))
405            .map_err(new_request_build_error)?;
406
407        self.send(req).await
408    }
409
410    pub async fn list_file_names(
411        &self,
412        prefix: Option<&str>,
413        delimiter: Option<&str>,
414        limit: Option<usize>,
415        start_after: Option<String>,
416    ) -> Result<Response<Buffer>> {
417        let auth_info = self.get_auth_info().await?;
418
419        let mut url = format!(
420            "{}/b2api/v2/b2_list_file_names?bucketId={}",
421            auth_info.api_url, self.bucket_id
422        );
423
424        if let Some(prefix) = prefix {
425            let prefix = build_abs_path(&self.root, prefix);
426            url.push_str(&format!("&prefix={}", percent_encode_path(&prefix)));
427        }
428
429        if let Some(limit) = limit {
430            url.push_str(&format!("&maxFileCount={}", limit));
431        }
432
433        if let Some(start_after) = start_after {
434            url.push_str(&format!(
435                "&startFileName={}",
436                percent_encode_path(&start_after)
437            ));
438        }
439
440        if let Some(delimiter) = delimiter {
441            url.push_str(&format!("&delimiter={}", delimiter));
442        }
443
444        let mut req = Request::get(&url);
445
446        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
447
448        // Set body
449        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
450
451        self.send(req).await
452    }
453
454    pub async fn copy_file(&self, source_file_id: String, to: &str) -> Result<Response<Buffer>> {
455        let to = build_abs_path(&self.root, to);
456
457        let auth_info = self.get_auth_info().await?;
458
459        let url = format!("{}/b2api/v2/b2_copy_file", auth_info.api_url);
460
461        let mut req = Request::post(url);
462
463        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
464
465        let body = CopyFileRequest {
466            source_file_id,
467            file_name: to,
468        };
469
470        let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
471        let body = bytes::Bytes::from(body);
472
473        // Set body
474        let req = req
475            .body(Buffer::from(body))
476            .map_err(new_request_build_error)?;
477
478        self.send(req).await
479    }
480
481    pub async fn hide_file(&self, path: &str) -> Result<Response<Buffer>> {
482        let path = build_abs_path(&self.root, path);
483
484        let auth_info = self.get_auth_info().await?;
485
486        let url = format!("{}/b2api/v2/b2_hide_file", auth_info.api_url);
487
488        let mut req = Request::post(url);
489
490        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
491
492        let body = HideFileRequest {
493            bucket_id: self.bucket_id.clone(),
494            file_name: path.to_string(),
495        };
496
497        let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
498        let body = bytes::Bytes::from(body);
499
500        // Set body
501        let req = req
502            .body(Buffer::from(body))
503            .map_err(new_request_build_error)?;
504
505        self.send(req).await
506    }
507}
508
/// Application credentials together with the most recently obtained authorization info.
#[derive(Clone)]
pub struct B2Signer {
    /// The application_key_id of this core.
    pub application_key_id: String,
    /// The application_key of this core.
    pub application_key: String,

    /// Cached result of the last successful account authorization; an empty token
    /// means no authorization has been stored yet.
    pub auth_info: AuthInfo,
}
518
/// Authorization data cached from a `b2_authorize_account` response.
#[derive(Clone)]
pub struct AuthInfo {
    /// Token sent in the `Authorization` header of API and download requests.
    pub authorization_token: String,
    /// The base URL to use for all API calls except for uploading and downloading files.
    pub api_url: String,
    /// The base URL to use for downloading files.
    pub download_url: String,

    /// Point in time after which the cached token is treated as expired.
    /// Despite the name, this is an absolute timestamp rather than a duration.
    pub expires_in: DateTime<Utc>,
}
529
530impl Default for B2Signer {
531    fn default() -> Self {
532        B2Signer {
533            application_key: String::new(),
534            application_key_id: String::new(),
535
536            auth_info: AuthInfo {
537                authorization_token: String::new(),
538                api_url: String::new(),
539                download_url: String::new(),
540                expires_in: DateTime::<Utc>::MIN_UTC,
541            },
542        }
543    }
544}
545
/// Request of [b2_start_large_file](https://www.backblaze.com/apidocs/b2-start-large-file).
///
/// Serialized to JSON with camelCase field names.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct StartLargeFileRequest {
    /// Id of the bucket the large file is created in.
    pub bucket_id: String,
    /// Name of the file to create.
    pub file_name: String,
    /// Content type of the file; callers in this file use `b2/x-auto` when none is given.
    pub content_type: String,
}
554
/// Response of [b2_start_large_file](https://www.backblaze.com/apidocs/b2-start-large-file).
///
/// Only the field needed by this service is deserialized.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StartLargeFileResponse {
    /// Id of the started large file (JSON key `fileId`).
    pub file_id: String,
}
561
/// Response of [b2_authorize_account](https://www.backblaze.com/apidocs/b2-authorize-account).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AuthorizeAccountResponse {
    /// An authorization token to use with all calls, other than b2_authorize_account, that need an Authorization header. This authorization token is valid for at most 24 hours.
    /// So we should call b2_authorize_account at least every 24 hours.
    pub authorization_token: String,
    /// The base URL to use for all API calls except for uploading and downloading files.
    pub api_url: String,
    /// The base URL to use for downloading files.
    pub download_url: String,
}
572
/// Response of [b2_get_upload_url](https://www.backblaze.com/apidocs/b2-get-upload-url).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetUploadUrlResponse {
    /// The authorizationToken that must be used when uploading files to this bucket.
    /// This token is valid for 24 hours or until the uploadUrl endpoint rejects an upload, see b2_upload_file
    pub authorization_token: String,
    /// The URL that file uploads are POSTed to.
    pub upload_url: String,
}
582
/// Response of [b2_get_upload_part_url](https://www.backblaze.com/apidocs/b2-get-upload-part-url).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetUploadPartUrlResponse {
    /// The authorizationToken that must be used when uploading parts of this large file.
    /// NOTE(review): validity is presumably the same as for b2_get_upload_url tokens
    /// (24 hours or until the uploadUrl endpoint rejects an upload) — confirm against the B2 docs.
    pub authorization_token: String,
    /// The URL that part uploads are POSTed to.
    pub upload_url: String,
}
592
/// Response of [b2_upload_part](https://www.backblaze.com/apidocs/b2-upload-part).
///
/// Only the field needed by this service is deserialized.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UploadPartResponse {
    /// SHA1 reported for the uploaded part (JSON key `contentSha1`).
    pub content_sha1: String,
}
599
/// Request of [b2_finish_large_file](https://www.backblaze.com/apidocs/b2-finish-large-file).
///
/// Serialized to JSON with camelCase field names.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FinishLargeFileRequest {
    /// Id of the large file to finish.
    pub file_id: String,
    /// SHA1 values of the uploaded parts (presumably in part order — confirm against the B2 docs).
    pub part_sha1_array: Vec<String>,
}
607
/// Request of [b2_cancel_large_file](https://www.backblaze.com/apidocs/b2-cancel-large-file).
///
/// Serialized to JSON with camelCase field names.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CancelLargeFileRequest {
    /// Id of the large file to cancel.
    pub file_id: String,
}
614
/// Response of [b2_list_file_names](https://www.backblaze.com/apidocs/b2-list-file-names).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ListFileNamesResponse {
    /// The entries returned by this listing call.
    pub files: Vec<File>,
    /// Optional continuation marker (JSON key `nextFileName`); presumably the
    /// `startFileName` to pass to the next call — confirm against the B2 docs.
    pub next_file_name: Option<String>,
}
622
/// Response of [b2-finish-large-file](https://www.backblaze.com/apidocs/b2-finish-large-file).
///
/// NOTE(review): the name suggests this is also used for single-file upload
/// responses — confirm against the callers.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UploadResponse {
    /// Length of the stored content (JSON key `contentLength`).
    pub content_length: u64,
    /// MD5 of the content, when the service reports one.
    pub content_md5: Option<String>,
    /// Content type of the stored file, when the service reports one.
    pub content_type: Option<String>,
}
631
/// A single file entry as it appears in a [`ListFileNamesResponse`].
///
/// Deserialized from JSON with camelCase field names.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct File {
    /// Id of the file, when the service reports one.
    pub file_id: Option<String>,
    /// Length of the file content.
    pub content_length: u64,
    /// MD5 of the content, when the service reports one.
    pub content_md5: Option<String>,
    /// Content type of the file, when the service reports one.
    pub content_type: Option<String>,
    /// Name of the file; a trailing `/` makes `parse_file_info` treat the entry as a directory.
    pub file_name: String,
}
641
642pub(super) fn parse_file_info(file: &File) -> Metadata {
643    if file.file_name.ends_with('/') {
644        return Metadata::new(EntryMode::DIR);
645    }
646
647    let mut metadata = Metadata::new(EntryMode::FILE);
648
649    metadata.set_content_length(file.content_length);
650
651    if let Some(content_md5) = &file.content_md5 {
652        metadata.set_content_md5(content_md5);
653    }
654
655    if let Some(content_type) = &file.content_type {
656        metadata.set_content_type(content_type);
657    }
658
659    metadata
660}
661
/// Request of [b2_copy_file](https://www.backblaze.com/apidocs/b2-copy-file).
///
/// Serialized to JSON with camelCase field names.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CopyFileRequest {
    /// Id of the file to copy from.
    pub source_file_id: String,
    /// Name of the file to create.
    pub file_name: String,
}
668
/// Request of [b2_hide_file](https://www.backblaze.com/apidocs/b2-hide-file).
///
/// Serialized to JSON with camelCase field names.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct HideFileRequest {
    /// Id of the bucket that contains the file.
    pub bucket_id: String,
    /// Name of the file to hide.
    pub file_name: String,
}
675
/// Request of [b2_get_download_authorization](https://www.backblaze.com/apidocs/b2-get-download-authorization).
///
/// Serialized to JSON with camelCase field names.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetDownloadAuthorizationRequest {
    /// Id of the bucket the authorization applies to.
    pub bucket_id: String,
    /// File-name prefix the authorization is requested for.
    pub file_name_prefix: String,
    /// Requested validity of the authorization, in whole seconds.
    pub valid_duration_in_seconds: u64,
}
683
/// Response of [b2_get_download_authorization](https://www.backblaze.com/apidocs/b2-get-download-authorization).
///
/// Only the field needed by this service is deserialized.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetDownloadAuthorizationResponse {
    /// The download authorization token returned by the service (JSON key `authorizationToken`).
    pub authorization_token: String,
}
688}