opendal/services/s3/
lister.rs1use std::sync::Arc;
19
20use super::core::S3Core;
21use super::core::{ListObjectVersionsOutput, ListObjectsOutput};
22use super::error::parse_error;
23use crate::raw::oio::PageContext;
24use crate::raw::*;
25use crate::EntryMode;
26use crate::Error;
27use crate::Metadata;
28use crate::Result;
29use bytes::Buf;
30use quick_xml::de;
31
32pub type S3Listers = TwoWays<oio::PageLister<S3Lister>, oio::PageLister<S3ObjectVersionsLister>>;
33
34pub struct S3Lister {
35 core: Arc<S3Core>,
36
37 path: String,
38 args: OpList,
39
40 delimiter: &'static str,
41 abs_start_after: Option<String>,
42}
43
44impl S3Lister {
45 pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
46 let delimiter = if args.recursive() { "" } else { "/" };
47 let abs_start_after = args
48 .start_after()
49 .map(|start_after| build_abs_path(&core.root, start_after));
50
51 Self {
52 core,
53
54 path: path.to_string(),
55 args,
56 delimiter,
57 abs_start_after,
58 }
59 }
60}
61
62impl oio::PageList for S3Lister {
63 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
64 let resp = self
65 .core
66 .s3_list_objects(
67 &self.path,
68 &ctx.token,
69 self.delimiter,
70 self.args.limit(),
71 if ctx.token.is_empty() {
73 self.abs_start_after.clone()
74 } else {
75 None
76 },
77 )
78 .await?;
79
80 if resp.status() != http::StatusCode::OK {
81 return Err(parse_error(resp));
82 }
83 let bs = resp.into_body();
84
85 let output: ListObjectsOutput = de::from_reader(bs.reader())
86 .map_err(new_xml_deserialize_error)
87 .map_err(Error::set_temporary)?;
93
94 ctx.done = if let Some(is_truncated) = output.is_truncated {
100 !is_truncated
101 } else if let Some(next_continuation_token) = output.next_continuation_token.as_ref() {
102 next_continuation_token.is_empty()
103 } else {
104 output.common_prefixes.is_empty() && output.contents.is_empty()
105 };
106 ctx.token = output.next_continuation_token.clone().unwrap_or_default();
107
108 for prefix in output.common_prefixes {
109 let de = oio::Entry::new(
110 &build_rel_path(&self.core.root, &prefix.prefix),
111 Metadata::new(EntryMode::DIR),
112 );
113
114 ctx.entries.push_back(de);
115 }
116
117 for object in output.contents {
118 let mut path = build_rel_path(&self.core.root, &object.key);
119 if path.is_empty() {
120 path = "/".to_string();
121 }
122
123 let mut meta = Metadata::new(EntryMode::from_path(&path));
124 meta.set_is_current(true);
125 if let Some(etag) = &object.etag {
126 meta.set_etag(etag);
127 meta.set_content_md5(etag.trim_matches('"'));
128 }
129 meta.set_content_length(object.size);
130
131 meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);
134
135 let de = oio::Entry::with(path, meta);
136 ctx.entries.push_back(de);
137 }
138
139 Ok(())
140 }
141}
142
143pub struct S3ObjectVersionsLister {
145 core: Arc<S3Core>,
146
147 prefix: String,
148 args: OpList,
149
150 delimiter: &'static str,
151 abs_start_after: Option<String>,
152}
153
154impl S3ObjectVersionsLister {
155 pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
156 let delimiter = if args.recursive() { "" } else { "/" };
157 let abs_start_after = args
158 .start_after()
159 .map(|start_after| build_abs_path(&core.root, start_after));
160
161 Self {
162 core,
163 prefix: path.to_string(),
164 args,
165 delimiter,
166 abs_start_after,
167 }
168 }
169}
170
171impl oio::PageList for S3ObjectVersionsLister {
172 async fn next_page(&self, ctx: &mut PageContext) -> Result<()> {
173 let markers = ctx.token.rsplit_once(" ");
174 let (key_marker, version_id_marker) = if let Some(data) = markers {
175 data
176 } else if let Some(start_after) = &self.abs_start_after {
177 (start_after.as_str(), "")
178 } else {
179 ("", "")
180 };
181
182 let resp = self
183 .core
184 .s3_list_object_versions(
185 &self.prefix,
186 self.delimiter,
187 self.args.limit(),
188 key_marker,
189 version_id_marker,
190 )
191 .await?;
192 if resp.status() != http::StatusCode::OK {
193 return Err(parse_error(resp));
194 }
195
196 let body = resp.into_body();
197 let output: ListObjectVersionsOutput = de::from_reader(body.reader())
198 .map_err(new_xml_deserialize_error)
199 .map_err(Error::set_temporary)?;
205
206 ctx.done = if let Some(is_truncated) = output.is_truncated {
207 !is_truncated
208 } else {
209 false
210 };
211 ctx.token = format!(
212 "{} {}",
213 output.next_key_marker.unwrap_or_default(),
214 output.next_version_id_marker.unwrap_or_default()
215 );
216
217 for prefix in output.common_prefixes {
218 let de = oio::Entry::new(
219 &build_rel_path(&self.core.root, &prefix.prefix),
220 Metadata::new(EntryMode::DIR),
221 );
222 ctx.entries.push_back(de);
223 }
224
225 for version_object in output.version {
226 if !(self.args.versions() || version_object.is_latest) {
231 continue;
232 }
233
234 let mut path = build_rel_path(&self.core.root, &version_object.key);
235 if path.is_empty() {
236 path = "/".to_owned();
237 }
238
239 let mut meta = Metadata::new(EntryMode::from_path(&path));
240 meta.set_version(&version_object.version_id);
241 meta.set_is_current(version_object.is_latest);
242 meta.set_content_length(version_object.size);
243 meta.set_last_modified(parse_datetime_from_rfc3339(
244 version_object.last_modified.as_str(),
245 )?);
246 if let Some(etag) = version_object.etag {
247 meta.set_etag(&etag);
248 meta.set_content_md5(etag.trim_matches('"'));
249 }
250
251 let entry = oio::Entry::new(&path, meta);
252 ctx.entries.push_back(entry);
253 }
254
255 if self.args.deleted() {
256 for delete_marker in output.delete_marker {
257 let mut path = build_rel_path(&self.core.root, &delete_marker.key);
258 if path.is_empty() {
259 path = "/".to_owned();
260 }
261
262 let mut meta = Metadata::new(EntryMode::FILE);
263 meta.set_version(&delete_marker.version_id);
264 meta.set_is_deleted(true);
265 meta.set_is_current(delete_marker.is_latest);
266 meta.set_last_modified(parse_datetime_from_rfc3339(
267 delete_marker.last_modified.as_str(),
268 )?);
269
270 let entry = oio::Entry::new(&path, meta);
271 ctx.entries.push_back(entry);
272 }
273 }
274
275 Ok(())
276 }
277}