opendal/services/b2/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::Request;
24use http::Response;
25use http::StatusCode;
26use log::debug;
27use tokio::sync::RwLock;
28
29use super::core::constants;
30use super::core::parse_file_info;
31use super::core::B2Core;
32use super::core::B2Signer;
33use super::core::ListFileNamesResponse;
34use super::delete::B2Deleter;
35use super::error::parse_error;
36use super::lister::B2Lister;
37use super::writer::B2Writer;
38use super::writer::B2Writers;
39use crate::raw::*;
40use crate::services::B2Config;
41use crate::*;
42
43impl Configurator for B2Config {
44    type Builder = B2Builder;
45
46    #[allow(deprecated)]
47    fn into_builder(self) -> Self::Builder {
48        B2Builder {
49            config: self,
50            http_client: None,
51        }
52    }
53}
54
55/// [b2](https://www.backblaze.com/cloud-storage) services support.
56#[doc = include_str!("docs.md")]
57#[derive(Default)]
58pub struct B2Builder {
59    config: B2Config,
60
61    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
62    http_client: Option<HttpClient>,
63}
64
65impl Debug for B2Builder {
66    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67        let mut d = f.debug_struct("B2Builder");
68
69        d.field("config", &self.config);
70        d.finish_non_exhaustive()
71    }
72}
73
74impl B2Builder {
75    /// Set root of this backend.
76    ///
77    /// All operations will happen under this root.
78    pub fn root(mut self, root: &str) -> Self {
79        self.config.root = if root.is_empty() {
80            None
81        } else {
82            Some(root.to_string())
83        };
84
85        self
86    }
87
88    /// application_key_id of this backend.
89    pub fn application_key_id(mut self, application_key_id: &str) -> Self {
90        self.config.application_key_id = if application_key_id.is_empty() {
91            None
92        } else {
93            Some(application_key_id.to_string())
94        };
95
96        self
97    }
98
99    /// application_key of this backend.
100    pub fn application_key(mut self, application_key: &str) -> Self {
101        self.config.application_key = if application_key.is_empty() {
102            None
103        } else {
104            Some(application_key.to_string())
105        };
106
107        self
108    }
109
110    /// Set bucket name of this backend.
111    /// You can find it in <https://secure.backblaze.com/b2_buckets.html>
112    pub fn bucket(mut self, bucket: &str) -> Self {
113        self.config.bucket = bucket.to_string();
114
115        self
116    }
117
118    /// Set bucket id of this backend.
119    /// You can find it in <https://secure.backblaze.com/b2_buckets.html>
120    pub fn bucket_id(mut self, bucket_id: &str) -> Self {
121        self.config.bucket_id = bucket_id.to_string();
122
123        self
124    }
125
126    /// Specify the http client that used by this service.
127    ///
128    /// # Notes
129    ///
130    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
131    /// during minor updates.
132    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
133    #[allow(deprecated)]
134    pub fn http_client(mut self, client: HttpClient) -> Self {
135        self.http_client = Some(client);
136        self
137    }
138}
139
140impl Builder for B2Builder {
141    const SCHEME: Scheme = Scheme::B2;
142    type Config = B2Config;
143
144    /// Builds the backend and returns the result of B2Backend.
145    fn build(self) -> Result<impl Access> {
146        debug!("backend build started: {:?}", &self);
147
148        let root = normalize_root(&self.config.root.clone().unwrap_or_default());
149        debug!("backend use root {}", &root);
150
151        // Handle bucket.
152        if self.config.bucket.is_empty() {
153            return Err(Error::new(ErrorKind::ConfigInvalid, "bucket is empty")
154                .with_operation("Builder::build")
155                .with_context("service", Scheme::B2));
156        }
157
158        debug!("backend use bucket {}", &self.config.bucket);
159
160        // Handle bucket_id.
161        if self.config.bucket_id.is_empty() {
162            return Err(Error::new(ErrorKind::ConfigInvalid, "bucket_id is empty")
163                .with_operation("Builder::build")
164                .with_context("service", Scheme::B2));
165        }
166
167        debug!("backend bucket_id {}", &self.config.bucket_id);
168
169        let application_key_id = match &self.config.application_key_id {
170            Some(application_key_id) => Ok(application_key_id.clone()),
171            None => Err(
172                Error::new(ErrorKind::ConfigInvalid, "application_key_id is empty")
173                    .with_operation("Builder::build")
174                    .with_context("service", Scheme::B2),
175            ),
176        }?;
177
178        let application_key = match &self.config.application_key {
179            Some(key_id) => Ok(key_id.clone()),
180            None => Err(
181                Error::new(ErrorKind::ConfigInvalid, "application_key is empty")
182                    .with_operation("Builder::build")
183                    .with_context("service", Scheme::B2),
184            ),
185        }?;
186
187        let signer = B2Signer {
188            application_key_id,
189            application_key,
190            ..Default::default()
191        };
192
193        Ok(B2Backend {
194            core: Arc::new(B2Core {
195                info: {
196                    let am = AccessorInfo::default();
197                    am.set_scheme(Scheme::B2)
198                        .set_root(&root)
199                        .set_native_capability(Capability {
200                            stat: true,
201                            stat_has_content_length: true,
202                            stat_has_content_md5: true,
203                            stat_has_content_type: true,
204
205                            read: true,
206
207                            write: true,
208                            write_can_empty: true,
209                            write_can_multi: true,
210                            write_with_content_type: true,
211                            // The min multipart size of b2 is 5 MiB.
212                            //
213                            // ref: <https://www.backblaze.com/docs/cloud-storage-large-files>
214                            write_multi_min_size: Some(5 * 1024 * 1024),
215                            // The max multipart size of b2 is 5 Gb.
216                            //
217                            // ref: <https://www.backblaze.com/docs/cloud-storage-large-files>
218                            write_multi_max_size: if cfg!(target_pointer_width = "64") {
219                                Some(5 * 1024 * 1024 * 1024)
220                            } else {
221                                Some(usize::MAX)
222                            },
223
224                            delete: true,
225                            copy: true,
226
227                            list: true,
228                            list_with_limit: true,
229                            list_with_start_after: true,
230                            list_with_recursive: true,
231                            list_has_content_length: true,
232                            list_has_content_md5: true,
233                            list_has_content_type: true,
234
235                            presign: true,
236                            presign_read: true,
237                            presign_write: true,
238                            presign_stat: true,
239
240                            shared: true,
241
242                            ..Default::default()
243                        });
244
245                    // allow deprecated api here for compatibility
246                    #[allow(deprecated)]
247                    if let Some(client) = self.http_client {
248                        am.update_http_client(|_| client);
249                    }
250
251                    am.into()
252                },
253                signer: Arc::new(RwLock::new(signer)),
254                root,
255
256                bucket: self.config.bucket.clone(),
257                bucket_id: self.config.bucket_id.clone(),
258            }),
259        })
260    }
261}
262
263/// Backend for b2 services.
264#[derive(Debug, Clone)]
265pub struct B2Backend {
266    core: Arc<B2Core>,
267}
268
269impl Access for B2Backend {
270    type Reader = HttpBody;
271    type Writer = B2Writers;
272    type Lister = oio::PageLister<B2Lister>;
273    type Deleter = oio::OneShotDeleter<B2Deleter>;
274    type BlockingReader = ();
275    type BlockingWriter = ();
276    type BlockingLister = ();
277    type BlockingDeleter = ();
278
279    fn info(&self) -> Arc<AccessorInfo> {
280        self.core.info.clone()
281    }
282
283    /// B2 have a get_file_info api required a file_id field, but field_id need call list api, list api also return file info
284    /// So we call list api to get file info
285    async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
286        // Stat root always returns a DIR.
287        if path == "/" {
288            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
289        }
290
291        let delimiter = if path.ends_with('/') { Some("/") } else { None };
292        let resp = self
293            .core
294            .list_file_names(Some(path), delimiter, None, None)
295            .await?;
296
297        let status = resp.status();
298
299        match status {
300            StatusCode::OK => {
301                let bs = resp.into_body();
302
303                let resp: ListFileNamesResponse =
304                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
305                if resp.files.is_empty() {
306                    return Err(Error::new(ErrorKind::NotFound, "no such file or directory"));
307                }
308                let meta = parse_file_info(&resp.files[0]);
309                Ok(RpStat::new(meta))
310            }
311            _ => Err(parse_error(resp)),
312        }
313    }
314
315    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
316        let resp = self
317            .core
318            .download_file_by_name(path, args.range(), &args)
319            .await?;
320
321        let status = resp.status();
322        match status {
323            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
324                Ok((RpRead::default(), resp.into_body()))
325            }
326            _ => {
327                let (part, mut body) = resp.into_parts();
328                let buf = body.to_buffer().await?;
329                Err(parse_error(Response::from_parts(part, buf)))
330            }
331        }
332    }
333
334    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
335        let concurrent = args.concurrent();
336        let writer = B2Writer::new(self.core.clone(), path, args);
337
338        let w = oio::MultipartWriter::new(self.core.info.clone(), writer, concurrent);
339
340        Ok((RpWrite::default(), w))
341    }
342
343    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
344        Ok((
345            RpDelete::default(),
346            oio::OneShotDeleter::new(B2Deleter::new(self.core.clone())),
347        ))
348    }
349
350    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
351        Ok((
352            RpList::default(),
353            oio::PageLister::new(B2Lister::new(
354                self.core.clone(),
355                path,
356                args.recursive(),
357                args.limit(),
358                args.start_after(),
359            )),
360        ))
361    }
362
363    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
364        let resp = self
365            .core
366            .list_file_names(Some(from), None, None, None)
367            .await?;
368
369        let status = resp.status();
370
371        let source_file_id = match status {
372            StatusCode::OK => {
373                let bs = resp.into_body();
374
375                let resp: ListFileNamesResponse =
376                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
377                if resp.files.is_empty() {
378                    return Err(Error::new(ErrorKind::NotFound, "no such file or directory"));
379                }
380
381                let file_id = resp.files[0].clone().file_id;
382                Ok(file_id)
383            }
384            _ => Err(parse_error(resp)),
385        }?;
386
387        let Some(source_file_id) = source_file_id else {
388            return Err(Error::new(ErrorKind::IsADirectory, "is a directory"));
389        };
390
391        let resp = self.core.copy_file(source_file_id, to).await?;
392
393        let status = resp.status();
394
395        match status {
396            StatusCode::OK => Ok(RpCopy::default()),
397            _ => Err(parse_error(resp)),
398        }
399    }
400
401    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
402        match args.operation() {
403            PresignOperation::Stat(_) => {
404                let resp = self
405                    .core
406                    .get_download_authorization(path, args.expire())
407                    .await?;
408                let path = build_abs_path(&self.core.root, path);
409
410                let auth_info = self.core.get_auth_info().await?;
411
412                let url = format!(
413                    "{}/file/{}/{}?Authorization={}",
414                    auth_info.download_url, self.core.bucket, path, resp.authorization_token
415                );
416
417                let req = Request::get(url);
418
419                let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
420
421                // We don't need this request anymore, consume
422                let (parts, _) = req.into_parts();
423
424                Ok(RpPresign::new(PresignedRequest::new(
425                    parts.method,
426                    parts.uri,
427                    parts.headers,
428                )))
429            }
430            PresignOperation::Read(_) => {
431                let resp = self
432                    .core
433                    .get_download_authorization(path, args.expire())
434                    .await?;
435                let path = build_abs_path(&self.core.root, path);
436
437                let auth_info = self.core.get_auth_info().await?;
438
439                let url = format!(
440                    "{}/file/{}/{}?Authorization={}",
441                    auth_info.download_url, self.core.bucket, path, resp.authorization_token
442                );
443
444                let req = Request::get(url);
445
446                let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
447
448                // We don't need this request anymore, consume
449                let (parts, _) = req.into_parts();
450
451                Ok(RpPresign::new(PresignedRequest::new(
452                    parts.method,
453                    parts.uri,
454                    parts.headers,
455                )))
456            }
457            PresignOperation::Write(_) => {
458                let resp = self.core.get_upload_url().await?;
459
460                let mut req = Request::post(&resp.upload_url);
461
462                req = req.header(http::header::AUTHORIZATION, resp.authorization_token);
463                req = req.header("X-Bz-File-Name", build_abs_path(&self.core.root, path));
464                req = req.header(http::header::CONTENT_TYPE, "b2/x-auto");
465                req = req.header(constants::X_BZ_CONTENT_SHA1, "do_not_verify");
466
467                let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
468                // We don't need this request anymore, consume it directly.
469                let (parts, _) = req.into_parts();
470
471                Ok(RpPresign::new(PresignedRequest::new(
472                    parts.method,
473                    parts.uri,
474                    parts.headers,
475                )))
476            }
477            PresignOperation::Delete(_) => Err(Error::new(
478                ErrorKind::Unsupported,
479                "operation is not supported",
480            )),
481        }
482    }
483}