opendal/services/etcd/
lister.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19use std::vec::IntoIter;
20
21use super::core::EtcdCore;
22use crate::raw::oio::Entry;
23use crate::raw::{build_abs_path, build_rel_path, oio};
24use crate::*;
25
/// Lister over the etcd keys that share one path prefix.
///
/// The keys are fetched in a single request by [`EtcdLister::new`] and
/// buffered; stepping through the lister afterwards only consumes that buffer.
pub struct EtcdLister {
    /// Root handed to the constructor; passed to `build_rel_path` when an
    /// entry is produced.
    root: String,
    /// Result of `build_abs_path(root, path)` computed in the constructor;
    /// a buffered key is only yielded if it starts with this value.
    path: String,
    /// Buffered keys that have not been yielded yet.
    iter: IntoIter<String>,
}
31
32impl EtcdLister {
33    pub async fn new(core: Arc<EtcdCore>, root: String, path: String) -> Result<Self> {
34        let abs_path = build_abs_path(&root, &path);
35
36        // Get all keys with the specified prefix
37        let mut client = core.conn().await?;
38        let get_options = Some(
39            etcd_client::GetOptions::new()
40                .with_prefix()
41                .with_keys_only(),
42        );
43        let resp = client
44            .get(abs_path.as_str(), get_options)
45            .await
46            .map_err(super::error::format_etcd_error)?;
47
48        // Collect all keys that match the prefix
49        let mut keys = Vec::new();
50        for kv in resp.kvs() {
51            let key = kv.key_str().map(String::from).map_err(|err| {
52                Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string")
53                    .set_source(err)
54            })?;
55            keys.push(key);
56        }
57
58        Ok(Self {
59            root,
60            path: abs_path,
61            iter: keys.into_iter(),
62        })
63    }
64}
65
66impl oio::List for EtcdLister {
67    async fn next(&mut self) -> Result<Option<Entry>> {
68        for key in self.iter.by_ref() {
69            if key.starts_with(&self.path) {
70                let path = build_rel_path(&self.root, &key);
71
72                let entry = Entry::new(&path, Metadata::new(EntryMode::from_path(&key)));
73                return Ok(Some(entry));
74            }
75        }
76
77        Ok(None)
78    }
79}