opendal/services/webhdfs/
lister.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22
23use super::core::WebhdfsCore;
24use super::error::parse_error;
25use super::message::*;
26use crate::raw::*;
27use crate::*;
28
29pub struct WebhdfsLister {
30    core: Arc<WebhdfsCore>,
31    path: String,
32}
33
34impl WebhdfsLister {
35    pub fn new(core: Arc<WebhdfsCore>, path: &str) -> Self {
36        Self {
37            core,
38            path: path.to_string(),
39        }
40    }
41}
42
43impl oio::PageList for WebhdfsLister {
44    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
45        let file_status = if self.core.disable_list_batch {
46            let resp = self.core.webhdfs_list_status_request(&self.path).await?;
47            match resp.status() {
48                StatusCode::OK => {
49                    ctx.done = true;
50                    ctx.entries.push_back(oio::Entry::new(
51                        format!("{}/", self.path).as_str(),
52                        Metadata::new(EntryMode::DIR),
53                    ));
54
55                    let bs = resp.into_body();
56                    serde_json::from_reader::<_, FileStatusesWrapper>(bs.reader())
57                        .map_err(new_json_deserialize_error)?
58                        .file_statuses
59                        .file_status
60                }
61                StatusCode::NOT_FOUND => {
62                    ctx.done = true;
63                    return Ok(());
64                }
65                _ => return Err(parse_error(resp)),
66            }
67        } else {
68            let resp = self
69                .core
70                .webhdfs_list_status_batch_request(&self.path, &ctx.token)
71                .await?;
72            match resp.status() {
73                StatusCode::OK => {
74                    let bs = resp.into_body();
75                    let res: DirectoryListingWrapper =
76                        serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
77                    let directory_listing = res.directory_listing;
78                    let file_statuses = directory_listing.partial_listing.file_statuses.file_status;
79
80                    if directory_listing.remaining_entries == 0 {
81                        ctx.entries.push_back(oio::Entry::new(
82                            format!("{}/", self.path).as_str(),
83                            Metadata::new(EntryMode::DIR),
84                        ));
85
86                        ctx.done = true;
87                    } else if !file_statuses.is_empty() {
88                        ctx.token
89                            .clone_from(&file_statuses.last().unwrap().path_suffix);
90                    }
91
92                    file_statuses
93                }
94                StatusCode::NOT_FOUND => {
95                    ctx.done = true;
96                    return Ok(());
97                }
98                _ => return Err(parse_error(resp)),
99            }
100        };
101
102        for status in file_status {
103            let mut path = if self.path.is_empty() {
104                status.path_suffix.to_string()
105            } else {
106                format!("{}/{}", self.path, status.path_suffix)
107            };
108
109            let meta = match status.ty {
110                FileStatusType::Directory => Metadata::new(EntryMode::DIR),
111                FileStatusType::File => Metadata::new(EntryMode::FILE)
112                    .with_content_length(status.length)
113                    .with_last_modified(parse_datetime_from_from_timestamp_millis(
114                        status.modification_time,
115                    )?),
116            };
117
118            if meta.mode().is_file() {
119                path = path.trim_end_matches('/').to_string();
120            }
121            if meta.mode().is_dir() {
122                path += "/"
123            }
124            let entry = oio::Entry::new(&path, meta);
125            ctx.entries.push_back(entry);
126        }
127
128        Ok(())
129    }
130}