opendal/services/cloudflare_kv/
lister.rs1use std::sync::Arc;
19
20use bytes::Buf;
21
22use super::core::CloudflareKvCore;
23use super::error::parse_error;
24use crate::raw::*;
25use crate::services::cloudflare_kv::model::{CfKvListKey, CfKvListResponse};
26use crate::*;
27
28pub struct CloudflareKvLister {
29 core: Arc<CloudflareKvCore>,
30
31 path: String,
32 limit: Option<usize>,
33 recursive: bool,
34}
35
36impl CloudflareKvLister {
37 pub fn new(
38 core: Arc<CloudflareKvCore>,
39 path: &str,
40 recursive: bool,
41 limit: Option<usize>,
42 ) -> Self {
43 Self {
44 core,
45
46 path: path.to_string(),
47 limit,
48 recursive,
49 }
50 }
51
52 fn build_entry_for_item(&self, item: &CfKvListKey, root: &str) -> Result<oio::Entry> {
53 let metadata = item.metadata.clone();
54 let mut name = item.name.clone();
55
56 if metadata.is_dir && !name.ends_with('/') {
57 name += "/";
58 }
59
60 let mut name = name.replace(root.trim_start_matches('/'), "");
61
62 if name.is_empty() {
64 name = "/".to_string();
65 }
66
67 let entry_metadata = if name.ends_with('/') {
68 Metadata::new(EntryMode::DIR)
69 .with_etag(build_tmp_path_of(&name))
70 .with_content_length(0)
71 } else {
72 Metadata::new(EntryMode::FILE)
73 .with_etag(metadata.etag)
74 .with_content_length(metadata.content_length as u64)
75 .with_last_modified(parse_datetime_from_rfc3339(&metadata.last_modified)?)
76 };
77
78 Ok(oio::Entry::new(&name, entry_metadata))
79 }
80
81 fn handle_non_recursive_file_list(
82 &self,
83 ctx: &mut oio::PageContext,
84 result: &[CfKvListKey],
85 root: &str,
86 ) -> Result<()> {
87 if let Some(item) = result.iter().find(|item| item.name == self.path) {
88 let entry = self.build_entry_for_item(item, root)?;
89 ctx.entries.push_back(entry);
90 } else if !result.is_empty() {
91 let path_name = self.path.replace(root.trim_start_matches('/'), "");
92 let entry = oio::Entry::new(
93 &format!("{path_name}/"),
94 Metadata::new(EntryMode::DIR)
95 .with_etag(build_tmp_path_of(&path_name))
96 .with_content_length(0),
97 );
98 ctx.entries.push_back(entry);
99 }
100 ctx.done = true;
101 Ok(())
102 }
103}
104
105impl oio::PageList for CloudflareKvLister {
106 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
107 let new_path = self.path.trim_end_matches('/');
108 let resp = self
109 .core
110 .list(new_path, self.limit, Some(ctx.token.clone()))
111 .await?;
112
113 if resp.status() != http::StatusCode::OK {
114 return Err(parse_error(resp));
115 }
116
117 let bs = resp.into_body();
118 let res: CfKvListResponse =
119 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
120
121 if !res.success {
122 return Err(Error::new(
123 ErrorKind::Unexpected,
124 "oss list this key failed for reason we don't know",
125 ));
126 }
127
128 let (token, done) = res
129 .result_info
130 .and_then(|info| info.cursor)
131 .map_or((String::new(), true), |cursor| {
132 (cursor.clone(), cursor.is_empty())
133 });
134
135 ctx.token = token;
136 ctx.done = done;
137
138 if let Some(result) = res.result {
139 let root = self.core.info.root().to_string();
140
141 if !self.path.ends_with('/') && !self.recursive {
142 self.handle_non_recursive_file_list(ctx, &result, &root)?;
143 return Ok(());
144 }
145
146 for item in result {
147 let mut name = item.name.clone();
148 if item.metadata.is_dir && !name.ends_with('/') {
149 name += "/";
150 }
151
152 if !self.recursive {
154 if let Some(relative_path) = name.strip_prefix(&self.path) {
155 if relative_path.trim_end_matches('/').contains('/') {
156 continue;
157 }
158 } else if self.path != name {
159 continue;
160 }
161 }
162
163 let entry = self.build_entry_for_item(&item, &root)?;
164 ctx.entries.push_back(entry);
165 }
166 }
167
168 Ok(())
169 }
170}