opendal/services/hdfs_native/
lister.rs1use crate::raw::build_rel_path;
19use crate::raw::oio;
20use crate::raw::parse_datetime_from_from_timestamp_millis;
21use crate::services::hdfs_native::error::parse_hdfs_error;
22use crate::EntryMode;
23use crate::Metadata;
24use crate::Result;
25use hdfs_native::client::ListStatusIterator;
26
27pub struct HdfsNativeLister {
28 root: String,
29 iter: ListStatusIterator,
30 current_path: Option<String>,
31 iter_to_end: bool,
32}
33
34impl HdfsNativeLister {
35 pub fn new(
36 root: &str,
37 client: &hdfs_native::Client,
38 abs_path: &str,
39 current_path: Option<String>,
40 ) -> Self {
41 HdfsNativeLister {
42 root: root.to_string(),
43 iter: client.list_status_iter(abs_path, false),
44 current_path,
45 iter_to_end: false,
46 }
47 }
48}
49
50impl oio::List for HdfsNativeLister {
51 async fn next(&mut self) -> Result<Option<oio::Entry>> {
52 if self.iter_to_end {
53 return Ok(None);
54 }
55
56 if let Some(path) = self.current_path.take() {
57 return Ok(Some(oio::Entry::new(&path, Metadata::new(EntryMode::DIR))));
58 }
59
60 match self.iter.next().await {
61 Some(Ok(status)) => {
62 let path = build_rel_path(&self.root, &status.path);
63
64 let entry = if status.isdir {
65 oio::Entry::new(&format!("{path}/"), Metadata::new(EntryMode::DIR))
66 } else {
67 let meta = Metadata::new(EntryMode::FILE)
68 .with_content_length(status.length as u64)
69 .with_last_modified(parse_datetime_from_from_timestamp_millis(
70 status.modification_time as i64,
71 )?);
72 oio::Entry::new(&path, meta)
73 };
74
75 Ok(Some(entry))
76 }
77 Some(Err(e)) => Err(parse_hdfs_error(e)),
78 None => {
79 self.iter_to_end = true;
80 Ok(None)
81 }
82 }
83 }
84}