opendal/services/webhdfs/
lister.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22
23use super::core::WebhdfsCore;
24use super::error::parse_error;
25use super::message::*;
26use crate::raw::*;
27use crate::*;
28
29pub struct WebhdfsLister {
30 core: Arc<WebhdfsCore>,
31 path: String,
32}
33
34impl WebhdfsLister {
35 pub fn new(core: Arc<WebhdfsCore>, path: &str) -> Self {
36 Self {
37 core,
38 path: path.to_string(),
39 }
40 }
41}
42
43impl oio::PageList for WebhdfsLister {
44 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
45 let file_status = if self.core.disable_list_batch {
46 let resp = self.core.webhdfs_list_status_request(&self.path).await?;
47 match resp.status() {
48 StatusCode::OK => {
49 ctx.done = true;
50 ctx.entries.push_back(oio::Entry::new(
51 format!("{}/", self.path).as_str(),
52 Metadata::new(EntryMode::DIR),
53 ));
54
55 let bs = resp.into_body();
56 serde_json::from_reader::<_, FileStatusesWrapper>(bs.reader())
57 .map_err(new_json_deserialize_error)?
58 .file_statuses
59 .file_status
60 }
61 StatusCode::NOT_FOUND => {
62 ctx.done = true;
63 return Ok(());
64 }
65 _ => return Err(parse_error(resp)),
66 }
67 } else {
68 let resp = self
69 .core
70 .webhdfs_list_status_batch_request(&self.path, &ctx.token)
71 .await?;
72 match resp.status() {
73 StatusCode::OK => {
74 let bs = resp.into_body();
75 let res: DirectoryListingWrapper =
76 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
77 let directory_listing = res.directory_listing;
78 let file_statuses = directory_listing.partial_listing.file_statuses.file_status;
79
80 if directory_listing.remaining_entries == 0 {
81 ctx.entries.push_back(oio::Entry::new(
82 format!("{}/", self.path).as_str(),
83 Metadata::new(EntryMode::DIR),
84 ));
85
86 ctx.done = true;
87 } else if !file_statuses.is_empty() {
88 ctx.token
89 .clone_from(&file_statuses.last().unwrap().path_suffix);
90 }
91
92 file_statuses
93 }
94 StatusCode::NOT_FOUND => {
95 ctx.done = true;
96 return Ok(());
97 }
98 _ => return Err(parse_error(resp)),
99 }
100 };
101
102 for status in file_status {
103 let mut path = if self.path.is_empty() {
104 status.path_suffix.to_string()
105 } else {
106 format!("{}/{}", self.path, status.path_suffix)
107 };
108
109 let meta = match status.ty {
110 FileStatusType::Directory => Metadata::new(EntryMode::DIR),
111 FileStatusType::File => Metadata::new(EntryMode::FILE)
112 .with_content_length(status.length)
113 .with_last_modified(parse_datetime_from_from_timestamp_millis(
114 status.modification_time,
115 )?),
116 };
117
118 if meta.mode().is_file() {
119 path = path.trim_end_matches('/').to_string();
120 }
121 if meta.mode().is_dir() {
122 path += "/"
123 }
124 let entry = oio::Entry::new(&path, meta);
125 ctx.entries.push_back(entry);
126 }
127
128 Ok(())
129 }
130}