opendal/services/b2/
lister.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21
22use super::core::parse_file_info;
23use super::core::B2Core;
24use super::core::ListFileNamesResponse;
25use super::error::parse_error;
26use crate::raw::*;
27use crate::*;
28
29pub struct B2Lister {
30    core: Arc<B2Core>,
31
32    path: String,
33    delimiter: Option<&'static str>,
34    limit: Option<usize>,
35
36    /// B2 starts listing **after** this specified key
37    start_after: Option<String>,
38}
39
40impl B2Lister {
41    pub fn new(
42        core: Arc<B2Core>,
43        path: &str,
44        recursive: bool,
45        limit: Option<usize>,
46        start_after: Option<&str>,
47    ) -> Self {
48        let delimiter = if recursive { None } else { Some("/") };
49        Self {
50            core,
51
52            path: path.to_string(),
53            delimiter,
54            limit,
55            start_after: start_after.map(String::from),
56        }
57    }
58}
59
60impl oio::PageList for B2Lister {
61    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
62        let resp = self
63            .core
64            .list_file_names(
65                Some(&self.path),
66                self.delimiter,
67                self.limit,
68                if ctx.token.is_empty() {
69                    self.start_after
70                        .as_ref()
71                        .map(|v| build_abs_path(&self.core.root, v))
72                } else {
73                    Some(ctx.token.clone())
74                },
75            )
76            .await?;
77
78        if resp.status() != http::StatusCode::OK {
79            return Err(parse_error(resp));
80        }
81
82        let bs = resp.into_body();
83
84        let output: ListFileNamesResponse =
85            serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
86
87        if let Some(token) = output.next_file_name {
88            ctx.token = token;
89        } else {
90            ctx.done = true;
91        }
92
93        for file in output.files {
94            if let Some(start_after) = self.start_after.clone() {
95                if build_abs_path(&self.core.root, &start_after) == file.file_name {
96                    continue;
97                }
98            }
99            let file_name = file.file_name.clone();
100            let metadata = parse_file_info(&file);
101
102            ctx.entries.push_back(oio::Entry::new(
103                &build_rel_path(&self.core.root, &file_name),
104                metadata,
105            ))
106        }
107
108        Ok(())
109    }
110}