opendal/services/hdfs_native/
lister.rs1use hdfs_native::client::ListStatusIterator;
19
20use crate::raw::build_rel_path;
21use crate::raw::oio;
22use crate::raw::parse_datetime_from_from_timestamp_millis;
23use crate::services::hdfs_native::error::parse_hdfs_error;
24use crate::EntryMode;
25use crate::Metadata;
26use crate::Result;
27
28pub struct HdfsNativeLister {
29 root: String,
30 iter: ListStatusIterator,
31 current_path: Option<String>,
32 iter_to_end: bool,
33}
34
35impl HdfsNativeLister {
36 pub fn new(
37 root: &str,
38 client: &hdfs_native::Client,
39 abs_path: &str,
40 current_path: Option<String>,
41 ) -> Self {
42 HdfsNativeLister {
43 root: root.to_string(),
44 iter: client.list_status_iter(abs_path, false),
45 current_path,
46 iter_to_end: false,
47 }
48 }
49}
50
51impl oio::List for HdfsNativeLister {
52 async fn next(&mut self) -> Result<Option<oio::Entry>> {
53 if self.iter_to_end {
54 return Ok(None);
55 }
56
57 if let Some(path) = self.current_path.take() {
58 return Ok(Some(oio::Entry::new(&path, Metadata::new(EntryMode::DIR))));
59 }
60
61 match self.iter.next().await {
62 Some(Ok(status)) => {
63 let path = build_rel_path(&self.root, &status.path);
64
65 let entry = if status.isdir {
66 oio::Entry::new(&format!("{path}/"), Metadata::new(EntryMode::DIR))
67 } else {
68 let meta = Metadata::new(EntryMode::FILE)
69 .with_content_length(status.length as u64)
70 .with_last_modified(parse_datetime_from_from_timestamp_millis(
71 status.modification_time as i64,
72 )?);
73 oio::Entry::new(&path, meta)
74 };
75
76 Ok(Some(entry))
77 }
78 Some(Err(e)) => Err(parse_hdfs_error(e)),
79 None => {
80 self.iter_to_end = true;
81 Ok(None)
82 }
83 }
84 }
85}