opendal/services/s3/
lister.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use quick_xml::de;
22
23use super::core::*;
24use super::error::parse_error;
25use crate::raw::oio::PageContext;
26use crate::raw::*;
27use crate::EntryMode;
28use crate::Error;
29use crate::Metadata;
30use crate::Result;
31
32pub type S3Listers = ThreeWays<
33 oio::PageLister<S3ListerV1>,
34 oio::PageLister<S3ListerV2>,
35 oio::PageLister<S3ObjectVersionsLister>,
36>;
37
38pub struct S3ListerV1 {
40 core: Arc<S3Core>,
41
42 path: String,
43 args: OpList,
44
45 delimiter: &'static str,
46 first_marker: String,
50}
51
52impl S3ListerV1 {
53 pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
54 let delimiter = if args.recursive() { "" } else { "/" };
55 let first_marker = args
56 .start_after()
57 .map(|start_after| build_abs_path(&core.root, start_after))
58 .unwrap_or_default();
59
60 Self {
61 core,
62
63 path: path.to_string(),
64 args,
65 delimiter,
66 first_marker,
67 }
68 }
69}
70
71impl oio::PageList for S3ListerV1 {
72 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
73 let resp = self
74 .core
75 .s3_list_objects_v1(
76 &self.path,
77 if !ctx.token.is_empty() {
79 &ctx.token
80 } else {
81 &self.first_marker
82 },
83 self.delimiter,
84 self.args.limit(),
85 )
86 .await?;
87
88 if resp.status() != http::StatusCode::OK {
89 return Err(parse_error(resp));
90 }
91 let bs = resp.into_body();
92
93 let output: ListObjectsOutputV1 = de::from_reader(bs.reader())
94 .map_err(new_xml_deserialize_error)
95 .map_err(Error::set_temporary)?;
101
102 ctx.done = if let Some(is_truncated) = output.is_truncated {
107 !is_truncated
108 } else {
109 output.common_prefixes.is_empty() && output.contents.is_empty()
110 };
111 ctx.token = if let Some(next_marker) = &output.next_marker {
117 next_marker.clone()
118 } else if let Some(content) = output.contents.last() {
119 content.key.clone()
120 } else if let Some(prefix) = output.common_prefixes.last() {
121 prefix.prefix.clone()
122 } else {
123 "".to_string()
124 };
125
126 for prefix in output.common_prefixes {
127 let de = oio::Entry::new(
128 &build_rel_path(&self.core.root, &prefix.prefix),
129 Metadata::new(EntryMode::DIR),
130 );
131
132 ctx.entries.push_back(de);
133 }
134
135 for object in output.contents {
136 let mut path = build_rel_path(&self.core.root, &object.key);
137 if path.is_empty() {
138 path = "/".to_string();
139 }
140
141 let mut meta = Metadata::new(EntryMode::from_path(&path));
142 meta.set_is_current(true);
143 if let Some(etag) = &object.etag {
144 meta.set_etag(etag);
145 meta.set_content_md5(etag.trim_matches('"'));
146 }
147 meta.set_content_length(object.size);
148
149 meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);
152
153 let de = oio::Entry::with(path, meta);
154 ctx.entries.push_back(de);
155 }
156
157 Ok(())
158 }
159}
160
161pub struct S3ListerV2 {
163 core: Arc<S3Core>,
164
165 path: String,
166 args: OpList,
167
168 delimiter: &'static str,
169 abs_start_after: Option<String>,
170}
171
172impl S3ListerV2 {
173 pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
174 let delimiter = if args.recursive() { "" } else { "/" };
175 let abs_start_after = args
176 .start_after()
177 .map(|start_after| build_abs_path(&core.root, start_after));
178
179 Self {
180 core,
181
182 path: path.to_string(),
183 args,
184 delimiter,
185 abs_start_after,
186 }
187 }
188}
189
190impl oio::PageList for S3ListerV2 {
191 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
192 let resp = self
193 .core
194 .s3_list_objects_v2(
195 &self.path,
196 &ctx.token,
197 self.delimiter,
198 self.args.limit(),
199 if ctx.token.is_empty() {
201 self.abs_start_after.clone()
202 } else {
203 None
204 },
205 )
206 .await?;
207
208 if resp.status() != http::StatusCode::OK {
209 return Err(parse_error(resp));
210 }
211 let bs = resp.into_body();
212
213 let output: ListObjectsOutputV2 = de::from_reader(bs.reader())
214 .map_err(new_xml_deserialize_error)
215 .map_err(Error::set_temporary)?;
221
222 ctx.done = if let Some(is_truncated) = output.is_truncated {
228 !is_truncated
229 } else if let Some(next_continuation_token) = output.next_continuation_token.as_ref() {
230 next_continuation_token.is_empty()
231 } else {
232 output.common_prefixes.is_empty() && output.contents.is_empty()
233 };
234 ctx.token = output.next_continuation_token.clone().unwrap_or_default();
235
236 for prefix in output.common_prefixes {
237 let de = oio::Entry::new(
238 &build_rel_path(&self.core.root, &prefix.prefix),
239 Metadata::new(EntryMode::DIR),
240 );
241
242 ctx.entries.push_back(de);
243 }
244
245 for object in output.contents {
246 let mut path = build_rel_path(&self.core.root, &object.key);
247 if path.is_empty() {
248 path = "/".to_string();
249 }
250
251 let mut meta = Metadata::new(EntryMode::from_path(&path));
252 meta.set_is_current(true);
253 if let Some(etag) = &object.etag {
254 meta.set_etag(etag);
255 meta.set_content_md5(etag.trim_matches('"'));
256 }
257 meta.set_content_length(object.size);
258
259 meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);
262
263 let de = oio::Entry::with(path, meta);
264 ctx.entries.push_back(de);
265 }
266
267 Ok(())
268 }
269}
270
271pub struct S3ObjectVersionsLister {
273 core: Arc<S3Core>,
274
275 prefix: String,
276 args: OpList,
277
278 delimiter: &'static str,
279 abs_start_after: Option<String>,
280}
281
282impl S3ObjectVersionsLister {
283 pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
284 let delimiter = if args.recursive() { "" } else { "/" };
285 let abs_start_after = args
286 .start_after()
287 .map(|start_after| build_abs_path(&core.root, start_after));
288
289 Self {
290 core,
291 prefix: path.to_string(),
292 args,
293 delimiter,
294 abs_start_after,
295 }
296 }
297}
298
299impl oio::PageList for S3ObjectVersionsLister {
300 async fn next_page(&self, ctx: &mut PageContext) -> Result<()> {
301 let markers = ctx.token.rsplit_once(" ");
302 let (key_marker, version_id_marker) = if let Some(data) = markers {
303 data
304 } else if let Some(start_after) = &self.abs_start_after {
305 (start_after.as_str(), "")
306 } else {
307 ("", "")
308 };
309
310 let resp = self
311 .core
312 .s3_list_object_versions(
313 &self.prefix,
314 self.delimiter,
315 self.args.limit(),
316 key_marker,
317 version_id_marker,
318 )
319 .await?;
320 if resp.status() != http::StatusCode::OK {
321 return Err(parse_error(resp));
322 }
323
324 let body = resp.into_body();
325 let output: ListObjectVersionsOutput = de::from_reader(body.reader())
326 .map_err(new_xml_deserialize_error)
327 .map_err(Error::set_temporary)?;
333
334 ctx.done = if let Some(is_truncated) = output.is_truncated {
335 !is_truncated
336 } else {
337 false
338 };
339 ctx.token = format!(
340 "{} {}",
341 output.next_key_marker.unwrap_or_default(),
342 output.next_version_id_marker.unwrap_or_default()
343 );
344
345 for prefix in output.common_prefixes {
346 let de = oio::Entry::new(
347 &build_rel_path(&self.core.root, &prefix.prefix),
348 Metadata::new(EntryMode::DIR),
349 );
350 ctx.entries.push_back(de);
351 }
352
353 for version_object in output.version {
354 if !(self.args.versions() || version_object.is_latest) {
359 continue;
360 }
361
362 let mut path = build_rel_path(&self.core.root, &version_object.key);
363 if path.is_empty() {
364 path = "/".to_owned();
365 }
366
367 let mut meta = Metadata::new(EntryMode::from_path(&path));
368 meta.set_version(&version_object.version_id);
369 meta.set_is_current(version_object.is_latest);
370 meta.set_content_length(version_object.size);
371 meta.set_last_modified(parse_datetime_from_rfc3339(
372 version_object.last_modified.as_str(),
373 )?);
374 if let Some(etag) = version_object.etag {
375 meta.set_etag(&etag);
376 meta.set_content_md5(etag.trim_matches('"'));
377 }
378
379 let entry = oio::Entry::new(&path, meta);
380 ctx.entries.push_back(entry);
381 }
382
383 if self.args.deleted() {
384 for delete_marker in output.delete_marker {
385 let mut path = build_rel_path(&self.core.root, &delete_marker.key);
386 if path.is_empty() {
387 path = "/".to_owned();
388 }
389
390 let mut meta = Metadata::new(EntryMode::FILE);
391 meta.set_version(&delete_marker.version_id);
392 meta.set_is_deleted(true);
393 meta.set_is_current(delete_marker.is_latest);
394 meta.set_last_modified(parse_datetime_from_rfc3339(
395 delete_marker.last_modified.as_str(),
396 )?);
397
398 let entry = oio::Entry::new(&path, meta);
399 ctx.entries.push_back(entry);
400 }
401 }
402
403 Ok(())
404 }
405}