opendal_core/services/b2/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use http::Request;
23use http::Response;
24use http::StatusCode;
25use http::header;
26use mea::rwlock::RwLock;
27use serde::Deserialize;
28use serde::Serialize;
29
30use self::constants::X_BZ_CONTENT_SHA1;
31use self::constants::X_BZ_FILE_NAME;
32use super::core::constants::X_BZ_PART_NUMBER;
33use super::error::parse_error;
34use crate::raw::*;
35use crate::*;
36
37pub(super) mod constants {
38    pub const X_BZ_FILE_NAME: &str = "X-Bz-File-Name";
39    pub const X_BZ_CONTENT_SHA1: &str = "X-Bz-Content-Sha1";
40    pub const X_BZ_PART_NUMBER: &str = "X-Bz-Part-Number";
41}
42
43/// Core of [b2](https://www.backblaze.com/cloud-storage) services support.
44#[derive(Clone)]
45pub struct B2Core {
46    pub info: Arc<AccessorInfo>,
47    pub signer: Arc<RwLock<B2Signer>>,
48
49    /// The root of this core.
50    pub root: String,
51    /// The bucket name of this backend.
52    pub bucket: String,
53    /// The bucket id of this backend.
54    pub bucket_id: String,
55}
56
57impl Debug for B2Core {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("B2Core")
60            .field("root", &self.root)
61            .field("bucket", &self.bucket)
62            .field("bucket_id", &self.bucket_id)
63            .finish_non_exhaustive()
64    }
65}
66
67impl B2Core {
68    #[inline]
69    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
70        self.info.http_client().send(req).await
71    }
72
73    /// [b2_authorize_account](https://www.backblaze.com/apidocs/b2-authorize-account)
74    pub async fn get_auth_info(&self) -> Result<AuthInfo> {
75        {
76            let signer = self.signer.read().await;
77
78            if !signer.auth_info.authorization_token.is_empty()
79                && signer.auth_info.expires_in > Timestamp::now()
80            {
81                let auth_info = signer.auth_info.clone();
82                return Ok(auth_info);
83            }
84        }
85
86        {
87            let mut signer = self.signer.write().await;
88            let req = Request::get("https://api.backblazeb2.com/b2api/v2/b2_authorize_account")
89                .header(
90                    header::AUTHORIZATION,
91                    format_authorization_by_basic(
92                        &signer.application_key_id,
93                        &signer.application_key,
94                    )?,
95                )
96                .body(Buffer::new())
97                .map_err(new_request_build_error)?;
98
99            let resp = self.info.http_client().send(req).await?;
100            let status = resp.status();
101
102            match status {
103                StatusCode::OK => {
104                    let resp_body = resp.into_body();
105                    let token: AuthorizeAccountResponse =
106                        serde_json::from_reader(resp_body.reader())
107                            .map_err(new_json_deserialize_error)?;
108                    signer.auth_info = AuthInfo {
109                        authorization_token: token.authorization_token.clone(),
110                        api_url: token.api_url.clone(),
111                        download_url: token.download_url.clone(),
112                        // This authorization token is valid for at most 24 hours.
113                        expires_in: Timestamp::now() + Duration::from_secs(20 * 60 * 60),
114                    };
115                }
116                _ => {
117                    return Err(parse_error(resp));
118                }
119            }
120            Ok(signer.auth_info.clone())
121        }
122    }
123}
124
125impl B2Core {
126    pub async fn download_file_by_name(
127        &self,
128        path: &str,
129        range: BytesRange,
130        _args: &OpRead,
131    ) -> Result<Response<HttpBody>> {
132        let path = build_abs_path(&self.root, path);
133
134        let auth_info = self.get_auth_info().await?;
135
136        // Construct headers to add to the request
137        let url = format!(
138            "{}/file/{}/{}",
139            auth_info.download_url,
140            self.bucket,
141            percent_encode_path(&path)
142        );
143
144        let mut req = Request::get(&url);
145
146        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
147
148        if !range.is_full() {
149            req = req.header(header::RANGE, range.to_header());
150        }
151
152        let req = req.extension(Operation::Read);
153
154        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
155
156        self.info.http_client().fetch(req).await
157    }
158
159    pub(super) async fn get_upload_url(&self) -> Result<GetUploadUrlResponse> {
160        let auth_info = self.get_auth_info().await?;
161
162        let url = format!(
163            "{}/b2api/v2/b2_get_upload_url?bucketId={}",
164            auth_info.api_url, self.bucket_id
165        );
166
167        let mut req = Request::get(&url);
168
169        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
170
171        let req = req.extension(Operation::Write);
172
173        // Set body
174        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
175
176        let resp = self.send(req).await?;
177        let status = resp.status();
178        match status {
179            StatusCode::OK => {
180                let resp_body = resp.into_body();
181                let resp = serde_json::from_reader(resp_body.reader())
182                    .map_err(new_json_deserialize_error)?;
183                Ok(resp)
184            }
185            _ => Err(parse_error(resp)),
186        }
187    }
188
189    pub async fn get_download_authorization(
190        &self,
191        path: &str,
192        expire: Duration,
193    ) -> Result<GetDownloadAuthorizationResponse> {
194        let path = build_abs_path(&self.root, path);
195
196        let auth_info = self.get_auth_info().await?;
197
198        // Construct headers to add to the request
199        let url = format!(
200            "{}/b2api/v2/b2_get_download_authorization",
201            auth_info.api_url
202        );
203        let mut req = Request::post(&url);
204
205        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
206
207        let body = GetDownloadAuthorizationRequest {
208            bucket_id: self.bucket_id.clone(),
209            file_name_prefix: path,
210            valid_duration_in_seconds: expire.as_secs(),
211        };
212        let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
213        let body = bytes::Bytes::from(body);
214
215        let req = req
216            .body(Buffer::from(body))
217            .map_err(new_request_build_error)?;
218
219        let resp = self.send(req).await?;
220
221        let status = resp.status();
222        match status {
223            StatusCode::OK => {
224                let resp_body = resp.into_body();
225                let resp = serde_json::from_reader(resp_body.reader())
226                    .map_err(new_json_deserialize_error)?;
227                Ok(resp)
228            }
229            _ => Err(parse_error(resp)),
230        }
231    }
232
233    pub async fn upload_file(
234        &self,
235        path: &str,
236        size: Option<u64>,
237        args: &OpWrite,
238        body: Buffer,
239    ) -> Result<Response<Buffer>> {
240        let resp = self.get_upload_url().await?;
241
242        let p = build_abs_path(&self.root, path);
243
244        let mut req = Request::post(resp.upload_url);
245
246        req = req.header(X_BZ_FILE_NAME, percent_encode_path(&p));
247
248        req = req.header(header::AUTHORIZATION, resp.authorization_token);
249
250        req = req.header(X_BZ_CONTENT_SHA1, "do_not_verify");
251
252        if let Some(size) = size {
253            req = req.header(header::CONTENT_LENGTH, size.to_string())
254        }
255
256        if let Some(mime) = args.content_type() {
257            req = req.header(header::CONTENT_TYPE, mime)
258        } else {
259            req = req.header(header::CONTENT_TYPE, "b2/x-auto")
260        }
261
262        if let Some(pos) = args.content_disposition() {
263            req = req.header(header::CONTENT_DISPOSITION, pos)
264        }
265
266        let req = req.extension(Operation::Write);
267
268        // Set body
269        let req = req.body(body).map_err(new_request_build_error)?;
270
271        self.send(req).await
272    }
273
274    pub async fn start_large_file(&self, path: &str, args: &OpWrite) -> Result<Response<Buffer>> {
275        let p = build_abs_path(&self.root, path);
276
277        let auth_info = self.get_auth_info().await?;
278
279        let url = format!("{}/b2api/v2/b2_start_large_file", auth_info.api_url);
280
281        let mut req = Request::post(&url);
282
283        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
284
285        let mut start_large_file_request = StartLargeFileRequest {
286            bucket_id: self.bucket_id.clone(),
287            file_name: percent_encode_path(&p),
288            content_type: "b2/x-auto".to_owned(),
289        };
290
291        if let Some(mime) = args.content_type() {
292            mime.clone_into(&mut start_large_file_request.content_type)
293        }
294
295        let req = req.extension(Operation::Write);
296
297        let body =
298            serde_json::to_vec(&start_large_file_request).map_err(new_json_serialize_error)?;
299        let body = bytes::Bytes::from(body);
300
301        let req = req
302            .body(Buffer::from(body))
303            .map_err(new_request_build_error)?;
304
305        self.send(req).await
306    }
307
308    pub async fn get_upload_part_url(&self, file_id: &str) -> Result<GetUploadPartUrlResponse> {
309        let auth_info = self.get_auth_info().await?;
310
311        let url = format!(
312            "{}/b2api/v2/b2_get_upload_part_url?fileId={}",
313            auth_info.api_url, file_id
314        );
315
316        let mut req = Request::get(&url);
317
318        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
319
320        let req = req.extension(Operation::Write);
321
322        // Set body
323        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
324
325        let resp = self.send(req).await?;
326
327        let status = resp.status();
328        match status {
329            StatusCode::OK => {
330                let resp_body = resp.into_body();
331                let resp = serde_json::from_reader(resp_body.reader())
332                    .map_err(new_json_deserialize_error)?;
333                Ok(resp)
334            }
335            _ => Err(parse_error(resp)),
336        }
337    }
338
339    pub async fn upload_part(
340        &self,
341        file_id: &str,
342        part_number: usize,
343        size: u64,
344        body: Buffer,
345    ) -> Result<Response<Buffer>> {
346        let resp = self.get_upload_part_url(file_id).await?;
347
348        let mut req = Request::post(resp.upload_url);
349
350        req = req.header(X_BZ_PART_NUMBER, part_number.to_string());
351
352        req = req.header(header::CONTENT_LENGTH, size.to_string());
353
354        req = req.header(header::AUTHORIZATION, resp.authorization_token);
355
356        req = req.header(X_BZ_CONTENT_SHA1, "do_not_verify");
357
358        let req = req.extension(Operation::Write);
359
360        // Set body
361        let req = req.body(body).map_err(new_request_build_error)?;
362
363        self.send(req).await
364    }
365
366    pub async fn finish_large_file(
367        &self,
368        file_id: &str,
369        part_sha1_array: Vec<String>,
370    ) -> Result<Response<Buffer>> {
371        let auth_info = self.get_auth_info().await?;
372
373        let url = format!("{}/b2api/v2/b2_finish_large_file", auth_info.api_url);
374
375        let mut req = Request::post(&url);
376
377        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
378
379        let req = req.extension(Operation::Write);
380
381        let body = serde_json::to_vec(&FinishLargeFileRequest {
382            file_id: file_id.to_owned(),
383            part_sha1_array,
384        })
385        .map_err(new_json_serialize_error)?;
386        let body = bytes::Bytes::from(body);
387
388        // Set body
389        let req = req
390            .body(Buffer::from(body))
391            .map_err(new_request_build_error)?;
392
393        self.send(req).await
394    }
395
396    pub async fn cancel_large_file(&self, file_id: &str) -> Result<Response<Buffer>> {
397        let auth_info = self.get_auth_info().await?;
398
399        let url = format!("{}/b2api/v2/b2_cancel_large_file", auth_info.api_url);
400
401        let mut req = Request::post(&url);
402
403        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
404
405        let req = req.extension(Operation::Write);
406
407        let body = serde_json::to_vec(&CancelLargeFileRequest {
408            file_id: file_id.to_owned(),
409        })
410        .map_err(new_json_serialize_error)?;
411        let body = bytes::Bytes::from(body);
412
413        // Set body
414        let req = req
415            .body(Buffer::from(body))
416            .map_err(new_request_build_error)?;
417
418        self.send(req).await
419    }
420
421    pub async fn get_file_info(&self, path: &str, delimiter: Option<&str>) -> Result<File> {
422        let resp = self
423            .list_file_names_raw(Some(path), delimiter, None, None, Operation::Stat)
424            .await?;
425
426        let status = resp.status();
427        match status {
428            StatusCode::OK => {
429                let bs = resp.into_body();
430                let mut resp: ListFileNamesResponse =
431                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
432
433                if resp.files.is_empty() {
434                    return Err(Error::new(ErrorKind::NotFound, "no such file or directory"));
435                }
436                Ok(resp.files.swap_remove(0))
437            }
438            _ => Err(parse_error(resp)),
439        }
440    }
441
442    pub async fn list_file_names(
443        &self,
444        prefix: Option<&str>,
445        delimiter: Option<&str>,
446        limit: Option<usize>,
447        start_after: Option<String>,
448    ) -> Result<Response<Buffer>> {
449        self.list_file_names_raw(prefix, delimiter, limit, start_after, Operation::List)
450            .await
451    }
452
453    async fn list_file_names_raw(
454        &self,
455        prefix: Option<&str>,
456        delimiter: Option<&str>,
457        limit: Option<usize>,
458        start_after: Option<String>,
459        operation: Operation,
460    ) -> Result<Response<Buffer>> {
461        let auth_info = self.get_auth_info().await?;
462
463        let url = format!("{}/b2api/v2/b2_list_file_names", auth_info.api_url);
464
465        let mut url = QueryPairsWriter::new(&url);
466        url = url.push("bucketId", &self.bucket_id);
467
468        if let Some(prefix) = prefix {
469            let prefix = build_abs_path(&self.root, prefix);
470            if !prefix.is_empty() {
471                url = url.push("prefix", &percent_encode_path(&prefix));
472            }
473        }
474
475        if let Some(limit) = limit {
476            url = url.push("maxFileCount", &limit.to_string());
477        }
478
479        if let Some(start_after) = start_after {
480            url = url.push("startFileName", &percent_encode_path(&start_after));
481        }
482
483        if let Some(delimiter) = delimiter {
484            url = url.push("delimiter", delimiter);
485        }
486
487        let mut req = Request::get(url.finish());
488
489        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
490
491        req = req.extension(operation);
492
493        // Set body
494        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
495
496        self.send(req).await
497    }
498
499    pub async fn copy_file(&self, source_file_id: String, to: &str) -> Result<Response<Buffer>> {
500        let to = build_abs_path(&self.root, to);
501
502        let auth_info = self.get_auth_info().await?;
503
504        let url = format!("{}/b2api/v2/b2_copy_file", auth_info.api_url);
505
506        let mut req = Request::post(url);
507
508        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
509
510        let req = req.extension(Operation::Copy);
511
512        let body = CopyFileRequest {
513            source_file_id,
514            file_name: to,
515        };
516
517        let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
518        let body = bytes::Bytes::from(body);
519
520        // Set body
521        let req = req
522            .body(Buffer::from(body))
523            .map_err(new_request_build_error)?;
524
525        self.send(req).await
526    }
527
528    pub async fn hide_file(&self, path: &str) -> Result<Response<Buffer>> {
529        let path = build_abs_path(&self.root, path);
530
531        let auth_info = self.get_auth_info().await?;
532
533        let url = format!("{}/b2api/v2/b2_hide_file", auth_info.api_url);
534
535        let mut req = Request::post(url);
536
537        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
538
539        let req = req.extension(Operation::Delete);
540
541        let body = HideFileRequest {
542            bucket_id: self.bucket_id.clone(),
543            file_name: path.to_string(),
544        };
545
546        let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
547        let body = bytes::Bytes::from(body);
548
549        // Set body
550        let req = req
551            .body(Buffer::from(body))
552            .map_err(new_request_build_error)?;
553
554        self.send(req).await
555    }
556}
557
558#[derive(Clone)]
559pub struct B2Signer {
560    /// The application_key_id of this core.
561    pub application_key_id: String,
562    /// The application_key of this core.
563    pub application_key: String,
564
565    pub auth_info: AuthInfo,
566}
567
568#[derive(Clone)]
569pub struct AuthInfo {
570    pub authorization_token: String,
571    /// The base URL to use for all API calls except for uploading and downloading files.
572    pub api_url: String,
573    /// The base URL to use for downloading files.
574    pub download_url: String,
575    /// The time when the authorization token expires.
576    pub expires_in: Timestamp,
577}
578
579impl Default for B2Signer {
580    fn default() -> Self {
581        B2Signer {
582            application_key: String::new(),
583            application_key_id: String::new(),
584
585            auth_info: AuthInfo {
586                authorization_token: String::new(),
587                api_url: String::new(),
588                download_url: String::new(),
589                expires_in: Timestamp::MIN,
590            },
591        }
592    }
593}
594
595/// Request of [b2_start_large_file](https://www.backblaze.com/apidocs/b2-start-large-file).
596#[derive(Debug, Serialize)]
597#[serde(rename_all = "camelCase")]
598pub struct StartLargeFileRequest {
599    pub bucket_id: String,
600    pub file_name: String,
601    pub content_type: String,
602}
603
604/// Response of [b2_start_large_file](https://www.backblaze.com/apidocs/b2-start-large-file).
605#[derive(Debug, Deserialize)]
606#[serde(rename_all = "camelCase")]
607pub struct StartLargeFileResponse {
608    pub file_id: String,
609}
610
611/// Response of [b2_authorize_account](https://www.backblaze.com/apidocs/b2-authorize-account).
612#[derive(Debug, Deserialize)]
613#[serde(rename_all = "camelCase")]
614pub struct AuthorizeAccountResponse {
615    /// An authorization token to use with all calls, other than b2_authorize_account, that need an Authorization header. This authorization token is valid for at most 24 hours.
616    /// So we should call b2_authorize_account every 24 hours.
617    pub authorization_token: String,
618    pub api_url: String,
619    pub download_url: String,
620}
621
622/// Response of [b2_get_upload_url](https://www.backblaze.com/apidocs/b2-get-upload-url).
623#[derive(Debug, Deserialize)]
624#[serde(rename_all = "camelCase")]
625pub struct GetUploadUrlResponse {
626    /// The authorizationToken that must be used when uploading files to this bucket.
627    /// This token is valid for 24 hours or until the uploadUrl endpoint rejects an upload, see b2_upload_file
628    pub authorization_token: String,
629    pub upload_url: String,
630}
631
632/// Response of [b2_get_upload_url](https://www.backblaze.com/apidocs/b2-get-upload-part-url).
633#[derive(Debug, Deserialize)]
634#[serde(rename_all = "camelCase")]
635pub struct GetUploadPartUrlResponse {
636    /// The authorizationToken that must be used when uploading files to this bucket.
637    /// This token is valid for 24 hours or until the uploadUrl endpoint rejects an upload, see b2_upload_file
638    pub authorization_token: String,
639    pub upload_url: String,
640}
641
642/// Response of [b2_upload_part](https://www.backblaze.com/apidocs/b2-upload-part).
643#[derive(Debug, Deserialize)]
644#[serde(rename_all = "camelCase")]
645pub struct UploadPartResponse {
646    pub content_sha1: String,
647}
648
649/// Response of [b2_finish_large_file](https://www.backblaze.com/apidocs/b2-finish-large-file).
650#[derive(Debug, Serialize)]
651#[serde(rename_all = "camelCase")]
652pub struct FinishLargeFileRequest {
653    pub file_id: String,
654    pub part_sha1_array: Vec<String>,
655}
656
657/// Response of [b2_cancel_large_file](https://www.backblaze.com/apidocs/b2-cancel-large-file).
658#[derive(Debug, Serialize)]
659#[serde(rename_all = "camelCase")]
660pub struct CancelLargeFileRequest {
661    pub file_id: String,
662}
663
664/// Response of [list_file_names](https://www.backblaze.com/apidocs/b2-list-file-names).
665#[derive(Debug, Clone, Deserialize)]
666#[serde(rename_all = "camelCase")]
667pub struct ListFileNamesResponse {
668    pub files: Vec<File>,
669    pub next_file_name: Option<String>,
670}
671
672/// Response of [b2-finish-large-file](https://www.backblaze.com/apidocs/b2-finish-large-file).
673#[derive(Debug, Clone, Deserialize)]
674#[serde(rename_all = "camelCase")]
675pub struct UploadResponse {
676    pub content_length: u64,
677    pub content_md5: Option<String>,
678    pub content_type: Option<String>,
679}
680
681#[derive(Debug, Clone, Deserialize)]
682#[serde(rename_all = "camelCase")]
683pub struct File {
684    pub file_id: Option<String>,
685    pub content_length: u64,
686    pub content_md5: Option<String>,
687    pub content_type: Option<String>,
688    pub file_name: String,
689}
690
691pub(super) fn parse_file_info(file: &File) -> Metadata {
692    if file.file_name.ends_with('/') {
693        return Metadata::new(EntryMode::DIR);
694    }
695
696    let mut metadata = Metadata::new(EntryMode::FILE);
697
698    metadata.set_content_length(file.content_length);
699
700    if let Some(content_md5) = &file.content_md5 {
701        metadata.set_content_md5(content_md5);
702    }
703
704    if let Some(content_type) = &file.content_type {
705        metadata.set_content_type(content_type);
706    }
707
708    metadata
709}
710
711#[derive(Debug, Serialize)]
712#[serde(rename_all = "camelCase")]
713pub struct CopyFileRequest {
714    pub source_file_id: String,
715    pub file_name: String,
716}
717
718#[derive(Debug, Serialize)]
719#[serde(rename_all = "camelCase")]
720pub struct HideFileRequest {
721    pub bucket_id: String,
722    pub file_name: String,
723}
724
725#[derive(Debug, Serialize)]
726#[serde(rename_all = "camelCase")]
727pub struct GetDownloadAuthorizationRequest {
728    pub bucket_id: String,
729    pub file_name_prefix: String,
730    pub valid_duration_in_seconds: u64,
731}
732
733#[derive(Debug, Deserialize)]
734#[serde(rename_all = "camelCase")]
735pub struct GetDownloadAuthorizationResponse {
736    pub authorization_token: String,
737}