opendal/services/s3/
lister.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use quick_xml::de;
22
23use super::core::*;
24use super::error::parse_error;
25use crate::raw::oio::PageContext;
26use crate::raw::*;
27use crate::EntryMode;
28use crate::Error;
29use crate::Metadata;
30use crate::Result;
31
32pub type S3Listers = ThreeWays<
33    oio::PageLister<S3ListerV1>,
34    oio::PageLister<S3ListerV2>,
35    oio::PageLister<S3ObjectVersionsLister>,
36>;
37
38/// S3ListerV1 implements ListObjectV1 for s3 backend.
39pub struct S3ListerV1 {
40    core: Arc<S3Core>,
41
42    path: String,
43    args: OpList,
44
45    delimiter: &'static str,
46    /// marker can also be used as `start-after` for list objects v1.
47    /// We will use it as `start-after` for the first page and then ignore
48    /// it in the following pages.
49    first_marker: String,
50}
51
52impl S3ListerV1 {
53    pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
54        let delimiter = if args.recursive() { "" } else { "/" };
55        let first_marker = args
56            .start_after()
57            .map(|start_after| build_abs_path(&core.root, start_after))
58            .unwrap_or_default();
59
60        Self {
61            core,
62
63            path: path.to_string(),
64            args,
65            delimiter,
66            first_marker,
67        }
68    }
69}
70
71impl oio::PageList for S3ListerV1 {
72    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
73        let resp = self
74            .core
75            .s3_list_objects_v1(
76                &self.path,
77                // `marker` is used as `start-after` for the first page.
78                if !ctx.token.is_empty() {
79                    &ctx.token
80                } else {
81                    &self.first_marker
82                },
83                self.delimiter,
84                self.args.limit(),
85            )
86            .await?;
87
88        if resp.status() != http::StatusCode::OK {
89            return Err(parse_error(resp));
90        }
91        let bs = resp.into_body();
92
93        let output: ListObjectsOutputV1 = de::from_reader(bs.reader())
94            .map_err(new_xml_deserialize_error)
95            // Allow S3 list to retry on XML deserialization errors.
96            //
97            // This is because the S3 list API may return incomplete XML data under high load.
98            // We are confident that our XML decoding logic is correct. When this error occurs,
99            // we allow retries to obtain the correct data.
100            .map_err(Error::set_temporary)?;
101
102        // Try our best to check whether this list is done.
103        //
104        // - Check `is_truncated`
105        // - Check the length of `common_prefixes` and `contents` (very rare case)
106        ctx.done = if let Some(is_truncated) = output.is_truncated {
107            !is_truncated
108        } else {
109            output.common_prefixes.is_empty() && output.contents.is_empty()
110        };
111        // Try out best to find the next marker.
112        //
113        // - Check `next-marker`
114        // - Check the last object key
115        // - Check the last common prefix
116        ctx.token = if let Some(next_marker) = &output.next_marker {
117            next_marker.clone()
118        } else if let Some(content) = output.contents.last() {
119            content.key.clone()
120        } else if let Some(prefix) = output.common_prefixes.last() {
121            prefix.prefix.clone()
122        } else {
123            "".to_string()
124        };
125
126        for prefix in output.common_prefixes {
127            let de = oio::Entry::new(
128                &build_rel_path(&self.core.root, &prefix.prefix),
129                Metadata::new(EntryMode::DIR),
130            );
131
132            ctx.entries.push_back(de);
133        }
134
135        for object in output.contents {
136            let mut path = build_rel_path(&self.core.root, &object.key);
137            if path.is_empty() {
138                path = "/".to_string();
139            }
140
141            let mut meta = Metadata::new(EntryMode::from_path(&path));
142            meta.set_is_current(true);
143            if let Some(etag) = &object.etag {
144                meta.set_etag(etag);
145                meta.set_content_md5(etag.trim_matches('"'));
146            }
147            meta.set_content_length(object.size);
148
149            // object.last_modified provides more precise time that contains
150            // nanosecond, let's trim them.
151            meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);
152
153            let de = oio::Entry::with(path, meta);
154            ctx.entries.push_back(de);
155        }
156
157        Ok(())
158    }
159}
160
161/// S3ListerV2 implements ListObjectV2 for s3 backend.
162pub struct S3ListerV2 {
163    core: Arc<S3Core>,
164
165    path: String,
166    args: OpList,
167
168    delimiter: &'static str,
169    abs_start_after: Option<String>,
170}
171
172impl S3ListerV2 {
173    pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
174        let delimiter = if args.recursive() { "" } else { "/" };
175        let abs_start_after = args
176            .start_after()
177            .map(|start_after| build_abs_path(&core.root, start_after));
178
179        Self {
180            core,
181
182            path: path.to_string(),
183            args,
184            delimiter,
185            abs_start_after,
186        }
187    }
188}
189
190impl oio::PageList for S3ListerV2 {
191    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
192        let resp = self
193            .core
194            .s3_list_objects_v2(
195                &self.path,
196                &ctx.token,
197                self.delimiter,
198                self.args.limit(),
199                // start after should only be set for the first page.
200                if ctx.token.is_empty() {
201                    self.abs_start_after.clone()
202                } else {
203                    None
204                },
205            )
206            .await?;
207
208        if resp.status() != http::StatusCode::OK {
209            return Err(parse_error(resp));
210        }
211        let bs = resp.into_body();
212
213        let output: ListObjectsOutputV2 = de::from_reader(bs.reader())
214            .map_err(new_xml_deserialize_error)
215            // Allow S3 list to retry on XML deserialization errors.
216            //
217            // This is because the S3 list API may return incomplete XML data under high load.
218            // We are confident that our XML decoding logic is correct. When this error occurs,
219            // we allow retries to obtain the correct data.
220            .map_err(Error::set_temporary)?;
221
222        // Try our best to check whether this list is done.
223        //
224        // - Check `is_truncated`
225        // - Check `next_continuation_token`
226        // - Check the length of `common_prefixes` and `contents` (very rare case)
227        ctx.done = if let Some(is_truncated) = output.is_truncated {
228            !is_truncated
229        } else if let Some(next_continuation_token) = output.next_continuation_token.as_ref() {
230            next_continuation_token.is_empty()
231        } else {
232            output.common_prefixes.is_empty() && output.contents.is_empty()
233        };
234        ctx.token = output.next_continuation_token.clone().unwrap_or_default();
235
236        for prefix in output.common_prefixes {
237            let de = oio::Entry::new(
238                &build_rel_path(&self.core.root, &prefix.prefix),
239                Metadata::new(EntryMode::DIR),
240            );
241
242            ctx.entries.push_back(de);
243        }
244
245        for object in output.contents {
246            let mut path = build_rel_path(&self.core.root, &object.key);
247            if path.is_empty() {
248                path = "/".to_string();
249            }
250
251            let mut meta = Metadata::new(EntryMode::from_path(&path));
252            meta.set_is_current(true);
253            if let Some(etag) = &object.etag {
254                meta.set_etag(etag);
255                meta.set_content_md5(etag.trim_matches('"'));
256            }
257            meta.set_content_length(object.size);
258
259            // object.last_modified provides more precise time that contains
260            // nanosecond, let's trim them.
261            meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);
262
263            let de = oio::Entry::with(path, meta);
264            ctx.entries.push_back(de);
265        }
266
267        Ok(())
268    }
269}
270
271/// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html
272pub struct S3ObjectVersionsLister {
273    core: Arc<S3Core>,
274
275    prefix: String,
276    args: OpList,
277
278    delimiter: &'static str,
279    abs_start_after: Option<String>,
280}
281
282impl S3ObjectVersionsLister {
283    pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
284        let delimiter = if args.recursive() { "" } else { "/" };
285        let abs_start_after = args
286            .start_after()
287            .map(|start_after| build_abs_path(&core.root, start_after));
288
289        Self {
290            core,
291            prefix: path.to_string(),
292            args,
293            delimiter,
294            abs_start_after,
295        }
296    }
297}
298
299impl oio::PageList for S3ObjectVersionsLister {
300    async fn next_page(&self, ctx: &mut PageContext) -> Result<()> {
301        let markers = ctx.token.rsplit_once(" ");
302        let (key_marker, version_id_marker) = if let Some(data) = markers {
303            data
304        } else if let Some(start_after) = &self.abs_start_after {
305            (start_after.as_str(), "")
306        } else {
307            ("", "")
308        };
309
310        let resp = self
311            .core
312            .s3_list_object_versions(
313                &self.prefix,
314                self.delimiter,
315                self.args.limit(),
316                key_marker,
317                version_id_marker,
318            )
319            .await?;
320        if resp.status() != http::StatusCode::OK {
321            return Err(parse_error(resp));
322        }
323
324        let body = resp.into_body();
325        let output: ListObjectVersionsOutput = de::from_reader(body.reader())
326            .map_err(new_xml_deserialize_error)
327            // Allow S3 list to retry on XML deserialization errors.
328            //
329            // This is because the S3 list API may return incomplete XML data under high load.
330            // We are confident that our XML decoding logic is correct. When this error occurs,
331            // we allow retries to obtain the correct data.
332            .map_err(Error::set_temporary)?;
333
334        ctx.done = if let Some(is_truncated) = output.is_truncated {
335            !is_truncated
336        } else {
337            false
338        };
339        ctx.token = format!(
340            "{} {}",
341            output.next_key_marker.unwrap_or_default(),
342            output.next_version_id_marker.unwrap_or_default()
343        );
344
345        for prefix in output.common_prefixes {
346            let de = oio::Entry::new(
347                &build_rel_path(&self.core.root, &prefix.prefix),
348                Metadata::new(EntryMode::DIR),
349            );
350            ctx.entries.push_back(de);
351        }
352
353        for version_object in output.version {
354            // `list` must be additive, so we need to include the latest version object
355            // even if `versions` is not enabled.
356            //
357            // Here we skip all non-latest version objects if `versions` is not enabled.
358            if !(self.args.versions() || version_object.is_latest) {
359                continue;
360            }
361
362            let mut path = build_rel_path(&self.core.root, &version_object.key);
363            if path.is_empty() {
364                path = "/".to_owned();
365            }
366
367            let mut meta = Metadata::new(EntryMode::from_path(&path));
368            meta.set_version(&version_object.version_id);
369            meta.set_is_current(version_object.is_latest);
370            meta.set_content_length(version_object.size);
371            meta.set_last_modified(parse_datetime_from_rfc3339(
372                version_object.last_modified.as_str(),
373            )?);
374            if let Some(etag) = version_object.etag {
375                meta.set_etag(&etag);
376                meta.set_content_md5(etag.trim_matches('"'));
377            }
378
379            let entry = oio::Entry::new(&path, meta);
380            ctx.entries.push_back(entry);
381        }
382
383        if self.args.deleted() {
384            for delete_marker in output.delete_marker {
385                let mut path = build_rel_path(&self.core.root, &delete_marker.key);
386                if path.is_empty() {
387                    path = "/".to_owned();
388                }
389
390                let mut meta = Metadata::new(EntryMode::FILE);
391                meta.set_version(&delete_marker.version_id);
392                meta.set_is_deleted(true);
393                meta.set_is_current(delete_marker.is_latest);
394                meta.set_last_modified(parse_datetime_from_rfc3339(
395                    delete_marker.last_modified.as_str(),
396                )?);
397
398                let entry = oio::Entry::new(&path, meta);
399                ctx.entries.push_back(entry);
400            }
401        }
402
403        Ok(())
404    }
405}