opendal/services/lakefs/
lister.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use chrono::TimeZone;
22use chrono::Utc;
23
24use super::core::LakefsCore;
25use super::core::LakefsListResponse;
26use super::error::parse_error;
27use crate::raw::*;
28use crate::*;
29
30pub struct LakefsLister {
31 core: Arc<LakefsCore>,
32 path: String,
33 delimiter: &'static str,
34 amount: Option<usize>,
35 after: Option<String>,
36}
37
38impl LakefsLister {
39 pub fn new(
40 core: Arc<LakefsCore>,
41 path: String,
42 amount: Option<usize>,
43 after: Option<&str>,
44 recursive: bool,
45 ) -> Self {
46 let delimiter = if recursive { "" } else { "/" };
47 Self {
48 core,
49 path,
50 delimiter,
51 amount,
52 after: after.map(String::from),
53 }
54 }
55}
56
57impl oio::PageList for LakefsLister {
58 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
59 let response = self
60 .core
61 .list_objects(
62 &self.path,
63 self.delimiter,
64 &self.amount,
65 if ctx.token.is_empty() {
67 self.after.clone()
68 } else {
69 None
70 },
71 )
72 .await?;
73
74 let status_code = response.status();
75 if !status_code.is_success() {
76 let error = parse_error(response);
77 return Err(error);
78 }
79
80 let bytes = response.into_body();
81
82 let decoded_response: LakefsListResponse =
83 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
84
85 ctx.done = true;
86
87 for status in decoded_response.results {
88 let entry_type = match status.path_type.as_str() {
89 "common_prefix" => EntryMode::DIR,
90 "object" => EntryMode::FILE,
91 _ => EntryMode::Unknown,
92 };
93
94 let mut meta = Metadata::new(entry_type);
95
96 if status.mtime != 0 {
97 meta.set_last_modified(Utc.timestamp_opt(status.mtime, 0).unwrap());
98 }
99
100 if entry_type == EntryMode::FILE {
101 if let Some(size_bytes) = status.size_bytes {
102 meta.set_content_length(size_bytes);
103 }
104 }
105
106 let path = if entry_type == EntryMode::DIR {
107 format!("{}/", &status.path)
108 } else {
109 status.path.clone()
110 };
111
112 ctx.entries.push_back(oio::Entry::new(
113 &build_rel_path(&self.core.root, &path),
114 meta,
115 ));
116 }
117
118 Ok(())
119 }
120}