1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Buf;
24use chrono::DateTime;
25use chrono::Utc;
26use http::header;
27use http::Request;
28use http::Response;
29use http::StatusCode;
30use serde::Deserialize;
31use serde::Serialize;
32use tokio::sync::RwLock;
33
34use self::constants::X_BZ_CONTENT_SHA1;
35use self::constants::X_BZ_FILE_NAME;
36use super::core::constants::X_BZ_PART_NUMBER;
37use super::error::parse_error;
38use crate::raw::*;
39use crate::*;
40
41pub(super) mod constants {
42 pub const X_BZ_FILE_NAME: &str = "X-Bz-File-Name";
43 pub const X_BZ_CONTENT_SHA1: &str = "X-Bz-Content-Sha1";
44 pub const X_BZ_PART_NUMBER: &str = "X-Bz-Part-Number";
45}
46
47#[derive(Clone)]
49pub struct B2Core {
50 pub info: Arc<AccessorInfo>,
51 pub signer: Arc<RwLock<B2Signer>>,
52
53 pub root: String,
55 pub bucket: String,
57 pub bucket_id: String,
59}
60
61impl Debug for B2Core {
62 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
63 f.debug_struct("Backend")
64 .field("root", &self.root)
65 .field("bucket", &self.bucket)
66 .field("bucket_id", &self.bucket_id)
67 .finish_non_exhaustive()
68 }
69}
70
71impl B2Core {
72 #[inline]
73 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
74 self.info.http_client().send(req).await
75 }
76
77 pub async fn get_auth_info(&self) -> Result<AuthInfo> {
79 {
80 let signer = self.signer.read().await;
81
82 if !signer.auth_info.authorization_token.is_empty()
83 && signer.auth_info.expires_in > Utc::now()
84 {
85 let auth_info = signer.auth_info.clone();
86 return Ok(auth_info);
87 }
88 }
89
90 {
91 let mut signer = self.signer.write().await;
92 let req = Request::get("https://api.backblazeb2.com/b2api/v2/b2_authorize_account")
93 .header(
94 header::AUTHORIZATION,
95 format_authorization_by_basic(
96 &signer.application_key_id,
97 &signer.application_key,
98 )?,
99 )
100 .body(Buffer::new())
101 .map_err(new_request_build_error)?;
102
103 let resp = self.info.http_client().send(req).await?;
104 let status = resp.status();
105
106 match status {
107 StatusCode::OK => {
108 let resp_body = resp.into_body();
109 let token: AuthorizeAccountResponse =
110 serde_json::from_reader(resp_body.reader())
111 .map_err(new_json_deserialize_error)?;
112 signer.auth_info = AuthInfo {
113 authorization_token: token.authorization_token.clone(),
114 api_url: token.api_url.clone(),
115 download_url: token.download_url.clone(),
116 expires_in: Utc::now()
118 + chrono::TimeDelta::try_hours(20).expect("20 hours must be valid"),
119 };
120 }
121 _ => {
122 return Err(parse_error(resp));
123 }
124 }
125 Ok(signer.auth_info.clone())
126 }
127 }
128}
129
130impl B2Core {
131 pub async fn download_file_by_name(
132 &self,
133 path: &str,
134 range: BytesRange,
135 _args: &OpRead,
136 ) -> Result<Response<HttpBody>> {
137 let path = build_abs_path(&self.root, path);
138
139 let auth_info = self.get_auth_info().await?;
140
141 let url = format!(
143 "{}/file/{}/{}",
144 auth_info.download_url,
145 self.bucket,
146 percent_encode_path(&path)
147 );
148
149 let mut req = Request::get(&url);
150
151 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
152
153 if !range.is_full() {
154 req = req.header(header::RANGE, range.to_header());
155 }
156
157 let req = req.extension(Operation::Read);
158
159 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
160
161 self.info.http_client().fetch(req).await
162 }
163
164 pub(super) async fn get_upload_url(&self) -> Result<GetUploadUrlResponse> {
165 let auth_info = self.get_auth_info().await?;
166
167 let url = format!(
168 "{}/b2api/v2/b2_get_upload_url?bucketId={}",
169 auth_info.api_url, self.bucket_id
170 );
171
172 let mut req = Request::get(&url);
173
174 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
175
176 let req = req.extension(Operation::Write);
177
178 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
180
181 let resp = self.send(req).await?;
182 let status = resp.status();
183 match status {
184 StatusCode::OK => {
185 let resp_body = resp.into_body();
186 let resp = serde_json::from_reader(resp_body.reader())
187 .map_err(new_json_deserialize_error)?;
188 Ok(resp)
189 }
190 _ => Err(parse_error(resp)),
191 }
192 }
193
194 pub async fn get_download_authorization(
195 &self,
196 path: &str,
197 expire: Duration,
198 ) -> Result<GetDownloadAuthorizationResponse> {
199 let path = build_abs_path(&self.root, path);
200
201 let auth_info = self.get_auth_info().await?;
202
203 let url = format!(
205 "{}/b2api/v2/b2_get_download_authorization",
206 auth_info.api_url
207 );
208 let mut req = Request::post(&url);
209
210 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
211
212 let body = GetDownloadAuthorizationRequest {
213 bucket_id: self.bucket_id.clone(),
214 file_name_prefix: path,
215 valid_duration_in_seconds: expire.as_secs(),
216 };
217 let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
218 let body = bytes::Bytes::from(body);
219
220 let req = req
221 .body(Buffer::from(body))
222 .map_err(new_request_build_error)?;
223
224 let resp = self.send(req).await?;
225
226 let status = resp.status();
227 match status {
228 StatusCode::OK => {
229 let resp_body = resp.into_body();
230 let resp = serde_json::from_reader(resp_body.reader())
231 .map_err(new_json_deserialize_error)?;
232 Ok(resp)
233 }
234 _ => Err(parse_error(resp)),
235 }
236 }
237
238 pub async fn upload_file(
239 &self,
240 path: &str,
241 size: Option<u64>,
242 args: &OpWrite,
243 body: Buffer,
244 ) -> Result<Response<Buffer>> {
245 let resp = self.get_upload_url().await?;
246
247 let p = build_abs_path(&self.root, path);
248
249 let mut req = Request::post(resp.upload_url);
250
251 req = req.header(X_BZ_FILE_NAME, percent_encode_path(&p));
252
253 req = req.header(header::AUTHORIZATION, resp.authorization_token);
254
255 req = req.header(X_BZ_CONTENT_SHA1, "do_not_verify");
256
257 if let Some(size) = size {
258 req = req.header(header::CONTENT_LENGTH, size.to_string())
259 }
260
261 if let Some(mime) = args.content_type() {
262 req = req.header(header::CONTENT_TYPE, mime)
263 } else {
264 req = req.header(header::CONTENT_TYPE, "b2/x-auto")
265 }
266
267 if let Some(pos) = args.content_disposition() {
268 req = req.header(header::CONTENT_DISPOSITION, pos)
269 }
270
271 let req = req.extension(Operation::Write);
272
273 let req = req.body(body).map_err(new_request_build_error)?;
275
276 self.send(req).await
277 }
278
279 pub async fn start_large_file(&self, path: &str, args: &OpWrite) -> Result<Response<Buffer>> {
280 let p = build_abs_path(&self.root, path);
281
282 let auth_info = self.get_auth_info().await?;
283
284 let url = format!("{}/b2api/v2/b2_start_large_file", auth_info.api_url);
285
286 let mut req = Request::post(&url);
287
288 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
289
290 let mut start_large_file_request = StartLargeFileRequest {
291 bucket_id: self.bucket_id.clone(),
292 file_name: percent_encode_path(&p),
293 content_type: "b2/x-auto".to_owned(),
294 };
295
296 if let Some(mime) = args.content_type() {
297 mime.clone_into(&mut start_large_file_request.content_type)
298 }
299
300 let req = req.extension(Operation::Write);
301
302 let body =
303 serde_json::to_vec(&start_large_file_request).map_err(new_json_serialize_error)?;
304 let body = bytes::Bytes::from(body);
305
306 let req = req
307 .body(Buffer::from(body))
308 .map_err(new_request_build_error)?;
309
310 self.send(req).await
311 }
312
313 pub async fn get_upload_part_url(&self, file_id: &str) -> Result<GetUploadPartUrlResponse> {
314 let auth_info = self.get_auth_info().await?;
315
316 let url = format!(
317 "{}/b2api/v2/b2_get_upload_part_url?fileId={}",
318 auth_info.api_url, file_id
319 );
320
321 let mut req = Request::get(&url);
322
323 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
324
325 let req = req.extension(Operation::Write);
326
327 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
329
330 let resp = self.send(req).await?;
331
332 let status = resp.status();
333 match status {
334 StatusCode::OK => {
335 let resp_body = resp.into_body();
336 let resp = serde_json::from_reader(resp_body.reader())
337 .map_err(new_json_deserialize_error)?;
338 Ok(resp)
339 }
340 _ => Err(parse_error(resp)),
341 }
342 }
343
344 pub async fn upload_part(
345 &self,
346 file_id: &str,
347 part_number: usize,
348 size: u64,
349 body: Buffer,
350 ) -> Result<Response<Buffer>> {
351 let resp = self.get_upload_part_url(file_id).await?;
352
353 let mut req = Request::post(resp.upload_url);
354
355 req = req.header(X_BZ_PART_NUMBER, part_number.to_string());
356
357 req = req.header(header::CONTENT_LENGTH, size.to_string());
358
359 req = req.header(header::AUTHORIZATION, resp.authorization_token);
360
361 req = req.header(X_BZ_CONTENT_SHA1, "do_not_verify");
362
363 let req = req.extension(Operation::Write);
364
365 let req = req.body(body).map_err(new_request_build_error)?;
367
368 self.send(req).await
369 }
370
371 pub async fn finish_large_file(
372 &self,
373 file_id: &str,
374 part_sha1_array: Vec<String>,
375 ) -> Result<Response<Buffer>> {
376 let auth_info = self.get_auth_info().await?;
377
378 let url = format!("{}/b2api/v2/b2_finish_large_file", auth_info.api_url);
379
380 let mut req = Request::post(&url);
381
382 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
383
384 let req = req.extension(Operation::Write);
385
386 let body = serde_json::to_vec(&FinishLargeFileRequest {
387 file_id: file_id.to_owned(),
388 part_sha1_array,
389 })
390 .map_err(new_json_serialize_error)?;
391 let body = bytes::Bytes::from(body);
392
393 let req = req
395 .body(Buffer::from(body))
396 .map_err(new_request_build_error)?;
397
398 self.send(req).await
399 }
400
401 pub async fn cancel_large_file(&self, file_id: &str) -> Result<Response<Buffer>> {
402 let auth_info = self.get_auth_info().await?;
403
404 let url = format!("{}/b2api/v2/b2_cancel_large_file", auth_info.api_url);
405
406 let mut req = Request::post(&url);
407
408 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
409
410 let req = req.extension(Operation::Write);
411
412 let body = serde_json::to_vec(&CancelLargeFileRequest {
413 file_id: file_id.to_owned(),
414 })
415 .map_err(new_json_serialize_error)?;
416 let body = bytes::Bytes::from(body);
417
418 let req = req
420 .body(Buffer::from(body))
421 .map_err(new_request_build_error)?;
422
423 self.send(req).await
424 }
425
426 pub async fn get_file_info(&self, path: &str, delimiter: Option<&str>) -> Result<File> {
427 let resp = self
428 .list_file_names_raw(Some(path), delimiter, None, None, Operation::Stat)
429 .await?;
430
431 let status = resp.status();
432 match status {
433 StatusCode::OK => {
434 let bs = resp.into_body();
435 let mut resp: ListFileNamesResponse =
436 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
437
438 if resp.files.is_empty() {
439 return Err(Error::new(ErrorKind::NotFound, "no such file or directory"));
440 }
441 Ok(resp.files.swap_remove(0))
442 }
443 _ => Err(parse_error(resp)),
444 }
445 }
446
447 pub async fn list_file_names(
448 &self,
449 prefix: Option<&str>,
450 delimiter: Option<&str>,
451 limit: Option<usize>,
452 start_after: Option<String>,
453 ) -> Result<Response<Buffer>> {
454 self.list_file_names_raw(prefix, delimiter, limit, start_after, Operation::List)
455 .await
456 }
457
458 async fn list_file_names_raw(
459 &self,
460 prefix: Option<&str>,
461 delimiter: Option<&str>,
462 limit: Option<usize>,
463 start_after: Option<String>,
464 operation: Operation,
465 ) -> Result<Response<Buffer>> {
466 let auth_info = self.get_auth_info().await?;
467
468 let url = format!("{}/b2api/v2/b2_list_file_names", auth_info.api_url);
469
470 let mut url = QueryPairsWriter::new(&url);
471 url = url.push("bucketId", &self.bucket_id);
472
473 if let Some(prefix) = prefix {
474 let prefix = build_abs_path(&self.root, prefix);
475 if !prefix.is_empty() {
476 url = url.push("prefix", &percent_encode_path(&prefix));
477 }
478 }
479
480 if let Some(limit) = limit {
481 url = url.push("maxFileCount", &limit.to_string());
482 }
483
484 if let Some(start_after) = start_after {
485 url = url.push("startFileName", &percent_encode_path(&start_after));
486 }
487
488 if let Some(delimiter) = delimiter {
489 url = url.push("delimiter", delimiter);
490 }
491
492 let mut req = Request::get(url.finish());
493
494 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
495
496 req = req.extension(operation);
497
498 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
500
501 self.send(req).await
502 }
503
504 pub async fn copy_file(&self, source_file_id: String, to: &str) -> Result<Response<Buffer>> {
505 let to = build_abs_path(&self.root, to);
506
507 let auth_info = self.get_auth_info().await?;
508
509 let url = format!("{}/b2api/v2/b2_copy_file", auth_info.api_url);
510
511 let mut req = Request::post(url);
512
513 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
514
515 let req = req.extension(Operation::Copy);
516
517 let body = CopyFileRequest {
518 source_file_id,
519 file_name: to,
520 };
521
522 let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
523 let body = bytes::Bytes::from(body);
524
525 let req = req
527 .body(Buffer::from(body))
528 .map_err(new_request_build_error)?;
529
530 self.send(req).await
531 }
532
533 pub async fn hide_file(&self, path: &str) -> Result<Response<Buffer>> {
534 let path = build_abs_path(&self.root, path);
535
536 let auth_info = self.get_auth_info().await?;
537
538 let url = format!("{}/b2api/v2/b2_hide_file", auth_info.api_url);
539
540 let mut req = Request::post(url);
541
542 req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
543
544 let req = req.extension(Operation::Delete);
545
546 let body = HideFileRequest {
547 bucket_id: self.bucket_id.clone(),
548 file_name: path.to_string(),
549 };
550
551 let body = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
552 let body = bytes::Bytes::from(body);
553
554 let req = req
556 .body(Buffer::from(body))
557 .map_err(new_request_build_error)?;
558
559 self.send(req).await
560 }
561}
562
563#[derive(Clone)]
564pub struct B2Signer {
565 pub application_key_id: String,
567 pub application_key: String,
569
570 pub auth_info: AuthInfo,
571}
572
573#[derive(Clone)]
574pub struct AuthInfo {
575 pub authorization_token: String,
576 pub api_url: String,
578 pub download_url: String,
580
581 pub expires_in: DateTime<Utc>,
582}
583
584impl Default for B2Signer {
585 fn default() -> Self {
586 B2Signer {
587 application_key: String::new(),
588 application_key_id: String::new(),
589
590 auth_info: AuthInfo {
591 authorization_token: String::new(),
592 api_url: String::new(),
593 download_url: String::new(),
594 expires_in: DateTime::<Utc>::MIN_UTC,
595 },
596 }
597 }
598}
599
600#[derive(Debug, Serialize)]
602#[serde(rename_all = "camelCase")]
603pub struct StartLargeFileRequest {
604 pub bucket_id: String,
605 pub file_name: String,
606 pub content_type: String,
607}
608
609#[derive(Debug, Deserialize)]
611#[serde(rename_all = "camelCase")]
612pub struct StartLargeFileResponse {
613 pub file_id: String,
614}
615
616#[derive(Debug, Deserialize)]
618#[serde(rename_all = "camelCase")]
619pub struct AuthorizeAccountResponse {
620 pub authorization_token: String,
623 pub api_url: String,
624 pub download_url: String,
625}
626
627#[derive(Debug, Deserialize)]
629#[serde(rename_all = "camelCase")]
630pub struct GetUploadUrlResponse {
631 pub authorization_token: String,
634 pub upload_url: String,
635}
636
637#[derive(Debug, Deserialize)]
639#[serde(rename_all = "camelCase")]
640pub struct GetUploadPartUrlResponse {
641 pub authorization_token: String,
644 pub upload_url: String,
645}
646
647#[derive(Debug, Deserialize)]
649#[serde(rename_all = "camelCase")]
650pub struct UploadPartResponse {
651 pub content_sha1: String,
652}
653
654#[derive(Debug, Serialize)]
656#[serde(rename_all = "camelCase")]
657pub struct FinishLargeFileRequest {
658 pub file_id: String,
659 pub part_sha1_array: Vec<String>,
660}
661
662#[derive(Debug, Serialize)]
664#[serde(rename_all = "camelCase")]
665pub struct CancelLargeFileRequest {
666 pub file_id: String,
667}
668
669#[derive(Debug, Clone, Deserialize)]
671#[serde(rename_all = "camelCase")]
672pub struct ListFileNamesResponse {
673 pub files: Vec<File>,
674 pub next_file_name: Option<String>,
675}
676
677#[derive(Debug, Clone, Deserialize)]
679#[serde(rename_all = "camelCase")]
680pub struct UploadResponse {
681 pub content_length: u64,
682 pub content_md5: Option<String>,
683 pub content_type: Option<String>,
684}
685
686#[derive(Debug, Clone, Deserialize)]
687#[serde(rename_all = "camelCase")]
688pub struct File {
689 pub file_id: Option<String>,
690 pub content_length: u64,
691 pub content_md5: Option<String>,
692 pub content_type: Option<String>,
693 pub file_name: String,
694}
695
696pub(super) fn parse_file_info(file: &File) -> Metadata {
697 if file.file_name.ends_with('/') {
698 return Metadata::new(EntryMode::DIR);
699 }
700
701 let mut metadata = Metadata::new(EntryMode::FILE);
702
703 metadata.set_content_length(file.content_length);
704
705 if let Some(content_md5) = &file.content_md5 {
706 metadata.set_content_md5(content_md5);
707 }
708
709 if let Some(content_type) = &file.content_type {
710 metadata.set_content_type(content_type);
711 }
712
713 metadata
714}
715
716#[derive(Debug, Serialize)]
717#[serde(rename_all = "camelCase")]
718pub struct CopyFileRequest {
719 pub source_file_id: String,
720 pub file_name: String,
721}
722
723#[derive(Debug, Serialize)]
724#[serde(rename_all = "camelCase")]
725pub struct HideFileRequest {
726 pub bucket_id: String,
727 pub file_name: String,
728}
729
730#[derive(Debug, Serialize)]
731#[serde(rename_all = "camelCase")]
732pub struct GetDownloadAuthorizationRequest {
733 pub bucket_id: String,
734 pub file_name_prefix: String,
735 pub valid_duration_in_seconds: u64,
736}
737
738#[derive(Debug, Deserialize)]
739#[serde(rename_all = "camelCase")]
740pub struct GetDownloadAuthorizationResponse {
741 pub authorization_token: String,
742}