opendal/services/s3/lister.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use super::core::S3Core;
21use super::core::{ListObjectVersionsOutput, ListObjectsOutput};
22use super::error::parse_error;
23use crate::raw::oio::PageContext;
24use crate::raw::*;
25use crate::EntryMode;
26use crate::Error;
27use crate::Metadata;
28use crate::Result;
29use bytes::Buf;
30use quick_xml::de;
31
32pub type S3Listers = TwoWays<oio::PageLister<S3Lister>, oio::PageLister<S3ObjectVersionsLister>>;
33
34pub struct S3Lister {
35    core: Arc<S3Core>,
36
37    path: String,
38    args: OpList,
39
40    delimiter: &'static str,
41    abs_start_after: Option<String>,
42}
43
44impl S3Lister {
45    pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
46        let delimiter = if args.recursive() { "" } else { "/" };
47        let abs_start_after = args
48            .start_after()
49            .map(|start_after| build_abs_path(&core.root, start_after));
50
51        Self {
52            core,
53
54            path: path.to_string(),
55            args,
56            delimiter,
57            abs_start_after,
58        }
59    }
60}
61
62impl oio::PageList for S3Lister {
63    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
64        let resp = self
65            .core
66            .s3_list_objects(
67                &self.path,
68                &ctx.token,
69                self.delimiter,
70                self.args.limit(),
71                // start after should only be set for the first page.
72                if ctx.token.is_empty() {
73                    self.abs_start_after.clone()
74                } else {
75                    None
76                },
77            )
78            .await?;
79
80        if resp.status() != http::StatusCode::OK {
81            return Err(parse_error(resp));
82        }
83        let bs = resp.into_body();
84
85        let output: ListObjectsOutput = de::from_reader(bs.reader())
86            .map_err(new_xml_deserialize_error)
87            // Allow S3 list to retry on XML deserialization errors.
88            //
89            // This is because the S3 list API may return incomplete XML data under high load.
90            // We are confident that our XML decoding logic is correct. When this error occurs,
91            // we allow retries to obtain the correct data.
92            .map_err(Error::set_temporary)?;
93
94        // Try our best to check whether this list is done.
95        //
96        // - Check `is_truncated`
97        // - Check `next_continuation_token`
98        // - Check the length of `common_prefixes` and `contents` (very rare case)
99        ctx.done = if let Some(is_truncated) = output.is_truncated {
100            !is_truncated
101        } else if let Some(next_continuation_token) = output.next_continuation_token.as_ref() {
102            next_continuation_token.is_empty()
103        } else {
104            output.common_prefixes.is_empty() && output.contents.is_empty()
105        };
106        ctx.token = output.next_continuation_token.clone().unwrap_or_default();
107
108        for prefix in output.common_prefixes {
109            let de = oio::Entry::new(
110                &build_rel_path(&self.core.root, &prefix.prefix),
111                Metadata::new(EntryMode::DIR),
112            );
113
114            ctx.entries.push_back(de);
115        }
116
117        for object in output.contents {
118            let mut path = build_rel_path(&self.core.root, &object.key);
119            if path.is_empty() {
120                path = "/".to_string();
121            }
122
123            let mut meta = Metadata::new(EntryMode::from_path(&path));
124            meta.set_is_current(true);
125            if let Some(etag) = &object.etag {
126                meta.set_etag(etag);
127                meta.set_content_md5(etag.trim_matches('"'));
128            }
129            meta.set_content_length(object.size);
130
131            // object.last_modified provides more precise time that contains
132            // nanosecond, let's trim them.
133            meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);
134
135            let de = oio::Entry::with(path, meta);
136            ctx.entries.push_back(de);
137        }
138
139        Ok(())
140    }
141}
142
143/// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html
144pub struct S3ObjectVersionsLister {
145    core: Arc<S3Core>,
146
147    prefix: String,
148    args: OpList,
149
150    delimiter: &'static str,
151    abs_start_after: Option<String>,
152}
153
154impl S3ObjectVersionsLister {
155    pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
156        let delimiter = if args.recursive() { "" } else { "/" };
157        let abs_start_after = args
158            .start_after()
159            .map(|start_after| build_abs_path(&core.root, start_after));
160
161        Self {
162            core,
163            prefix: path.to_string(),
164            args,
165            delimiter,
166            abs_start_after,
167        }
168    }
169}
170
171impl oio::PageList for S3ObjectVersionsLister {
172    async fn next_page(&self, ctx: &mut PageContext) -> Result<()> {
173        let markers = ctx.token.rsplit_once(" ");
174        let (key_marker, version_id_marker) = if let Some(data) = markers {
175            data
176        } else if let Some(start_after) = &self.abs_start_after {
177            (start_after.as_str(), "")
178        } else {
179            ("", "")
180        };
181
182        let resp = self
183            .core
184            .s3_list_object_versions(
185                &self.prefix,
186                self.delimiter,
187                self.args.limit(),
188                key_marker,
189                version_id_marker,
190            )
191            .await?;
192        if resp.status() != http::StatusCode::OK {
193            return Err(parse_error(resp));
194        }
195
196        let body = resp.into_body();
197        let output: ListObjectVersionsOutput = de::from_reader(body.reader())
198            .map_err(new_xml_deserialize_error)
199            // Allow S3 list to retry on XML deserialization errors.
200            //
201            // This is because the S3 list API may return incomplete XML data under high load.
202            // We are confident that our XML decoding logic is correct. When this error occurs,
203            // we allow retries to obtain the correct data.
204            .map_err(Error::set_temporary)?;
205
206        ctx.done = if let Some(is_truncated) = output.is_truncated {
207            !is_truncated
208        } else {
209            false
210        };
211        ctx.token = format!(
212            "{} {}",
213            output.next_key_marker.unwrap_or_default(),
214            output.next_version_id_marker.unwrap_or_default()
215        );
216
217        for prefix in output.common_prefixes {
218            let de = oio::Entry::new(
219                &build_rel_path(&self.core.root, &prefix.prefix),
220                Metadata::new(EntryMode::DIR),
221            );
222            ctx.entries.push_back(de);
223        }
224
225        for version_object in output.version {
226            // `list` must be additive, so we need to include the latest version object
227            // even if `versions` is not enabled.
228            //
229            // Here we skip all non-latest version objects if `versions` is not enabled.
230            if !(self.args.versions() || version_object.is_latest) {
231                continue;
232            }
233
234            let mut path = build_rel_path(&self.core.root, &version_object.key);
235            if path.is_empty() {
236                path = "/".to_owned();
237            }
238
239            let mut meta = Metadata::new(EntryMode::from_path(&path));
240            meta.set_version(&version_object.version_id);
241            meta.set_is_current(version_object.is_latest);
242            meta.set_content_length(version_object.size);
243            meta.set_last_modified(parse_datetime_from_rfc3339(
244                version_object.last_modified.as_str(),
245            )?);
246            if let Some(etag) = version_object.etag {
247                meta.set_etag(&etag);
248                meta.set_content_md5(etag.trim_matches('"'));
249            }
250
251            let entry = oio::Entry::new(&path, meta);
252            ctx.entries.push_back(entry);
253        }
254
255        if self.args.deleted() {
256            for delete_marker in output.delete_marker {
257                let mut path = build_rel_path(&self.core.root, &delete_marker.key);
258                if path.is_empty() {
259                    path = "/".to_owned();
260                }
261
262                let mut meta = Metadata::new(EntryMode::FILE);
263                meta.set_version(&delete_marker.version_id);
264                meta.set_is_deleted(true);
265                meta.set_is_current(delete_marker.is_latest);
266                meta.set_last_modified(parse_datetime_from_rfc3339(
267                    delete_marker.last_modified.as_str(),
268                )?);
269
270                let entry = oio::Entry::new(&path, meta);
271                ctx.entries.push_back(entry);
272            }
273        }
274
275        Ok(())
276    }
277}