opendal/services/lakefs/
lister.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use super::core::LakefsCore;
21use super::core::LakefsListResponse;
22use super::error::parse_error;
23use crate::raw::*;
24use crate::*;
25use bytes::Buf;
26
27pub struct LakefsLister {
28    core: Arc<LakefsCore>,
29    path: String,
30    delimiter: &'static str,
31    amount: Option<usize>,
32    after: Option<String>,
33}
34
35impl LakefsLister {
36    pub fn new(
37        core: Arc<LakefsCore>,
38        path: String,
39        amount: Option<usize>,
40        after: Option<&str>,
41        recursive: bool,
42    ) -> Self {
43        let delimiter = if recursive { "" } else { "/" };
44        Self {
45            core,
46            path,
47            delimiter,
48            amount,
49            after: after.map(String::from),
50        }
51    }
52}
53
54impl oio::PageList for LakefsLister {
55    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
56        let response = self
57            .core
58            .list_objects(
59                &self.path,
60                self.delimiter,
61                &self.amount,
62                // start after should only be set for the first page.
63                if ctx.token.is_empty() {
64                    self.after.clone()
65                } else {
66                    None
67                },
68            )
69            .await?;
70
71        let status_code = response.status();
72        if !status_code.is_success() {
73            let error = parse_error(response);
74            return Err(error);
75        }
76
77        let bytes = response.into_body();
78
79        let decoded_response: LakefsListResponse =
80            serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
81
82        ctx.done = true;
83
84        for status in decoded_response.results {
85            let entry_type = match status.path_type.as_str() {
86                "common_prefix" => EntryMode::DIR,
87                "object" => EntryMode::FILE,
88                _ => EntryMode::Unknown,
89            };
90
91            let mut meta = Metadata::new(entry_type);
92
93            if status.mtime != 0 {
94                meta.set_last_modified(Timestamp::from_second(status.mtime).unwrap());
95            }
96
97            if entry_type == EntryMode::FILE {
98                if let Some(size_bytes) = status.size_bytes {
99                    meta.set_content_length(size_bytes);
100                }
101            }
102
103            let path = if entry_type == EntryMode::DIR {
104                format!("{}/", &status.path)
105            } else {
106                status.path.clone()
107            };
108
109            ctx.entries.push_back(oio::Entry::new(
110                &build_rel_path(&self.core.root, &path),
111                meta,
112            ));
113        }
114
115        Ok(())
116    }
117}