opendal/services/lakefs/
lister.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use chrono::TimeZone;
22use chrono::Utc;
23
24use super::core::LakefsCore;
25use super::core::LakefsListResponse;
26use super::error::parse_error;
27use crate::raw::*;
28use crate::*;
29
30pub struct LakefsLister {
31    core: Arc<LakefsCore>,
32    path: String,
33    delimiter: &'static str,
34    amount: Option<usize>,
35    after: Option<String>,
36}
37
38impl LakefsLister {
39    pub fn new(
40        core: Arc<LakefsCore>,
41        path: String,
42        amount: Option<usize>,
43        after: Option<&str>,
44        recursive: bool,
45    ) -> Self {
46        let delimiter = if recursive { "" } else { "/" };
47        Self {
48            core,
49            path,
50            delimiter,
51            amount,
52            after: after.map(String::from),
53        }
54    }
55}
56
57impl oio::PageList for LakefsLister {
58    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
59        let response = self
60            .core
61            .list_objects(
62                &self.path,
63                self.delimiter,
64                &self.amount,
65                // start after should only be set for the first page.
66                if ctx.token.is_empty() {
67                    self.after.clone()
68                } else {
69                    None
70                },
71            )
72            .await?;
73
74        let status_code = response.status();
75        if !status_code.is_success() {
76            let error = parse_error(response);
77            return Err(error);
78        }
79
80        let bytes = response.into_body();
81
82        let decoded_response: LakefsListResponse =
83            serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
84
85        ctx.done = true;
86
87        for status in decoded_response.results {
88            let entry_type = match status.path_type.as_str() {
89                "common_prefix" => EntryMode::DIR,
90                "object" => EntryMode::FILE,
91                _ => EntryMode::Unknown,
92            };
93
94            let mut meta = Metadata::new(entry_type);
95
96            if status.mtime != 0 {
97                meta.set_last_modified(Utc.timestamp_opt(status.mtime, 0).unwrap());
98            }
99
100            if entry_type == EntryMode::FILE {
101                if let Some(size_bytes) = status.size_bytes {
102                    meta.set_content_length(size_bytes);
103                }
104            }
105
106            let path = if entry_type == EntryMode::DIR {
107                format!("{}/", &status.path)
108            } else {
109                status.path.clone()
110            };
111
112            ctx.entries.push_back(oio::Entry::new(
113                &build_rel_path(&self.core.root, &path),
114                meta,
115            ));
116        }
117
118        Ok(())
119    }
120}