1use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use http::Request;
23use http::Response;
24use http::StatusCode;
25use http::header;
26use mea::rwlock::RwLock;
27use serde::Deserialize;
28use serde::Serialize;
29
30use self::constants::X_BZ_CONTENT_SHA1;
31use self::constants::X_BZ_FILE_NAME;
32use super::core::constants::X_BZ_PART_NUMBER;
33use super::error::parse_error;
34use crate::raw::*;
35use crate::*;
36
37pub(super) mod constants {
38 pub const X_BZ_FILE_NAME: &str = "X-Bz-File-Name";
39 pub const X_BZ_CONTENT_SHA1: &str = "X-Bz-Content-Sha1";
40 pub const X_BZ_PART_NUMBER: &str = "X-Bz-Part-Number";
41}
42
43#[derive(Clone)]
45pub struct B2Core {
46 pub info: Arc<AccessorInfo>,
47 pub signer: Arc<RwLock<B2Signer>>,
48
49 pub root: String,
51 pub bucket: String,
53 pub bucket_id: String,
55}
56
57impl Debug for B2Core {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("B2Core")
60 .field("root", &self.root)
61 .field("bucket", &self.bucket)
62 .field("bucket_id", &self.bucket_id)
63 .finish_non_exhaustive()
64 }
65}
66
67impl B2Core {
68 #[inline]
69 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
70 self.info.http_client().send(req).await
71 }
72
73 pub async fn get_auth_info(&self) -> Result<AuthInfo> {
75 {
76 let signer = self.signer.read().await;
77
78 if !signer.auth_info.authorization_token.is_empty()
79 && signer.auth_info.expires_in > Timestamp::now()
80 {
81 let auth_info = signer.auth_info.clone();
82 return Ok(auth_info);
83 }
84 }
85
86 {
87 let mut signer = self.signer.write().await;
88 let req = Request::get("https://api.backblazeb2.com/b2api/v2/b2_authorize_account")
89 .header(
90 header::AUTHORIZATION,
91 format_authorization_by_basic(
92 &signer.application_key_id,
93 &signer.application_key,
94 )?,
95 )
96 .body(Buffer::new())
97 .map_err(new_request_build_error)?;
98
99 let resp = self.info.http_client().send(req).await?;
100 let status = resp.status();
101
102 match status {
103 StatusCode::OK => {
104 let resp_body = resp.into_body();
105 let token: AuthorizeAccountResponse =
106 serde_json::from_reader(resp_body.reader())
107 .map_err(new_json_deserialize_error)?;
108 signer.auth_info = AuthInfo {
109 authorization_token: token.authorization_token.clone(),
110 api_url: token.api_url.clone(),
111 download_url: token.download_url.clone(),
112 expires_in: Timestamp::now() + Duration::from_secs(20 * 60 * 60),
114 };
115 }
116 _ => {
117 return Err(parse_error(resp));
118 }
119 }
120 Ok(signer.auth_info.clone())
121 }
122 }
123}
124
125impl B2Core {
126 pub async fn download_file_by_name(
127 &self,
128 path: &str,
129 range: BytesRange,
130 _args: &OpRead,
131 ) -> Result<Response<HttpBody>> {
132 let path = build_abs_path(&self.root, path);
133
134 let auth_info = self.get_auth_info().await?;
135
136 let url = format!(
138 "{}/file/{}/{}",
139 auth_info.download_url,
140 self.bucket,
141 percent_encode_path(&path)
142 );
143
144 let mut req = Request::get(&url);
145
146 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
147
148 if !range.is_full() {
149 req = req.header(header::RANGE, range.to_header());
150 }
151
152 let req = req.extension(Operation::Read);
153
154 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
155
156 self.info.http_client().fetch(req).await
157 }
158
159 pub(super) async fn get_upload_url(&self) -> Result<GetUploadUrlResponse> {
160 let auth_info = self.get_auth_info().await?;
161
162 let url = format!(
163 "{}/b2api/v2/b2_get_upload_url?bucketId={}",
164 auth_info.api_url, self.bucket_id
165 );
166
167 let mut req = Request::get(&url);
168
169 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
170
171 let req = req.extension(Operation::Write);
172
173 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
175
176 let resp = self.send(req).await?;
177 let status = resp.status();
178 match status {
179 StatusCode::OK => {
180 let resp_body = resp.into_body();
181 let resp = serde_json::from_reader(resp_body.reader())
182 .map_err(new_json_deserialize_error)?;
183 Ok(resp)
184 }
185 _ => Err(parse_error(resp)),
186 }
187 }
188
189 pub async fn get_download_authorization(
190 &self,
191 path: &str,
192 expire: Duration,
193 ) -> Result<GetDownloadAuthorizationResponse> {
194 let path = build_abs_path(&self.root, path);
195
196 let auth_info = self.get_auth_info().await?;
197
198 let url = format!(
200 "{}/b2api/v2/b2_get_download_authorization",
201 auth_info.api_url
202 );
203 let mut req = Request::post(&url);
204
205 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
206
207 let body = GetDownloadAuthorizationRequest {
208 bucket_id: self.bucket_id.clone(),
209 file_name_prefix: path,
210 valid_duration_in_seconds: expire.as_secs(),
211 };
212 let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
213 let body = bytes::Bytes::from(body);
214
215 let req = req
216 .body(Buffer::from(body))
217 .map_err(new_request_build_error)?;
218
219 let resp = self.send(req).await?;
220
221 let status = resp.status();
222 match status {
223 StatusCode::OK => {
224 let resp_body = resp.into_body();
225 let resp = serde_json::from_reader(resp_body.reader())
226 .map_err(new_json_deserialize_error)?;
227 Ok(resp)
228 }
229 _ => Err(parse_error(resp)),
230 }
231 }
232
233 pub async fn upload_file(
234 &self,
235 path: &str,
236 size: Option<u64>,
237 args: &OpWrite,
238 body: Buffer,
239 ) -> Result<Response<Buffer>> {
240 let resp = self.get_upload_url().await?;
241
242 let p = build_abs_path(&self.root, path);
243
244 let mut req = Request::post(resp.upload_url);
245
246 req = req.header(X_BZ_FILE_NAME, percent_encode_path(&p));
247
248 req = req.header(header::AUTHORIZATION, resp.authorization_token);
249
250 req = req.header(X_BZ_CONTENT_SHA1, "do_not_verify");
251
252 if let Some(size) = size {
253 req = req.header(header::CONTENT_LENGTH, size.to_string())
254 }
255
256 if let Some(mime) = args.content_type() {
257 req = req.header(header::CONTENT_TYPE, mime)
258 } else {
259 req = req.header(header::CONTENT_TYPE, "b2/x-auto")
260 }
261
262 if let Some(pos) = args.content_disposition() {
263 req = req.header(header::CONTENT_DISPOSITION, pos)
264 }
265
266 let req = req.extension(Operation::Write);
267
268 let req = req.body(body).map_err(new_request_build_error)?;
270
271 self.send(req).await
272 }
273
274 pub async fn start_large_file(&self, path: &str, args: &OpWrite) -> Result<Response<Buffer>> {
275 let p = build_abs_path(&self.root, path);
276
277 let auth_info = self.get_auth_info().await?;
278
279 let url = format!("{}/b2api/v2/b2_start_large_file", auth_info.api_url);
280
281 let mut req = Request::post(&url);
282
283 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
284
285 let mut start_large_file_request = StartLargeFileRequest {
286 bucket_id: self.bucket_id.clone(),
287 file_name: percent_encode_path(&p),
288 content_type: "b2/x-auto".to_owned(),
289 };
290
291 if let Some(mime) = args.content_type() {
292 mime.clone_into(&mut start_large_file_request.content_type)
293 }
294
295 let req = req.extension(Operation::Write);
296
297 let body =
298 serde_json::to_vec(&start_large_file_request).map_err(new_json_serialize_error)?;
299 let body = bytes::Bytes::from(body);
300
301 let req = req
302 .body(Buffer::from(body))
303 .map_err(new_request_build_error)?;
304
305 self.send(req).await
306 }
307
308 pub async fn get_upload_part_url(&self, file_id: &str) -> Result<GetUploadPartUrlResponse> {
309 let auth_info = self.get_auth_info().await?;
310
311 let url = format!(
312 "{}/b2api/v2/b2_get_upload_part_url?fileId={}",
313 auth_info.api_url, file_id
314 );
315
316 let mut req = Request::get(&url);
317
318 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
319
320 let req = req.extension(Operation::Write);
321
322 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
324
325 let resp = self.send(req).await?;
326
327 let status = resp.status();
328 match status {
329 StatusCode::OK => {
330 let resp_body = resp.into_body();
331 let resp = serde_json::from_reader(resp_body.reader())
332 .map_err(new_json_deserialize_error)?;
333 Ok(resp)
334 }
335 _ => Err(parse_error(resp)),
336 }
337 }
338
339 pub async fn upload_part(
340 &self,
341 file_id: &str,
342 part_number: usize,
343 size: u64,
344 body: Buffer,
345 ) -> Result<Response<Buffer>> {
346 let resp = self.get_upload_part_url(file_id).await?;
347
348 let mut req = Request::post(resp.upload_url);
349
350 req = req.header(X_BZ_PART_NUMBER, part_number.to_string());
351
352 req = req.header(header::CONTENT_LENGTH, size.to_string());
353
354 req = req.header(header::AUTHORIZATION, resp.authorization_token);
355
356 req = req.header(X_BZ_CONTENT_SHA1, "do_not_verify");
357
358 let req = req.extension(Operation::Write);
359
360 let req = req.body(body).map_err(new_request_build_error)?;
362
363 self.send(req).await
364 }
365
366 pub async fn finish_large_file(
367 &self,
368 file_id: &str,
369 part_sha1_array: Vec<String>,
370 ) -> Result<Response<Buffer>> {
371 let auth_info = self.get_auth_info().await?;
372
373 let url = format!("{}/b2api/v2/b2_finish_large_file", auth_info.api_url);
374
375 let mut req = Request::post(&url);
376
377 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
378
379 let req = req.extension(Operation::Write);
380
381 let body = serde_json::to_vec(&FinishLargeFileRequest {
382 file_id: file_id.to_owned(),
383 part_sha1_array,
384 })
385 .map_err(new_json_serialize_error)?;
386 let body = bytes::Bytes::from(body);
387
388 let req = req
390 .body(Buffer::from(body))
391 .map_err(new_request_build_error)?;
392
393 self.send(req).await
394 }
395
396 pub async fn cancel_large_file(&self, file_id: &str) -> Result<Response<Buffer>> {
397 let auth_info = self.get_auth_info().await?;
398
399 let url = format!("{}/b2api/v2/b2_cancel_large_file", auth_info.api_url);
400
401 let mut req = Request::post(&url);
402
403 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
404
405 let req = req.extension(Operation::Write);
406
407 let body = serde_json::to_vec(&CancelLargeFileRequest {
408 file_id: file_id.to_owned(),
409 })
410 .map_err(new_json_serialize_error)?;
411 let body = bytes::Bytes::from(body);
412
413 let req = req
415 .body(Buffer::from(body))
416 .map_err(new_request_build_error)?;
417
418 self.send(req).await
419 }
420
421 pub async fn get_file_info(&self, path: &str, delimiter: Option<&str>) -> Result<File> {
422 let resp = self
423 .list_file_names_raw(Some(path), delimiter, None, None, Operation::Stat)
424 .await?;
425
426 let status = resp.status();
427 match status {
428 StatusCode::OK => {
429 let bs = resp.into_body();
430 let mut resp: ListFileNamesResponse =
431 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
432
433 if resp.files.is_empty() {
434 return Err(Error::new(ErrorKind::NotFound, "no such file or directory"));
435 }
436 Ok(resp.files.swap_remove(0))
437 }
438 _ => Err(parse_error(resp)),
439 }
440 }
441
442 pub async fn list_file_names(
443 &self,
444 prefix: Option<&str>,
445 delimiter: Option<&str>,
446 limit: Option<usize>,
447 start_after: Option<String>,
448 ) -> Result<Response<Buffer>> {
449 self.list_file_names_raw(prefix, delimiter, limit, start_after, Operation::List)
450 .await
451 }
452
453 async fn list_file_names_raw(
454 &self,
455 prefix: Option<&str>,
456 delimiter: Option<&str>,
457 limit: Option<usize>,
458 start_after: Option<String>,
459 operation: Operation,
460 ) -> Result<Response<Buffer>> {
461 let auth_info = self.get_auth_info().await?;
462
463 let url = format!("{}/b2api/v2/b2_list_file_names", auth_info.api_url);
464
465 let mut url = QueryPairsWriter::new(&url);
466 url = url.push("bucketId", &self.bucket_id);
467
468 if let Some(prefix) = prefix {
469 let prefix = build_abs_path(&self.root, prefix);
470 if !prefix.is_empty() {
471 url = url.push("prefix", &percent_encode_path(&prefix));
472 }
473 }
474
475 if let Some(limit) = limit {
476 url = url.push("maxFileCount", &limit.to_string());
477 }
478
479 if let Some(start_after) = start_after {
480 url = url.push("startFileName", &percent_encode_path(&start_after));
481 }
482
483 if let Some(delimiter) = delimiter {
484 url = url.push("delimiter", delimiter);
485 }
486
487 let mut req = Request::get(url.finish());
488
489 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
490
491 req = req.extension(operation);
492
493 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
495
496 self.send(req).await
497 }
498
499 pub async fn copy_file(&self, source_file_id: String, to: &str) -> Result<Response<Buffer>> {
500 let to = build_abs_path(&self.root, to);
501
502 let auth_info = self.get_auth_info().await?;
503
504 let url = format!("{}/b2api/v2/b2_copy_file", auth_info.api_url);
505
506 let mut req = Request::post(url);
507
508 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
509
510 let req = req.extension(Operation::Copy);
511
512 let body = CopyFileRequest {
513 source_file_id,
514 file_name: to,
515 };
516
517 let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
518 let body = bytes::Bytes::from(body);
519
520 let req = req
522 .body(Buffer::from(body))
523 .map_err(new_request_build_error)?;
524
525 self.send(req).await
526 }
527
528 pub async fn hide_file(&self, path: &str) -> Result<Response<Buffer>> {
529 let path = build_abs_path(&self.root, path);
530
531 let auth_info = self.get_auth_info().await?;
532
533 let url = format!("{}/b2api/v2/b2_hide_file", auth_info.api_url);
534
535 let mut req = Request::post(url);
536
537 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
538
539 let req = req.extension(Operation::Delete);
540
541 let body = HideFileRequest {
542 bucket_id: self.bucket_id.clone(),
543 file_name: path.to_string(),
544 };
545
546 let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
547 let body = bytes::Bytes::from(body);
548
549 let req = req
551 .body(Buffer::from(body))
552 .map_err(new_request_build_error)?;
553
554 self.send(req).await
555 }
556}
557
558#[derive(Clone)]
559pub struct B2Signer {
560 pub application_key_id: String,
562 pub application_key: String,
564
565 pub auth_info: AuthInfo,
566}
567
568#[derive(Clone)]
569pub struct AuthInfo {
570 pub authorization_token: String,
571 pub api_url: String,
573 pub download_url: String,
575 pub expires_in: Timestamp,
577}
578
579impl Default for B2Signer {
580 fn default() -> Self {
581 B2Signer {
582 application_key: String::new(),
583 application_key_id: String::new(),
584
585 auth_info: AuthInfo {
586 authorization_token: String::new(),
587 api_url: String::new(),
588 download_url: String::new(),
589 expires_in: Timestamp::MIN,
590 },
591 }
592 }
593}
594
595#[derive(Debug, Serialize)]
597#[serde(rename_all = "camelCase")]
598pub struct StartLargeFileRequest {
599 pub bucket_id: String,
600 pub file_name: String,
601 pub content_type: String,
602}
603
604#[derive(Debug, Deserialize)]
606#[serde(rename_all = "camelCase")]
607pub struct StartLargeFileResponse {
608 pub file_id: String,
609}
610
611#[derive(Debug, Deserialize)]
613#[serde(rename_all = "camelCase")]
614pub struct AuthorizeAccountResponse {
615 pub authorization_token: String,
618 pub api_url: String,
619 pub download_url: String,
620}
621
622#[derive(Debug, Deserialize)]
624#[serde(rename_all = "camelCase")]
625pub struct GetUploadUrlResponse {
626 pub authorization_token: String,
629 pub upload_url: String,
630}
631
632#[derive(Debug, Deserialize)]
634#[serde(rename_all = "camelCase")]
635pub struct GetUploadPartUrlResponse {
636 pub authorization_token: String,
639 pub upload_url: String,
640}
641
642#[derive(Debug, Deserialize)]
644#[serde(rename_all = "camelCase")]
645pub struct UploadPartResponse {
646 pub content_sha1: String,
647}
648
649#[derive(Debug, Serialize)]
651#[serde(rename_all = "camelCase")]
652pub struct FinishLargeFileRequest {
653 pub file_id: String,
654 pub part_sha1_array: Vec<String>,
655}
656
657#[derive(Debug, Serialize)]
659#[serde(rename_all = "camelCase")]
660pub struct CancelLargeFileRequest {
661 pub file_id: String,
662}
663
664#[derive(Debug, Clone, Deserialize)]
666#[serde(rename_all = "camelCase")]
667pub struct ListFileNamesResponse {
668 pub files: Vec<File>,
669 pub next_file_name: Option<String>,
670}
671
672#[derive(Debug, Clone, Deserialize)]
674#[serde(rename_all = "camelCase")]
675pub struct UploadResponse {
676 pub content_length: u64,
677 pub content_md5: Option<String>,
678 pub content_type: Option<String>,
679}
680
681#[derive(Debug, Clone, Deserialize)]
682#[serde(rename_all = "camelCase")]
683pub struct File {
684 pub file_id: Option<String>,
685 pub content_length: u64,
686 pub content_md5: Option<String>,
687 pub content_type: Option<String>,
688 pub file_name: String,
689}
690
691pub(super) fn parse_file_info(file: &File) -> Metadata {
692 if file.file_name.ends_with('/') {
693 return Metadata::new(EntryMode::DIR);
694 }
695
696 let mut metadata = Metadata::new(EntryMode::FILE);
697
698 metadata.set_content_length(file.content_length);
699
700 if let Some(content_md5) = &file.content_md5 {
701 metadata.set_content_md5(content_md5);
702 }
703
704 if let Some(content_type) = &file.content_type {
705 metadata.set_content_type(content_type);
706 }
707
708 metadata
709}
710
711#[derive(Debug, Serialize)]
712#[serde(rename_all = "camelCase")]
713pub struct CopyFileRequest {
714 pub source_file_id: String,
715 pub file_name: String,
716}
717
718#[derive(Debug, Serialize)]
719#[serde(rename_all = "camelCase")]
720pub struct HideFileRequest {
721 pub bucket_id: String,
722 pub file_name: String,
723}
724
725#[derive(Debug, Serialize)]
726#[serde(rename_all = "camelCase")]
727pub struct GetDownloadAuthorizationRequest {
728 pub bucket_id: String,
729 pub file_name_prefix: String,
730 pub valid_duration_in_seconds: u64,
731}
732
733#[derive(Debug, Deserialize)]
734#[serde(rename_all = "camelCase")]
735pub struct GetDownloadAuthorizationResponse {
736 pub authorization_token: String,
737}