opendal/services/b2/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Buf;
24use chrono::DateTime;
25use chrono::Utc;
26use http::header;
27use http::Request;
28use http::Response;
29use http::StatusCode;
30use serde::Deserialize;
31use serde::Serialize;
32use tokio::sync::RwLock;
33
34use self::constants::X_BZ_CONTENT_SHA1;
35use self::constants::X_BZ_FILE_NAME;
36use super::core::constants::X_BZ_PART_NUMBER;
37use super::error::parse_error;
38use crate::raw::*;
39use crate::*;
40
/// Names of the B2-specific HTTP headers that this module sets on upload requests.
pub(super) mod constants {
    /// Header carrying the percent-encoded file name on `upload_file` requests.
    pub const X_BZ_FILE_NAME: &str = "X-Bz-File-Name";
    /// Header carrying the content SHA1; this module always sends `do_not_verify`.
    pub const X_BZ_CONTENT_SHA1: &str = "X-Bz-Content-Sha1";
    /// Header carrying the part number on `upload_part` requests.
    pub const X_BZ_PART_NUMBER: &str = "X-Bz-Part-Number";
}
46
/// Core of [b2](https://www.backblaze.com/cloud-storage) services support.
///
/// Bundles everything the request helpers in this file need: the accessor info
/// (source of the HTTP client), the credentials plus cached authorization, and
/// the bucket coordinates.
#[derive(Clone)]
pub struct B2Core {
    /// Accessor info; its `http_client()` is used to send every request.
    pub info: Arc<AccessorInfo>,
    /// Application credentials together with the cached [`AuthInfo`].
    ///
    /// Guarded by an async `RwLock` so the cached authorization can be read
    /// concurrently and replaced by `get_auth_info` when it needs refreshing.
    pub signer: Arc<RwLock<B2Signer>>,

