opendal/services/hdfs_native/
lister.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::raw::build_rel_path;
19use crate::raw::oio;
20use crate::raw::parse_datetime_from_from_timestamp_millis;
21use crate::services::hdfs_native::error::parse_hdfs_error;
22use crate::EntryMode;
23use crate::Metadata;
24use crate::Result;
25use hdfs_native::client::ListStatusIterator;
26
27pub struct HdfsNativeLister {
28    root: String,
29    iter: ListStatusIterator,
30    current_path: Option<String>,
31    iter_to_end: bool,
32}
33
34impl HdfsNativeLister {
35    pub fn new(
36        root: &str,
37        client: &hdfs_native::Client,
38        abs_path: &str,
39        current_path: Option<String>,
40    ) -> Self {
41        HdfsNativeLister {
42            root: root.to_string(),
43            iter: client.list_status_iter(abs_path, false),
44            current_path,
45            iter_to_end: false,
46        }
47    }
48}
49
50impl oio::List for HdfsNativeLister {
51    async fn next(&mut self) -> Result<Option<oio::Entry>> {
52        if self.iter_to_end {
53            return Ok(None);
54        }
55
56        if let Some(path) = self.current_path.take() {
57            return Ok(Some(oio::Entry::new(&path, Metadata::new(EntryMode::DIR))));
58        }
59
60        match self.iter.next().await {
61            Some(Ok(status)) => {
62                let path = build_rel_path(&self.root, &status.path);
63
64                let entry = if status.isdir {
65                    oio::Entry::new(&format!("{path}/"), Metadata::new(EntryMode::DIR))
66                } else {
67                    let meta = Metadata::new(EntryMode::FILE)
68                        .with_content_length(status.length as u64)
69                        .with_last_modified(parse_datetime_from_from_timestamp_millis(
70                            status.modification_time as i64,
71                        )?);
72                    oio::Entry::new(&path, meta)
73                };
74
75                Ok(Some(entry))
76            }
77            Some(Err(e)) => Err(parse_hdfs_error(e)),
78            None => {
79                self.iter_to_end = true;
80                Ok(None)
81            }
82        }
83    }
84}