opendal/services/s3/
lister.rs1use std::sync::Arc;
19
20use super::core::*;
21use super::error::parse_error;
22use crate::raw::oio::PageContext;
23use crate::raw::*;
24use crate::EntryMode;
25use crate::Error;
26use crate::Metadata;
27use crate::Result;
28use bytes::Buf;
29use quick_xml::de;
30
31pub type S3Listers = ThreeWays<
32 oio::PageLister<S3ListerV1>,
33 oio::PageLister<S3ListerV2>,
34 oio::PageLister<S3ObjectVersionsLister>,
35>;
36
37pub struct S3ListerV1 {
39 core: Arc<S3Core>,
40
41 path: String,
42 args: OpList,
43
44 delimiter: &'static str,
45 first_marker: String,
49}
50
51impl S3ListerV1 {
52 pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
53 let delimiter = if args.recursive() { "" } else { "/" };
54 let first_marker = args
55 .start_after()
56 .map(|start_after| build_abs_path(&core.root, start_after))
57 .unwrap_or_default();
58
59 Self {
60 core,
61
62 path: path.to_string(),
63 args,
64 delimiter,
65 first_marker,
66 }
67 }
68}
69
70impl oio::PageList for S3ListerV1 {
71 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
72 let resp = self
73 .core
74 .s3_list_objects_v1(
75 &self.path,
76 if !ctx.token.is_empty() {
78 &ctx.token
79 } else {
80 &self.first_marker
81 },
82 self.delimiter,
83 self.args.limit(),
84 )
85 .await?;
86
87 if resp.status() != http::StatusCode::OK {
88 return Err(parse_error(resp));
89 }
90 let bs = resp.into_body();
91
92 let output: ListObjectsOutputV1 = de::from_reader(bs.reader())
93 .map_err(new_xml_deserialize_error)
94 .map_err(Error::set_temporary)?;
100
101 ctx.done = if let Some(is_truncated) = output.is_truncated {
106 !is_truncated
107 } else {
108 output.common_prefixes.is_empty() && output.contents.is_empty()
109 };
110 ctx.token = if let Some(next_marker) = &output.next_marker {
116 next_marker.clone()
117 } else if let Some(content) = output.contents.last() {
118 content.key.clone()
119 } else if let Some(prefix) = output.common_prefixes.last() {
120 prefix.prefix.clone()
121 } else {
122 "".to_string()
123 };
124
125 for prefix in output.common_prefixes {
126 let de = oio::Entry::new(
127 &build_rel_path(&self.core.root, &prefix.prefix),
128 Metadata::new(EntryMode::DIR),
129 );
130
131 ctx.entries.push_back(de);
132 }
133
134 for object in output.contents {
135 let mut path = build_rel_path(&self.core.root, &object.key);
136 if path.is_empty() {
137 path = "/".to_string();
138 }
139
140 let mut meta = Metadata::new(EntryMode::from_path(&path));
141 meta.set_is_current(true);
142 if let Some(etag) = &object.etag {
143 meta.set_etag(etag);
144 meta.set_content_md5(etag.trim_matches('"'));
145 }
146 meta.set_content_length(object.size);
147
148 meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);
151
152 let de = oio::Entry::with(path, meta);
153 ctx.entries.push_back(de);
154 }
155
156 Ok(())
157 }
158}
159
160pub struct S3ListerV2 {
162 core: Arc<S3Core>,
163
164 path: String,
165 args: OpList,
166
167 delimiter: &'static str,
168 abs_start_after: Option<String>,
169}
170
171impl S3ListerV2 {
172 pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
173 let delimiter = if args.recursive() { "" } else { "/" };
174 let abs_start_after = args
175 .start_after()
176 .map(|start_after| build_abs_path(&core.root, start_after));
177
178 Self {
179 core,
180
181 path: path.to_string(),
182 args,
183 delimiter,
184 abs_start_after,
185 }
186 }
187}
188
189impl oio::PageList for S3ListerV2 {
190 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
191 let resp = self
192 .core
193 .s3_list_objects_v2(
194 &self.path,
195 &ctx.token,
196 self.delimiter,
197 self.args.limit(),
198 if ctx.token.is_empty() {
200 self.abs_start_after.clone()
201 } else {
202 None
203 },
204 )
205 .await?;
206
207 if resp.status() != http::StatusCode::OK {
208 return Err(parse_error(resp));
209 }
210 let bs = resp.into_body();
211
212 let output: ListObjectsOutputV2 = de::from_reader(bs.reader())
213 .map_err(new_xml_deserialize_error)
214 .map_err(Error::set_temporary)?;
220
221 ctx.done = if let Some(is_truncated) = output.is_truncated {
227 !is_truncated
228 } else if let Some(next_continuation_token) = output.next_continuation_token.as_ref() {
229 next_continuation_token.is_empty()
230 } else {
231 output.common_prefixes.is_empty() && output.contents.is_empty()
232 };
233 ctx.token = output.next_continuation_token.clone().unwrap_or_default();
234
235 for prefix in output.common_prefixes {
236 let de = oio::Entry::new(
237 &build_rel_path(&self.core.root, &prefix.prefix),
238 Metadata::new(EntryMode::DIR),
239 );
240
241 ctx.entries.push_back(de);
242 }
243
244 for object in output.contents {
245 let mut path = build_rel_path(&self.core.root, &object.key);
246 if path.is_empty() {
247 path = "/".to_string();
248 }
249
250 let mut meta = Metadata::new(EntryMode::from_path(&path));
251 meta.set_is_current(true);
252 if let Some(etag) = &object.etag {
253 meta.set_etag(etag);
254 meta.set_content_md5(etag.trim_matches('"'));
255 }
256 meta.set_content_length(object.size);
257
258 meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);
261
262 let de = oio::Entry::with(path, meta);
263 ctx.entries.push_back(de);
264 }
265
266 Ok(())
267 }
268}
269
270pub struct S3ObjectVersionsLister {
272 core: Arc<S3Core>,
273
274 prefix: String,
275 args: OpList,
276
277 delimiter: &'static str,
278 abs_start_after: Option<String>,
279}
280
281impl S3ObjectVersionsLister {
282 pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
283 let delimiter = if args.recursive() { "" } else { "/" };
284 let abs_start_after = args
285 .start_after()
286 .map(|start_after| build_abs_path(&core.root, start_after));
287
288 Self {
289 core,
290 prefix: path.to_string(),
291 args,
292 delimiter,
293 abs_start_after,
294 }
295 }
296}
297
298impl oio::PageList for S3ObjectVersionsLister {
299 async fn next_page(&self, ctx: &mut PageContext) -> Result<()> {
300 let markers = ctx.token.rsplit_once(" ");
301 let (key_marker, version_id_marker) = if let Some(data) = markers {
302 data
303 } else if let Some(start_after) = &self.abs_start_after {
304 (start_after.as_str(), "")
305 } else {
306 ("", "")
307 };
308
309 let resp = self
310 .core
311 .s3_list_object_versions(
312 &self.prefix,
313 self.delimiter,
314 self.args.limit(),
315 key_marker,
316 version_id_marker,
317 )
318 .await?;
319 if resp.status() != http::StatusCode::OK {
320 return Err(parse_error(resp));
321 }
322
323 let body = resp.into_body();
324 let output: ListObjectVersionsOutput = de::from_reader(body.reader())
325 .map_err(new_xml_deserialize_error)
326 .map_err(Error::set_temporary)?;
332
333 ctx.done = if let Some(is_truncated) = output.is_truncated {
334 !is_truncated
335 } else {
336 false
337 };
338 ctx.token = format!(
339 "{} {}",
340 output.next_key_marker.unwrap_or_default(),
341 output.next_version_id_marker.unwrap_or_default()
342 );
343
344 for prefix in output.common_prefixes {
345 let de = oio::Entry::new(
346 &build_rel_path(&self.core.root, &prefix.prefix),
347 Metadata::new(EntryMode::DIR),
348 );
349 ctx.entries.push_back(de);
350 }
351
352 for version_object in output.version {
353 if !(self.args.versions() || version_object.is_latest) {
358 continue;
359 }
360
361 let mut path = build_rel_path(&self.core.root, &version_object.key);
362 if path.is_empty() {
363 path = "/".to_owned();
364 }
365
366 let mut meta = Metadata::new(EntryMode::from_path(&path));
367 meta.set_version(&version_object.version_id);
368 meta.set_is_current(version_object.is_latest);
369 meta.set_content_length(version_object.size);
370 meta.set_last_modified(parse_datetime_from_rfc3339(
371 version_object.last_modified.as_str(),
372 )?);
373 if let Some(etag) = version_object.etag {
374 meta.set_etag(&etag);
375 meta.set_content_md5(etag.trim_matches('"'));
376 }
377
378 let entry = oio::Entry::new(&path, meta);
379 ctx.entries.push_back(entry);
380 }
381
382 if self.args.deleted() {
383 for delete_marker in output.delete_marker {
384 let mut path = build_rel_path(&self.core.root, &delete_marker.key);
385 if path.is_empty() {
386 path = "/".to_owned();
387 }
388
389 let mut meta = Metadata::new(EntryMode::FILE);
390 meta.set_version(&delete_marker.version_id);
391 meta.set_is_deleted(true);
392 meta.set_is_current(delete_marker.is_latest);
393 meta.set_last_modified(parse_datetime_from_rfc3339(
394 delete_marker.last_modified.as_str(),
395 )?);
396
397 let entry = oio::Entry::new(&path, meta);
398 ctx.entries.push_back(entry);
399 }
400 }
401
402 Ok(())
403 }
404}