    /// The root of this core.
    pub root: String,
    /// The bucket name of this backend.
    pub bucket: String,
    /// The bucket id of this backend.
    pub bucket_id: String,
}
60
61impl Debug for B2Core {
62    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
63        f.debug_struct("Backend")
64            .field("root", &self.root)
65            .field("bucket", &self.bucket)
66            .field("bucket_id", &self.bucket_id)
67            .finish_non_exhaustive()
68    }
69}
70
71impl B2Core {
72    #[inline]
73    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
74        self.info.http_client().send(req).await
75    }
76
77    /// [b2_authorize_account](https://www.backblaze.com/apidocs/b2-authorize-account)
78    pub async fn get_auth_info(&self) -> Result<AuthInfo> {
79        {
80            let signer = self.signer.read().await;
81
82            if !signer.auth_info.authorization_token.is_empty()
83                && signer.auth_info.expires_in > Utc::now()
84            {
85                let auth_info = signer.auth_info.clone();
86                return Ok(auth_info);
87            }
88        }
89
90        {
91            let mut signer = self.signer.write().await;
92            let req = Request::get("https://api.backblazeb2.com/b2api/v2/b2_authorize_account")
93                .header(
94                    header::AUTHORIZATION,
95                    format_authorization_by_basic(
96                        &signer.application_key_id,
97                        &signer.application_key,
98                    )?,
99                )
100                .body(Buffer::new())
101                .map_err(new_request_build_error)?;
102
103            let resp = self.info.http_client().send(req).await?;
104            let status = resp.status();
105
106            match status {
107                StatusCode::OK => {
108                    let resp_body = resp.into_body();
109                    let token: AuthorizeAccountResponse =
110                        serde_json::from_reader(resp_body.reader())
111                            .map_err(new_json_deserialize_error)?;
112                    signer.auth_info = AuthInfo {
113                        authorization_token: token.authorization_token.clone(),
114                        api_url: token.api_url.clone(),
115                        download_url: token.download_url.clone(),
116                        // This authorization token is valid for at most 24 hours.
117                        expires_in: Utc::now()
118                            + chrono::TimeDelta::try_hours(20).expect("20 hours must be valid"),
119                    };
120                }
121                _ => {
122                    return Err(parse_error(resp));
123                }
124            }
125            Ok(signer.auth_info.clone())
126        }
127    }
128}
129
130impl B2Core {
131    pub async fn download_file_by_name(
132        &self,
133        path: &str,
134        range: BytesRange,
135        _args: &OpRead,
136    ) -> Result<Response<HttpBody>> {
137        let path = build_abs_path(&self.root, path);
138
139        let auth_info = self.get_auth_info().await?;
140
141        // Construct headers to add to the request
142        let url = format!(
143            "{}/file/{}/{}",
144            auth_info.download_url,
145            self.bucket,
146            percent_encode_path(&path)
147        );
148
149        let mut req = Request::get(&url);
150
151        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
152
153        if !range.is_full() {
154            req = req.header(header::RANGE, range.to_header());
155        }
156
157        let req = req.extension(Operation::Read);
158
159        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
160
161        self.info.http_client().fetch(req).await
162    }
163
164    pub(super) async fn get_upload_url(&self) -> Result<GetUploadUrlResponse> {
165        let auth_info = self.get_auth_info().await?;
166
167        let url = format!(
168            "{}/b2api/v2/b2_get_upload_url?bucketId={}",
169            auth_info.api_url, self.bucket_id
170        );
171
172        let mut req = Request::get(&url);
173
174        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
175
176        let req = req.extension(Operation::Write);
177
178        // Set body
179        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
180
181        let resp = self.send(req).await?;
182        let status = resp.status();
183        match status {
184            StatusCode::OK => {
185                let resp_body = resp.into_body();
186                let resp = serde_json::from_reader(resp_body.reader())
187                    .map_err(new_json_deserialize_error)?;
188                Ok(resp)
189            }
190            _ => Err(parse_error(resp)),
191        }
192    }
193
194    pub async fn get_download_authorization(
195        &self,
196        path: &str,
197        expire: Duration,
198    ) -> Result<GetDownloadAuthorizationResponse> {
199        let path = build_abs_path(&self.root, path);
200
201        let auth_info = self.get_auth_info().await?;
202
203        // Construct headers to add to the request
204        let url = format!(
205            "{}/b2api/v2/b2_get_download_authorization",
206            auth_info.api_url
207        );
208        let mut req = Request::post(&url);
209
210        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
211
212        let body = GetDownloadAuthorizationRequest {
213            bucket_id: self.bucket_id.clone(),
214            file_name_prefix: path,
215            valid_duration_in_seconds: expire.as_secs(),
216        };
217        let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
218        let body = bytes::Bytes::from(body);
219
220        let req = req
221            .body(Buffer::from(body))
222            .map_err(new_request_build_error)?;
223
224        let resp = self.send(req).await?;
225
226        let status = resp.status();
227        match status {
228            StatusCode::OK => {
229                let resp_body = resp.into_body();
230                let resp = serde_json::from_reader(resp_body.reader())
231                    .map_err(new_json_deserialize_error)?;
232                Ok(resp)
233            }
234            _ => Err(parse_error(resp)),
235        }
236    }
237
238    pub async fn upload_file(
239        &self,
240        path: &str,
241        size: Option<u64>,
242        args: &OpWrite,
243        body: Buffer,
244    ) -> Result<Response<Buffer>> {
245        let resp = self.get_upload_url().await?;
246
247        let p = build_abs_path(&self.root, path);
248
249        let mut req = Request::post(resp.upload_url);
250
251        req = req.header(X_BZ_FILE_NAME, percent_encode_path(&p));
252
253        req = req.header(header::AUTHORIZATION, resp.authorization_token);
254
255        req = req.header(X_BZ_CONTENT_SHA1, "do_not_verify");
256
257        if let Some(size) = size {
258            req = req.header(header::CONTENT_LENGTH, size.to_string())
259        }
260
261        if let Some(mime) = args.content_type() {
262            req = req.header(header::CONTENT_TYPE, mime)
263        } else {
264            req = req.header(header::CONTENT_TYPE, "b2/x-auto")
265        }
266
267        if let Some(pos) = args.content_disposition() {
268            req = req.header(header::CONTENT_DISPOSITION, pos)
269        }
270
271        let req = req.extension(Operation::Write);
272
273        // Set body
274        let req = req.body(body).map_err(new_request_build_error)?;
275
276        self.send(req).await
277    }
278
279    pub async fn start_large_file(&self, path: &str, args: &OpWrite) -> Result<Response<Buffer>> {
280        let p = build_abs_path(&self.root, path);
281
282        let auth_info = self.get_auth_info().await?;
283
284        let url = format!("{}/b2api/v2/b2_start_large_file", auth_info.api_url);
285
286        let mut req = Request::post(&url);
287
288        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
289
290        let mut start_large_file_request = StartLargeFileRequest {
291            bucket_id: self.bucket_id.clone(),
292            file_name: percent_encode_path(&p),
293            content_type: "b2/x-auto".to_owned(),
294        };
295
296        if let Some(mime) = args.content_type() {
297            mime.clone_into(&mut start_large_file_request.content_type)
298        }
299
300        let req = req.extension(Operation::Write);
301
302        let body =
303            serde_json::to_vec(&start_large_file_request).map_err(new_json_serialize_error)?;
304        let body = bytes::Bytes::from(body);
305
306        let req = req
307            .body(Buffer::from(body))
308            .map_err(new_request_build_error)?;
309
310        self.send(req).await
311    }
312
313    pub async fn get_upload_part_url(&self, file_id: &str) -> Result<GetUploadPartUrlResponse> {
314        let auth_info = self.get_auth_info().await?;
315
316        let url = format!(
317            "{}/b2api/v2/b2_get_upload_part_url?fileId={}",
318            auth_info.api_url, file_id
319        );
320
321        let mut req = Request::get(&url);
322
323        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
324
325        let req = req.extension(Operation::Write);
326
327        // Set body
328        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
329
330        let resp = self.send(req).await?;
331
332        let status = resp.status();
333        match status {
334            StatusCode::OK => {
335                let resp_body = resp.into_body();
336                let resp = serde_json::from_reader(resp_body.reader())
337                    .map_err(new_json_deserialize_error)?;
338                Ok(resp)
339            }
340            _ => Err(parse_error(resp)),
341        }
342    }
343
344    pub async fn upload_part(
345        &self,
346        file_id: &str,
347        part_number: usize,
348        size: u64,
349        body: Buffer,
350    ) -> Result<Response<Buffer>> {
351        let resp = self.get_upload_part_url(file_id).await?;
352
353        let mut req = Request::post(resp.upload_url);
354
355        req = req.header(X_BZ_PART_NUMBER, part_number.to_string());
356
357        req = req.header(header::CONTENT_LENGTH, size.to_string());
358
359        req = req.header(header::AUTHORIZATION, resp.authorization_token);
360
361        req = req.header(X_BZ_CONTENT_SHA1, "do_not_verify");
362
363        let req = req.extension(Operation::Write);
364
365        // Set body
366        let req = req.body(body).map_err(new_request_build_error)?;
367
368        self.send(req).await
369    }
370
371    pub async fn finish_large_file(
372        &self,
373        file_id: &str,
374        part_sha1_array: Vec<String>,
375    ) -> Result<Response<Buffer>> {
376        let auth_info = self.get_auth_info().await?;
377
378        let url = format!("{}/b2api/v2/b2_finish_large_file", auth_info.api_url);
379
380        let mut req = Request::post(&url);
381
382        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
383
384        let req = req.extension(Operation::Write);
385
386        let body = serde_json::to_vec(&FinishLargeFileRequest {
387            file_id: file_id.to_owned(),
388            part_sha1_array,
389        })
390        .map_err(new_json_serialize_error)?;
391        let body = bytes::Bytes::from(body);
392
393        // Set body
394        let req = req
395            .body(Buffer::from(body))
396            .map_err(new_request_build_error)?;
397
398        self.send(req).await
399    }
400
401    pub async fn cancel_large_file(&self, file_id: &str) -> Result<Response<Buffer>> {
402        let auth_info = self.get_auth_info().await?;
403
404        let url = format!("{}/b2api/v2/b2_cancel_large_file", auth_info.api_url);
405
406        let mut req = Request::post(&url);
407
408        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
409
410        let req = req.extension(Operation::Write);
411
412        let body = serde_json::to_vec(&CancelLargeFileRequest {
413            file_id: file_id.to_owned(),
414        })
415        .map_err(new_json_serialize_error)?;
416        let body = bytes::Bytes::from(body);
417
418        // Set body
419        let req = req
420            .body(Buffer::from(body))
421            .map_err(new_request_build_error)?;
422
423        self.send(req).await
424    }
425
426    pub async fn get_file_info(&self, path: &str, delimiter: Option<&str>) -> Result<File> {
427        let resp = self
428            .list_file_names_raw(Some(path), delimiter, None, None, Operation::Stat)
429            .await?;
430
431        let status = resp.status();
432        match status {
433            StatusCode::OK => {
434                let bs = resp.into_body();
435                let mut resp: ListFileNamesResponse =
436                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
437
438                if resp.files.is_empty() {
439                    return Err(Error::new(ErrorKind::NotFound, "no such file or directory"));
440                }
441                Ok(resp.files.swap_remove(0))
442            }
443            _ => Err(parse_error(resp)),
444        }
445    }
446
447    pub async fn list_file_names(
448        &self,
449        prefix: Option<&str>,
450        delimiter: Option<&str>,
451        limit: Option<usize>,
452        start_after: Option<String>,
453    ) -> Result<Response<Buffer>> {
454        self.list_file_names_raw(prefix, delimiter, limit, start_after, Operation::List)
455            .await
456    }
457
458    async fn list_file_names_raw(
459        &self,
460        prefix: Option<&str>,
461        delimiter: Option<&str>,
462        limit: Option<usize>,
463        start_after: Option<String>,
464        operation: Operation,
465    ) -> Result<Response<Buffer>> {
466        let auth_info = self.get_auth_info().await?;
467
468        let url = format!("{}/b2api/v2/b2_list_file_names", auth_info.api_url);
469
470        let mut url = QueryPairsWriter::new(&url);
471        url = url.push("bucketId", &self.bucket_id);
472
473        if let Some(prefix) = prefix {
474            let prefix = build_abs_path(&self.root, prefix);
475            if !prefix.is_empty() {
476                url = url.push("prefix", &percent_encode_path(&prefix));
477            }
478        }
479
480        if let Some(limit) = limit {
481            url = url.push("maxFileCount", &limit.to_string());
482        }
483
484        if let Some(start_after) = start_after {
485            url = url.push("startFileName", &percent_encode_path(&start_after));
486        }
487
488        if let Some(delimiter) = delimiter {
489            url = url.push("delimiter", delimiter);
490        }
491
492        let mut req = Request::get(url.finish());
493
494        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
495
496        req = req.extension(operation);
497
498        // Set body
499        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
500
501        self.send(req).await
502    }
503
504    pub async fn copy_file(&self, source_file_id: String, to: &str) -> Result<Response<Buffer>> {
505        let to = build_abs_path(&self.root, to);
506
507        let auth_info = self.get_auth_info().await?;
508
509        let url = format!("{}/b2api/v2/b2_copy_file", auth_info.api_url);
510
511        let mut req = Request::post(url);
512
513        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
514
515        let req = req.extension(Operation::Copy);
516
517        let body = CopyFileRequest {
518            source_file_id,
519            file_name: to,
520        };
521
522        let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
523        let body = bytes::Bytes::from(body);
524
525        // Set body
526        let req = req
527            .body(Buffer::from(body))
528            .map_err(new_request_build_error)?;
529
530        self.send(req).await
531    }
532
533    pub async fn hide_file(&self, path: &str) -> Result<Response<Buffer>> {
534        let path = build_abs_path(&self.root, path);
535
536        let auth_info = self.get_auth_info().await?;
537
538        let url = format!("{}/b2api/v2/b2_hide_file", auth_info.api_url);
539
540        let mut req = Request::post(url);
541
542        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
543
544        let req = req.extension(Operation::Delete);
545
546        let body = HideFileRequest {
547            bucket_id: self.bucket_id.clone(),
548            file_name: path.to_string(),
549        };
550
551        let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
552        let body = bytes::Bytes::from(body);
553
554        // Set body
555        let req = req
556            .body(Buffer::from(body))
557            .map_err(new_request_build_error)?;
558
559        self.send(req).await
560    }
561}
562
/// Application credentials plus the authorization most recently obtained with them.
#[derive(Clone)]
pub struct B2Signer {
    /// The application_key_id of this core.
    pub application_key_id: String,
    /// The application_key of this core.
    pub application_key: String,

