opendal/services/cloudflare_kv/
lister.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21
22use super::core::CloudflareKvCore;
23use super::error::parse_error;
24use crate::raw::*;
25use crate::services::cloudflare_kv::model::{CfKvListKey, CfKvListResponse};
26use crate::*;
27
28pub struct CloudflareKvLister {
29    core: Arc<CloudflareKvCore>,
30
31    path: String,
32    limit: Option<usize>,
33    recursive: bool,
34}
35
36impl CloudflareKvLister {
37    pub fn new(
38        core: Arc<CloudflareKvCore>,
39        path: &str,
40        recursive: bool,
41        limit: Option<usize>,
42    ) -> Self {
43        Self {
44            core,
45
46            path: path.to_string(),
47            limit,
48            recursive,
49        }
50    }
51
52    fn build_entry_for_item(&self, item: &CfKvListKey, root: &str) -> Result<oio::Entry> {
53        let metadata = item.metadata.clone();
54        let mut name = item.name.clone();
55
56        if metadata.is_dir && !name.ends_with('/') {
57            name += "/";
58        }
59
60        let mut name = name.replace(root.trim_start_matches('/'), "");
61
62        // If it is the root directory, it needs to be processed as /
63        if name.is_empty() {
64            name = "/".to_string();
65        }
66
67        let entry_metadata = if name.ends_with('/') {
68            Metadata::new(EntryMode::DIR)
69                .with_etag(build_tmp_path_of(&name))
70                .with_content_length(0)
71        } else {
72            Metadata::new(EntryMode::FILE)
73                .with_etag(metadata.etag)
74                .with_content_length(metadata.content_length as u64)
75                .with_last_modified(parse_datetime_from_rfc3339(&metadata.last_modified)?)
76        };
77
78        Ok(oio::Entry::new(&name, entry_metadata))
79    }
80
81    fn handle_non_recursive_file_list(
82        &self,
83        ctx: &mut oio::PageContext,
84        result: &[CfKvListKey],
85        root: &str,
86    ) -> Result<()> {
87        if let Some(item) = result.iter().find(|item| item.name == self.path) {
88            let entry = self.build_entry_for_item(item, root)?;
89            ctx.entries.push_back(entry);
90        } else if !result.is_empty() {
91            let path_name = self.path.replace(root.trim_start_matches('/'), "");
92            let entry = oio::Entry::new(
93                &format!("{path_name}/"),
94                Metadata::new(EntryMode::DIR)
95                    .with_etag(build_tmp_path_of(&path_name))
96                    .with_content_length(0),
97            );
98            ctx.entries.push_back(entry);
99        }
100        ctx.done = true;
101        Ok(())
102    }
103}
104
105impl oio::PageList for CloudflareKvLister {
106    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
107        let new_path = self.path.trim_end_matches('/');
108        let resp = self
109            .core
110            .list(new_path, self.limit, Some(ctx.token.clone()))
111            .await?;
112
113        if resp.status() != http::StatusCode::OK {
114            return Err(parse_error(resp));
115        }
116
117        let bs = resp.into_body();
118        let res: CfKvListResponse =
119            serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
120
121        if !res.success {
122            return Err(Error::new(
123                ErrorKind::Unexpected,
124                "oss list this key failed for reason we don't know",
125            ));
126        }
127
128        let (token, done) = res
129            .result_info
130            .and_then(|info| info.cursor)
131            .map_or((String::new(), true), |cursor| {
132                (cursor.clone(), cursor.is_empty())
133            });
134
135        ctx.token = token;
136        ctx.done = done;
137
138        if let Some(result) = res.result {
139            let root = self.core.info.root().to_string();
140
141            if !self.path.ends_with('/') && !self.recursive {
142                self.handle_non_recursive_file_list(ctx, &result, &root)?;
143                return Ok(());
144            }
145
146            for item in result {
147                let mut name = item.name.clone();
148                if item.metadata.is_dir && !name.ends_with('/') {
149                    name += "/";
150                }
151
152                // For non-recursive listing, filter out entries not in the current directory.
153                if !self.recursive {
154                    if let Some(relative_path) = name.strip_prefix(&self.path) {
155                        if relative_path.trim_end_matches('/').contains('/') {
156                            continue;
157                        }
158                    } else if self.path != name {
159                        continue;
160                    }
161                }
162
163                let entry = self.build_entry_for_item(&item, &root)?;
164                ctx.entries.push_back(entry);
165            }
166        }
167
168        Ok(())
169    }
170}