opendal/services/hdfs_native/
lister.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use hdfs_native::client::ListStatusIterator;
19
20use crate::raw::build_rel_path;
21use crate::raw::oio;
22use crate::raw::parse_datetime_from_from_timestamp_millis;
23use crate::services::hdfs_native::error::parse_hdfs_error;
24use crate::EntryMode;
25use crate::Metadata;
26use crate::Result;
27
28pub struct HdfsNativeLister {
29    root: String,
30    iter: ListStatusIterator,
31    current_path: Option<String>,
32    iter_to_end: bool,
33}
34
35impl HdfsNativeLister {
36    pub fn new(
37        root: &str,
38        client: &hdfs_native::Client,
39        abs_path: &str,
40        current_path: Option<String>,
41    ) -> Self {
42        HdfsNativeLister {
43            root: root.to_string(),
44            iter: client.list_status_iter(abs_path, false),
45            current_path,
46            iter_to_end: false,
47        }
48    }
49}
50
51impl oio::List for HdfsNativeLister {
52    async fn next(&mut self) -> Result<Option<oio::Entry>> {
53        if self.iter_to_end {
54            return Ok(None);
55        }
56
57        if let Some(path) = self.current_path.take() {
58            return Ok(Some(oio::Entry::new(&path, Metadata::new(EntryMode::DIR))));
59        }
60
61        match self.iter.next().await {
62            Some(Ok(status)) => {
63                let path = build_rel_path(&self.root, &status.path);
64
65                let entry = if status.isdir {
66                    oio::Entry::new(&format!("{path}/"), Metadata::new(EntryMode::DIR))
67                } else {
68                    let meta = Metadata::new(EntryMode::FILE)
69                        .with_content_length(status.length as u64)
70                        .with_last_modified(parse_datetime_from_from_timestamp_millis(
71                            status.modification_time as i64,
72                        )?);
73                    oio::Entry::new(&path, meta)
74                };
75
76                Ok(Some(entry))
77            }
78            Some(Err(e)) => Err(parse_hdfs_error(e)),
79            None => {
80                self.iter_to_end = true;
81                Ok(None)
82            }
83        }
84    }
85}