    /// Cached result of the last successful `b2_authorize_account` call.
    pub auth_info: AuthInfo,
}
572
/// Account authorization cached between requests.
#[derive(Clone)]
pub struct AuthInfo {
    /// Token sent in the `Authorization` header; empty until the first authorization.
    pub authorization_token: String,
    /// The base URL to use for all API calls except for uploading and downloading files.
    pub api_url: String,
    /// The base URL to use for downloading files.
    pub download_url: String,

    /// Point in time after which the cached token is treated as expired.
    ///
    /// Despite the name this is an absolute timestamp, not a duration.
    pub expires_in: DateTime<Utc>,
}
583
584impl Default for B2Signer {
585    fn default() -> Self {
586        B2Signer {
587            application_key: String::new(),
588            application_key_id: String::new(),
589
590            auth_info: AuthInfo {
591                authorization_token: String::new(),
592                api_url: String::new(),
593                download_url: String::new(),
594                expires_in: DateTime::<Utc>::MIN_UTC,
595            },
596        }
597    }
598}
599
/// Request of [b2_start_large_file](https://www.backblaze.com/apidocs/b2-start-large-file).
///
/// Serialized as JSON with camelCase keys.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct StartLargeFileRequest {
    /// Id of the bucket the file is created in.
    pub bucket_id: String,
    /// Name of the file to create.
    pub file_name: String,
    /// MIME type of the file; callers in this module use `b2/x-auto` when unspecified.
    pub content_type: String,
}
608
/// Response of [b2_start_large_file](https://www.backblaze.com/apidocs/b2-start-large-file).
///
/// Only the `fileId` key is deserialized.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StartLargeFileResponse {
    /// Id of the large file that was started.
    pub file_id: String,
}
615
/// Response of [b2_authorize_account](https://www.backblaze.com/apidocs/b2-authorize-account).
///
/// Deserialized from JSON with camelCase keys.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AuthorizeAccountResponse {
    /// An authorization token to use with all calls, other than b2_authorize_account,
    /// that need an Authorization header. This authorization token is valid for at
    /// most 24 hours, so b2_authorize_account has to be called again before then.
    pub authorization_token: String,
    /// Copied into [`AuthInfo::api_url`] by `get_auth_info`.
    pub api_url: String,
    /// Copied into [`AuthInfo::download_url`] by `get_auth_info`.
    pub download_url: String,
}
626
/// Response of [b2_get_upload_url](https://www.backblaze.com/apidocs/b2-get-upload-url).
///
/// Deserialized from JSON with camelCase keys.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetUploadUrlResponse {
    /// The authorizationToken that must be used when uploading files to this bucket.
    /// This token is valid for 24 hours or until the uploadUrl endpoint rejects an upload, see b2_upload_file
    pub authorization_token: String,
    /// URL that `upload_file` posts the file content to.
    pub upload_url: String,
}
636
/// Response of [b2_get_upload_part_url](https://www.backblaze.com/apidocs/b2-get-upload-part-url).
///
/// Deserialized from JSON with camelCase keys.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetUploadPartUrlResponse {
    /// The authorizationToken that must be used when uploading parts of this large file.
    /// This token is valid for 24 hours or until the uploadUrl endpoint rejects an upload, see b2_upload_part
    pub authorization_token: String,
    /// URL that `upload_part` posts the part content to.
    pub upload_url: String,
}
646
/// Response of [b2_upload_part](https://www.backblaze.com/apidocs/b2-upload-part).
///
/// Only the `contentSha1` key is deserialized.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UploadPartResponse {
    /// SHA1 of the uploaded part as reported in the response.
    pub content_sha1: String,
}
653
/// Request of [b2_finish_large_file](https://www.backblaze.com/apidocs/b2-finish-large-file).
///
/// Serialized as JSON with camelCase keys.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FinishLargeFileRequest {
    /// Id of the large file to finish.
    pub file_id: String,
    /// SHA1 checksums of the uploaded parts.
    pub part_sha1_array: Vec<String>,
}
661
/// Request of [b2_cancel_large_file](https://www.backblaze.com/apidocs/b2-cancel-large-file).
///
/// Serialized as JSON with camelCase keys.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CancelLargeFileRequest {
    /// Id of the large file to cancel.
    pub file_id: String,
}
668
/// Response of [list_file_names](https://www.backblaze.com/apidocs/b2-list-file-names).
///
/// Deserialized from JSON with camelCase keys.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ListFileNamesResponse {
    /// Entries returned by this listing call.
    pub files: Vec<File>,
    /// Optional `nextFileName` value of the response.
    // NOTE(review): presumably the `startFileName` to pass for the next page — confirm against the lister.
    pub next_file_name: Option<String>,
}
676
/// Response of [b2-finish-large-file](https://www.backblaze.com/apidocs/b2-finish-large-file).
///
/// Only the content length, MD5 and type are deserialized (camelCase JSON keys).
// NOTE(review): presumably also used for `b2_upload_file` responses — confirm against the writer.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UploadResponse {
    /// Length of the stored content.
    pub content_length: u64,
    /// MD5 of the content, when the response carries one.
    pub content_md5: Option<String>,
    /// Content type, when the response carries one.
    pub content_type: Option<String>,
}
685
/// One entry of a [`ListFileNamesResponse`], deserialized from camelCase JSON keys.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct File {
    /// Id of the file, when the response carries one.
    pub file_id: Option<String>,
    /// Length of the content.
    pub content_length: u64,
    /// MD5 of the content, when the response carries one.
    pub content_md5: Option<String>,
    /// Content type, when the response carries one.
    pub content_type: Option<String>,
    /// Name of the entry; `parse_file_info` treats a trailing `/` as a directory.
    pub file_name: String,
}
695
696pub(super) fn parse_file_info(file: &File) -> Metadata {
697    if file.file_name.ends_with('/') {
698        return Metadata::new(EntryMode::DIR);
699    }
700
701    let mut metadata = Metadata::new(EntryMode::FILE);
702
703    metadata.set_content_length(file.content_length);
704
705    if let Some(content_md5) = &file.content_md5 {
706        metadata.set_content_md5(content_md5);
707    }
708
709    if let Some(content_type) = &file.content_type {
710        metadata.set_content_type(content_type);
711    }
712
713    metadata
714}
715
/// JSON body sent by `copy_file` to `b2_copy_file` (camelCase keys).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CopyFileRequest {
    /// Id of the file to copy from.
    pub source_file_id: String,
    /// Name of the file to create.
    pub file_name: String,
}
722
/// JSON body sent by `hide_file` to `b2_hide_file` (camelCase keys).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct HideFileRequest {
    /// Id of the bucket containing the file.
    pub bucket_id: String,
    /// Name of the file to hide.
    pub file_name: String,
}
729
/// JSON body sent by `get_download_authorization` to
/// `b2_get_download_authorization` (camelCase keys).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetDownloadAuthorizationRequest {
    /// Id of the bucket the authorization applies to.
    pub bucket_id: String,
    /// File name prefix the authorization applies to.
    pub file_name_prefix: String,
    /// Requested validity of the authorization, in seconds.
    pub valid_duration_in_seconds: u64,
}
737
/// Response of `b2_get_download_authorization`; only the `authorizationToken`
/// key is deserialized.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetDownloadAuthorizationResponse {
    /// Token returned by the download authorization call.
    pub authorization_token: String,
}
742}