opendal/services/cos/
lister.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use quick_xml::de;
22
23use super::core::*;
24use super::error::parse_error;
25use crate::raw::oio::PageContext;
26use crate::raw::*;
27use crate::EntryMode;
28use crate::Error;
29use crate::Metadata;
30use crate::Result;
31
32pub type CosListers = TwoWays<oio::PageLister<CosLister>, oio::PageLister<CosObjectVersionsLister>>;
33
34pub struct CosLister {
35 core: Arc<CosCore>,
36 path: String,
37 delimiter: &'static str,
38 limit: Option<usize>,
39}
40
41impl CosLister {
42 pub fn new(core: Arc<CosCore>, path: &str, recursive: bool, limit: Option<usize>) -> Self {
43 let delimiter = if recursive { "" } else { "/" };
44 Self {
45 core,
46 path: path.to_string(),
47 delimiter,
48 limit,
49 }
50 }
51}
52
53impl oio::PageList for CosLister {
54 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
55 let resp = self
56 .core
57 .cos_list_objects(&self.path, &ctx.token, self.delimiter, self.limit)
58 .await?;
59
60 if resp.status() != http::StatusCode::OK {
61 return Err(parse_error(resp));
62 }
63
64 let bs = resp.into_body();
65
66 let output: ListObjectsOutput =
67 de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
68
69 ctx.done = match output.next_marker.as_ref() {
73 None => true,
74 Some(next_marker) => next_marker.is_empty(),
75 };
76 ctx.token = output.next_marker.clone().unwrap_or_default();
77
78 for prefix in output.common_prefixes {
79 let de = oio::Entry::new(
80 &build_rel_path(&self.core.root, &prefix.prefix),
81 Metadata::new(EntryMode::DIR),
82 );
83
84 ctx.entries.push_back(de);
85 }
86
87 for object in output.contents {
88 let mut path = build_rel_path(&self.core.root, &object.key);
89 if path.is_empty() {
90 path = "/".to_string();
91 }
92
93 let meta = Metadata::new(EntryMode::from_path(&path)).with_content_length(object.size);
94
95 let de = oio::Entry::with(path, meta);
96 ctx.entries.push_back(de);
97 }
98
99 Ok(())
100 }
101}
102
103pub struct CosObjectVersionsLister {
105 core: Arc<CosCore>,
106
107 prefix: String,
108 args: OpList,
109
110 delimiter: &'static str,
111 abs_start_after: Option<String>,
112}
113
114impl CosObjectVersionsLister {
115 pub fn new(core: Arc<CosCore>, path: &str, args: OpList) -> Self {
116 let delimiter = if args.recursive() { "" } else { "/" };
117 let abs_start_after = args
118 .start_after()
119 .map(|start_after| build_abs_path(&core.root, start_after));
120
121 Self {
122 core,
123 prefix: path.to_string(),
124 args,
125 delimiter,
126 abs_start_after,
127 }
128 }
129}
130
131impl oio::PageList for CosObjectVersionsLister {
132 async fn next_page(&self, ctx: &mut PageContext) -> Result<()> {
133 let markers = ctx.token.rsplit_once(" ");
134 let (key_marker, version_id_marker) = if let Some(data) = markers {
135 data
136 } else if let Some(start_after) = &self.abs_start_after {
137 (start_after.as_str(), "")
138 } else {
139 ("", "")
140 };
141
142 let resp = self
143 .core
144 .cos_list_object_versions(
145 &self.prefix,
146 self.delimiter,
147 self.args.limit(),
148 key_marker,
149 version_id_marker,
150 )
151 .await?;
152 if resp.status() != http::StatusCode::OK {
153 return Err(parse_error(resp));
154 }
155
156 let body = resp.into_body();
157 let output: ListObjectVersionsOutput = de::from_reader(body.reader())
158 .map_err(new_xml_deserialize_error)
159 .map_err(Error::set_temporary)?;
165
166 ctx.done = if let Some(is_truncated) = output.is_truncated {
167 !is_truncated
168 } else {
169 false
170 };
171 ctx.token = format!(
172 "{} {}",
173 output.next_key_marker.unwrap_or_default(),
174 output.next_version_id_marker.unwrap_or_default()
175 );
176
177 for prefix in output.common_prefixes {
178 let de = oio::Entry::new(
179 &build_rel_path(&self.core.root, &prefix.prefix),
180 Metadata::new(EntryMode::DIR),
181 );
182 ctx.entries.push_back(de);
183 }
184
185 for version_object in output.version {
186 if !(self.args.versions() || version_object.is_latest) {
191 continue;
192 }
193
194 let mut path = build_rel_path(&self.core.root, &version_object.key);
195 if path.is_empty() {
196 path = "/".to_owned();
197 }
198
199 let mut meta = Metadata::new(EntryMode::from_path(&path));
200 meta.set_version(&version_object.version_id);
201 meta.set_is_current(version_object.is_latest);
202 meta.set_content_length(version_object.size);
203 meta.set_last_modified(parse_datetime_from_rfc3339(
204 version_object.last_modified.as_str(),
205 )?);
206 if let Some(etag) = version_object.etag {
207 meta.set_etag(&etag);
208 meta.set_content_md5(etag.trim_matches('"'));
209 }
210
211 let entry = oio::Entry::new(&path, meta);
212 ctx.entries.push_back(entry);
213 }
214
215 if self.args.deleted() {
216 for delete_marker in output.delete_marker {
217 let mut path = build_rel_path(&self.core.root, &delete_marker.key);
218 if path.is_empty() {
219 path = "/".to_owned();
220 }
221
222 let mut meta = Metadata::new(EntryMode::FILE);
223 meta.set_version(&delete_marker.version_id);
224 meta.set_is_deleted(true);
225 meta.set_is_current(delete_marker.is_latest);
226 meta.set_last_modified(parse_datetime_from_rfc3339(
227 delete_marker.last_modified.as_str(),
228 )?);
229
230 let entry = oio::Entry::new(&path, meta);
231 ctx.entries.push_back(entry);
232 }
233 }
234
235 Ok(())
236 }
237}