opendal/services/s3/
lister.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use super::core::*;
21use super::error::parse_error;
22use crate::raw::oio::PageContext;
23use crate::raw::*;
24use crate::EntryMode;
25use crate::Error;
26use crate::Metadata;
27use crate::Result;
28use bytes::Buf;
29use quick_xml::de;
30
/// Lister type produced for S3 listings.
///
/// It holds exactly one of the three page-based implementations in this module:
/// ListObjects (v1), ListObjectsV2, or ListObjectVersions, each driven by
/// `oio::PageLister`. Which one is constructed is decided by the caller
/// (presumably from backend configuration and `OpList::versions()` — confirm
/// in the backend).
pub type S3Listers = ThreeWays<
    oio::PageLister<S3ListerV1>,
    oio::PageLister<S3ListerV2>,
    oio::PageLister<S3ObjectVersionsLister>,
>;
36
37/// S3ListerV1 implements ListObjectV1 for s3 backend.
38pub struct S3ListerV1 {
39    core: Arc<S3Core>,
40
41    path: String,
42    args: OpList,
43
44    delimiter: &'static str,
45    /// marker can also be used as `start-after` for list objects v1.
46    /// We will use it as `start-after` for the first page and then ignore
47    /// it in the following pages.
48    first_marker: String,
49}
50
51impl S3ListerV1 {
52    pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
53        let delimiter = if args.recursive() { "" } else { "/" };
54        let first_marker = args
55            .start_after()
56            .map(|start_after| build_abs_path(&core.root, start_after))
57            .unwrap_or_default();
58
59        Self {
60            core,
61
62            path: path.to_string(),
63            args,
64            delimiter,
65            first_marker,
66        }
67    }
68}
69
70impl oio::PageList for S3ListerV1 {
71    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
72        let resp = self
73            .core
74            .s3_list_objects_v1(
75                &self.path,
76                // `marker` is used as `start-after` for the first page.
77                if !ctx.token.is_empty() {
78                    &ctx.token
79                } else {
80                    &self.first_marker
81                },
82                self.delimiter,
83                self.args.limit(),
84            )
85            .await?;
86
87        if resp.status() != http::StatusCode::OK {
88            return Err(parse_error(resp));
89        }
90        let bs = resp.into_body();
91
92        let output: ListObjectsOutputV1 = de::from_reader(bs.reader())
93            .map_err(new_xml_deserialize_error)
94            // Allow S3 list to retry on XML deserialization errors.
95            //
96            // This is because the S3 list API may return incomplete XML data under high load.
97            // We are confident that our XML decoding logic is correct. When this error occurs,
98            // we allow retries to obtain the correct data.
99            .map_err(Error::set_temporary)?;
100
101        // Try our best to check whether this list is done.
102        //
103        // - Check `is_truncated`
104        // - Check the length of `common_prefixes` and `contents` (very rare case)
105        ctx.done = if let Some(is_truncated) = output.is_truncated {
106            !is_truncated
107        } else {
108            output.common_prefixes.is_empty() && output.contents.is_empty()
109        };
110        // Try out best to find the next marker.
111        //
112        // - Check `next-marker`
113        // - Check the last object key
114        // - Check the last common prefix
115        ctx.token = if let Some(next_marker) = &output.next_marker {
116            next_marker.clone()
117        } else if let Some(content) = output.contents.last() {
118            content.key.clone()
119        } else if let Some(prefix) = output.common_prefixes.last() {
120            prefix.prefix.clone()
121        } else {
122            "".to_string()
123        };
124
125        for prefix in output.common_prefixes {
126            let de = oio::Entry::new(
127                &build_rel_path(&self.core.root, &prefix.prefix),
128                Metadata::new(EntryMode::DIR),
129            );
130
131            ctx.entries.push_back(de);
132        }
133
134        for object in output.contents {
135            let mut path = build_rel_path(&self.core.root, &object.key);
136            if path.is_empty() {
137                path = "/".to_string();
138            }
139
140            let mut meta = Metadata::new(EntryMode::from_path(&path));
141            meta.set_is_current(true);
142            if let Some(etag) = &object.etag {
143                meta.set_etag(etag);
144                meta.set_content_md5(etag.trim_matches('"'));
145            }
146            meta.set_content_length(object.size);
147
148            // object.last_modified provides more precise time that contains
149            // nanosecond, let's trim them.
150            meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);
151
152            let de = oio::Entry::with(path, meta);
153            ctx.entries.push_back(de);
154        }
155
156        Ok(())
157    }
158}
159
160/// S3ListerV2 implements ListObjectV2 for s3 backend.
161pub struct S3ListerV2 {
162    core: Arc<S3Core>,
163
164    path: String,
165    args: OpList,
166
167    delimiter: &'static str,
168    abs_start_after: Option<String>,
169}
170
171impl S3ListerV2 {
172    pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
173        let delimiter = if args.recursive() { "" } else { "/" };
174        let abs_start_after = args
175            .start_after()
176            .map(|start_after| build_abs_path(&core.root, start_after));
177
178        Self {
179            core,
180
181            path: path.to_string(),
182            args,
183            delimiter,
184            abs_start_after,
185        }
186    }
187}
188
189impl oio::PageList for S3ListerV2 {
190    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
191        let resp = self
192            .core
193            .s3_list_objects_v2(
194                &self.path,
195                &ctx.token,
196                self.delimiter,
197                self.args.limit(),
198                // start after should only be set for the first page.
199                if ctx.token.is_empty() {
200                    self.abs_start_after.clone()
201                } else {
202                    None
203                },
204            )
205            .await?;
206
207        if resp.status() != http::StatusCode::OK {
208            return Err(parse_error(resp));
209        }
210        let bs = resp.into_body();
211
212        let output: ListObjectsOutputV2 = de::from_reader(bs.reader())
213            .map_err(new_xml_deserialize_error)
214            // Allow S3 list to retry on XML deserialization errors.
215            //
216            // This is because the S3 list API may return incomplete XML data under high load.
217            // We are confident that our XML decoding logic is correct. When this error occurs,
218            // we allow retries to obtain the correct data.
219            .map_err(Error::set_temporary)?;
220
221        // Try our best to check whether this list is done.
222        //
223        // - Check `is_truncated`
224        // - Check `next_continuation_token`
225        // - Check the length of `common_prefixes` and `contents` (very rare case)
226        ctx.done = if let Some(is_truncated) = output.is_truncated {
227            !is_truncated
228        } else if let Some(next_continuation_token) = output.next_continuation_token.as_ref() {
229            next_continuation_token.is_empty()
230        } else {
231            output.common_prefixes.is_empty() && output.contents.is_empty()
232        };
233        ctx.token = output.next_continuation_token.clone().unwrap_or_default();
234
235        for prefix in output.common_prefixes {
236            let de = oio::Entry::new(
237                &build_rel_path(&self.core.root, &prefix.prefix),
238                Metadata::new(EntryMode::DIR),
239            );
240
241            ctx.entries.push_back(de);
242        }
243
244        for object in output.contents {
245            let mut path = build_rel_path(&self.core.root, &object.key);
246            if path.is_empty() {
247                path = "/".to_string();
248            }
249
250            let mut meta = Metadata::new(EntryMode::from_path(&path));
251            meta.set_is_current(true);
252            if let Some(etag) = &object.etag {
253                meta.set_etag(etag);
254                meta.set_content_md5(etag.trim_matches('"'));
255            }
256            meta.set_content_length(object.size);
257
258            // object.last_modified provides more precise time that contains
259            // nanosecond, let's trim them.
260            meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);
261
262            let de = oio::Entry::with(path, meta);
263            ctx.entries.push_back(de);
264        }
265
266        Ok(())
267    }
268}
269
270/// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html
271pub struct S3ObjectVersionsLister {
272    core: Arc<S3Core>,
273
274    prefix: String,
275    args: OpList,
276
277    delimiter: &'static str,
278    abs_start_after: Option<String>,
279}
280
281impl S3ObjectVersionsLister {
282    pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
283        let delimiter = if args.recursive() { "" } else { "/" };
284        let abs_start_after = args
285            .start_after()
286            .map(|start_after| build_abs_path(&core.root, start_after));
287
288        Self {
289            core,
290            prefix: path.to_string(),
291            args,
292            delimiter,
293            abs_start_after,
294        }
295    }
296}
297
298impl oio::PageList for S3ObjectVersionsLister {
299    async fn next_page(&self, ctx: &mut PageContext) -> Result<()> {
300        let markers = ctx.token.rsplit_once(" ");
301        let (key_marker, version_id_marker) = if let Some(data) = markers {
302            data
303        } else if let Some(start_after) = &self.abs_start_after {
304            (start_after.as_str(), "")
305        } else {
306            ("", "")
307        };
308
309        let resp = self
310            .core
311            .s3_list_object_versions(
312                &self.prefix,
313                self.delimiter,
314                self.args.limit(),
315                key_marker,
316                version_id_marker,
317            )
318            .await?;
319        if resp.status() != http::StatusCode::OK {
320            return Err(parse_error(resp));
321        }
322
323        let body = resp.into_body();
324        let output: ListObjectVersionsOutput = de::from_reader(body.reader())
325            .map_err(new_xml_deserialize_error)
326            // Allow S3 list to retry on XML deserialization errors.
327            //
328            // This is because the S3 list API may return incomplete XML data under high load.
329            // We are confident that our XML decoding logic is correct. When this error occurs,
330            // we allow retries to obtain the correct data.
331            .map_err(Error::set_temporary)?;
332
333        ctx.done = if let Some(is_truncated) = output.is_truncated {
334            !is_truncated
335        } else {
336            false
337        };
338        ctx.token = format!(
339            "{} {}",
340            output.next_key_marker.unwrap_or_default(),
341            output.next_version_id_marker.unwrap_or_default()
342        );
343
344        for prefix in output.common_prefixes {
345            let de = oio::Entry::new(
346                &build_rel_path(&self.core.root, &prefix.prefix),
347                Metadata::new(EntryMode::DIR),
348            );
349            ctx.entries.push_back(de);
350        }
351
352        for version_object in output.version {
353            // `list` must be additive, so we need to include the latest version object
354            // even if `versions` is not enabled.
355            //
356            // Here we skip all non-latest version objects if `versions` is not enabled.
357            if !(self.args.versions() || version_object.is_latest) {
358                continue;
359            }
360
361            let mut path = build_rel_path(&self.core.root, &version_object.key);
362            if path.is_empty() {
363                path = "/".to_owned();
364            }
365
366            let mut meta = Metadata::new(EntryMode::from_path(&path));
367            meta.set_version(&version_object.version_id);
368            meta.set_is_current(version_object.is_latest);
369            meta.set_content_length(version_object.size);
370            meta.set_last_modified(parse_datetime_from_rfc3339(
371                version_object.last_modified.as_str(),
372            )?);
373            if let Some(etag) = version_object.etag {
374                meta.set_etag(&etag);
375                meta.set_content_md5(etag.trim_matches('"'));
376            }
377
378            let entry = oio::Entry::new(&path, meta);
379            ctx.entries.push_back(entry);
380        }
381
382        if self.args.deleted() {
383            for delete_marker in output.delete_marker {
384                let mut path = build_rel_path(&self.core.root, &delete_marker.key);
385                if path.is_empty() {
386                    path = "/".to_owned();
387                }
388
389                let mut meta = Metadata::new(EntryMode::FILE);
390                meta.set_version(&delete_marker.version_id);
391                meta.set_is_deleted(true);
392                meta.set_is_current(delete_marker.is_latest);
393                meta.set_last_modified(parse_datetime_from_rfc3339(
394                    delete_marker.last_modified.as_str(),
395                )?);
396
397                let entry = oio::Entry::new(&path, meta);
398                ctx.entries.push_back(entry);
399            }
400        }
401
402        Ok(())
403    }
404}