1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::Request;
24use http::Response;
25use http::StatusCode;
26use log::debug;
27use tokio::sync::RwLock;
28
29use super::core::constants;
30use super::core::parse_file_info;
31use super::core::B2Core;
32use super::core::B2Signer;
33use super::core::ListFileNamesResponse;
34use super::delete::B2Deleter;
35use super::error::parse_error;
36use super::lister::B2Lister;
37use super::writer::B2Writer;
38use super::writer::B2Writers;
39use crate::raw::*;
40use crate::services::B2Config;
41use crate::*;
42
43impl Configurator for B2Config {
44 type Builder = B2Builder;
45
46 #[allow(deprecated)]
47 fn into_builder(self) -> Self::Builder {
48 B2Builder {
49 config: self,
50 http_client: None,
51 }
52 }
53}
54
55#[doc = include_str!("docs.md")]
57#[derive(Default)]
58pub struct B2Builder {
59 config: B2Config,
60
61 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
62 http_client: Option<HttpClient>,
63}
64
65impl Debug for B2Builder {
66 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67 let mut d = f.debug_struct("B2Builder");
68
69 d.field("config", &self.config);
70 d.finish_non_exhaustive()
71 }
72}
73
74impl B2Builder {
75 pub fn root(mut self, root: &str) -> Self {
79 self.config.root = if root.is_empty() {
80 None
81 } else {
82 Some(root.to_string())
83 };
84
85 self
86 }
87
88 pub fn application_key_id(mut self, application_key_id: &str) -> Self {
90 self.config.application_key_id = if application_key_id.is_empty() {
91 None
92 } else {
93 Some(application_key_id.to_string())
94 };
95
96 self
97 }
98
99 pub fn application_key(mut self, application_key: &str) -> Self {
101 self.config.application_key = if application_key.is_empty() {
102 None
103 } else {
104 Some(application_key.to_string())
105 };
106
107 self
108 }
109
110 pub fn bucket(mut self, bucket: &str) -> Self {
113 self.config.bucket = bucket.to_string();
114
115 self
116 }
117
118 pub fn bucket_id(mut self, bucket_id: &str) -> Self {
121 self.config.bucket_id = bucket_id.to_string();
122
123 self
124 }
125
126 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
133 #[allow(deprecated)]
134 pub fn http_client(mut self, client: HttpClient) -> Self {
135 self.http_client = Some(client);
136 self
137 }
138}
139
140impl Builder for B2Builder {
141 const SCHEME: Scheme = Scheme::B2;
142 type Config = B2Config;
143
144 fn build(self) -> Result<impl Access> {
146 debug!("backend build started: {:?}", &self);
147
148 let root = normalize_root(&self.config.root.clone().unwrap_or_default());
149 debug!("backend use root {}", &root);
150
151 if self.config.bucket.is_empty() {
153 return Err(Error::new(ErrorKind::ConfigInvalid, "bucket is empty")
154 .with_operation("Builder::build")
155 .with_context("service", Scheme::B2));
156 }
157
158 debug!("backend use bucket {}", &self.config.bucket);
159
160 if self.config.bucket_id.is_empty() {
162 return Err(Error::new(ErrorKind::ConfigInvalid, "bucket_id is empty")
163 .with_operation("Builder::build")
164 .with_context("service", Scheme::B2));
165 }
166
167 debug!("backend bucket_id {}", &self.config.bucket_id);
168
169 let application_key_id = match &self.config.application_key_id {
170 Some(application_key_id) => Ok(application_key_id.clone()),
171 None => Err(
172 Error::new(ErrorKind::ConfigInvalid, "application_key_id is empty")
173 .with_operation("Builder::build")
174 .with_context("service", Scheme::B2),
175 ),
176 }?;
177
178 let application_key = match &self.config.application_key {
179 Some(key_id) => Ok(key_id.clone()),
180 None => Err(
181 Error::new(ErrorKind::ConfigInvalid, "application_key is empty")
182 .with_operation("Builder::build")
183 .with_context("service", Scheme::B2),
184 ),
185 }?;
186
187 let signer = B2Signer {
188 application_key_id,
189 application_key,
190 ..Default::default()
191 };
192
193 Ok(B2Backend {
194 core: Arc::new(B2Core {
195 info: {
196 let am = AccessorInfo::default();
197 am.set_scheme(Scheme::B2)
198 .set_root(&root)
199 .set_native_capability(Capability {
200 stat: true,
201 stat_has_content_length: true,
202 stat_has_content_md5: true,
203 stat_has_content_type: true,
204
205 read: true,
206
207 write: true,
208 write_can_empty: true,
209 write_can_multi: true,
210 write_with_content_type: true,
211 write_multi_min_size: Some(5 * 1024 * 1024),
215 write_multi_max_size: if cfg!(target_pointer_width = "64") {
219 Some(5 * 1024 * 1024 * 1024)
220 } else {
221 Some(usize::MAX)
222 },
223
224 delete: true,
225 copy: true,
226
227 list: true,
228 list_with_limit: true,
229 list_with_start_after: true,
230 list_with_recursive: true,
231 list_has_content_length: true,
232 list_has_content_md5: true,
233 list_has_content_type: true,
234
235 presign: true,
236 presign_read: true,
237 presign_write: true,
238 presign_stat: true,
239
240 shared: true,
241
242 ..Default::default()
243 });
244
245 #[allow(deprecated)]
247 if let Some(client) = self.http_client {
248 am.update_http_client(|_| client);
249 }
250
251 am.into()
252 },
253 signer: Arc::new(RwLock::new(signer)),
254 root,
255
256 bucket: self.config.bucket.clone(),
257 bucket_id: self.config.bucket_id.clone(),
258 }),
259 })
260 }
261}
262
263#[derive(Debug, Clone)]
265pub struct B2Backend {
266 core: Arc<B2Core>,
267}
268
269impl Access for B2Backend {
270 type Reader = HttpBody;
271 type Writer = B2Writers;
272 type Lister = oio::PageLister<B2Lister>;
273 type Deleter = oio::OneShotDeleter<B2Deleter>;
274 type BlockingReader = ();
275 type BlockingWriter = ();
276 type BlockingLister = ();
277 type BlockingDeleter = ();
278
279 fn info(&self) -> Arc<AccessorInfo> {
280 self.core.info.clone()
281 }
282
283 async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
286 if path == "/" {
288 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
289 }
290
291 let delimiter = if path.ends_with('/') { Some("/") } else { None };
292 let resp = self
293 .core
294 .list_file_names(Some(path), delimiter, None, None)
295 .await?;
296
297 let status = resp.status();
298
299 match status {
300 StatusCode::OK => {
301 let bs = resp.into_body();
302
303 let resp: ListFileNamesResponse =
304 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
305 if resp.files.is_empty() {
306 return Err(Error::new(ErrorKind::NotFound, "no such file or directory"));
307 }
308 let meta = parse_file_info(&resp.files[0]);
309 Ok(RpStat::new(meta))
310 }
311 _ => Err(parse_error(resp)),
312 }
313 }
314
315 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
316 let resp = self
317 .core
318 .download_file_by_name(path, args.range(), &args)
319 .await?;
320
321 let status = resp.status();
322 match status {
323 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
324 Ok((RpRead::default(), resp.into_body()))
325 }
326 _ => {
327 let (part, mut body) = resp.into_parts();
328 let buf = body.to_buffer().await?;
329 Err(parse_error(Response::from_parts(part, buf)))
330 }
331 }
332 }
333
334 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
335 let concurrent = args.concurrent();
336 let writer = B2Writer::new(self.core.clone(), path, args);
337
338 let w = oio::MultipartWriter::new(self.core.info.clone(), writer, concurrent);
339
340 Ok((RpWrite::default(), w))
341 }
342
343 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
344 Ok((
345 RpDelete::default(),
346 oio::OneShotDeleter::new(B2Deleter::new(self.core.clone())),
347 ))
348 }
349
350 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
351 Ok((
352 RpList::default(),
353 oio::PageLister::new(B2Lister::new(
354 self.core.clone(),
355 path,
356 args.recursive(),
357 args.limit(),
358 args.start_after(),
359 )),
360 ))
361 }
362
363 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
364 let resp = self
365 .core
366 .list_file_names(Some(from), None, None, None)
367 .await?;
368
369 let status = resp.status();
370
371 let source_file_id = match status {
372 StatusCode::OK => {
373 let bs = resp.into_body();
374
375 let resp: ListFileNamesResponse =
376 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
377 if resp.files.is_empty() {
378 return Err(Error::new(ErrorKind::NotFound, "no such file or directory"));
379 }
380
381 let file_id = resp.files[0].clone().file_id;
382 Ok(file_id)
383 }
384 _ => Err(parse_error(resp)),
385 }?;
386
387 let Some(source_file_id) = source_file_id else {
388 return Err(Error::new(ErrorKind::IsADirectory, "is a directory"));
389 };
390
391 let resp = self.core.copy_file(source_file_id, to).await?;
392
393 let status = resp.status();
394
395 match status {
396 StatusCode::OK => Ok(RpCopy::default()),
397 _ => Err(parse_error(resp)),
398 }
399 }
400
401 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
402 match args.operation() {
403 PresignOperation::Stat(_) => {
404 let resp = self
405 .core
406 .get_download_authorization(path, args.expire())
407 .await?;
408 let path = build_abs_path(&self.core.root, path);
409
410 let auth_info = self.core.get_auth_info().await?;
411
412 let url = format!(
413 "{}/file/{}/{}?Authorization={}",
414 auth_info.download_url, self.core.bucket, path, resp.authorization_token
415 );
416
417 let req = Request::get(url);
418
419 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
420
421 let (parts, _) = req.into_parts();
423
424 Ok(RpPresign::new(PresignedRequest::new(
425 parts.method,
426 parts.uri,
427 parts.headers,
428 )))
429 }
430 PresignOperation::Read(_) => {
431 let resp = self
432 .core
433 .get_download_authorization(path, args.expire())
434 .await?;
435 let path = build_abs_path(&self.core.root, path);
436
437 let auth_info = self.core.get_auth_info().await?;
438
439 let url = format!(
440 "{}/file/{}/{}?Authorization={}",
441 auth_info.download_url, self.core.bucket, path, resp.authorization_token
442 );
443
444 let req = Request::get(url);
445
446 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
447
448 let (parts, _) = req.into_parts();
450
451 Ok(RpPresign::new(PresignedRequest::new(
452 parts.method,
453 parts.uri,
454 parts.headers,
455 )))
456 }
457 PresignOperation::Write(_) => {
458 let resp = self.core.get_upload_url().await?;
459
460 let mut req = Request::post(&resp.upload_url);
461
462 req = req.header(http::header::AUTHORIZATION, resp.authorization_token);
463 req = req.header("X-Bz-File-Name", build_abs_path(&self.core.root, path));
464 req = req.header(http::header::CONTENT_TYPE, "b2/x-auto");
465 req = req.header(constants::X_BZ_CONTENT_SHA1, "do_not_verify");
466
467 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
468 let (parts, _) = req.into_parts();
470
471 Ok(RpPresign::new(PresignedRequest::new(
472 parts.method,
473 parts.uri,
474 parts.headers,
475 )))
476 }
477 PresignOperation::Delete(_) => Err(Error::new(
478 ErrorKind::Unsupported,
479 "operation is not supported",
480 )),
481 }
482 }
483}