opendal/services/lakefs/
lister.rs1use std::sync::Arc;
19
20use super::core::LakefsCore;
21use super::core::LakefsListResponse;
22use super::error::parse_error;
23use crate::raw::*;
24use crate::*;
25use bytes::Buf;
26
27pub struct LakefsLister {
28 core: Arc<LakefsCore>,
29 path: String,
30 delimiter: &'static str,
31 amount: Option<usize>,
32 after: Option<String>,
33}
34
35impl LakefsLister {
36 pub fn new(
37 core: Arc<LakefsCore>,
38 path: String,
39 amount: Option<usize>,
40 after: Option<&str>,
41 recursive: bool,
42 ) -> Self {
43 let delimiter = if recursive { "" } else { "/" };
44 Self {
45 core,
46 path,
47 delimiter,
48 amount,
49 after: after.map(String::from),
50 }
51 }
52}
53
54impl oio::PageList for LakefsLister {
55 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
56 let response = self
57 .core
58 .list_objects(
59 &self.path,
60 self.delimiter,
61 &self.amount,
62 if ctx.token.is_empty() {
64 self.after.clone()
65 } else {
66 None
67 },
68 )
69 .await?;
70
71 let status_code = response.status();
72 if !status_code.is_success() {
73 let error = parse_error(response);
74 return Err(error);
75 }
76
77 let bytes = response.into_body();
78
79 let decoded_response: LakefsListResponse =
80 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
81
82 ctx.done = true;
83
84 for status in decoded_response.results {
85 let entry_type = match status.path_type.as_str() {
86 "common_prefix" => EntryMode::DIR,
87 "object" => EntryMode::FILE,
88 _ => EntryMode::Unknown,
89 };
90
91 let mut meta = Metadata::new(entry_type);
92
93 if status.mtime != 0 {
94 meta.set_last_modified(Timestamp::from_second(status.mtime).unwrap());
95 }
96
97 if entry_type == EntryMode::FILE {
98 if let Some(size_bytes) = status.size_bytes {
99 meta.set_content_length(size_bytes);
100 }
101 }
102
103 let path = if entry_type == EntryMode::DIR {
104 format!("{}/", &status.path)
105 } else {
106 status.path.clone()
107 };
108
109 ctx.entries.push_back(oio::Entry::new(
110 &build_rel_path(&self.core.root, &path),
111 meta,
112 ));
113 }
114
115 Ok(())
116 }
117}