opendal/services/oss/
lister.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use quick_xml::de;
22
23use super::core::*;
24use super::error::parse_error;
25use crate::raw::oio::PageContext;
26use crate::raw::*;
27use crate::*;
28
29pub type OssListers = TwoWays<oio::PageLister<OssLister>, oio::PageLister<OssObjectVersionsLister>>;
30
31pub struct OssLister {
32    core: Arc<OssCore>,
33
34    path: String,
35    delimiter: &'static str,
36    limit: Option<usize>,
37    /// Filter results to objects whose names are lexicographically
38    /// **equal to or after** startOffset
39    start_after: Option<String>,
40}
41
42impl OssLister {
43    pub fn new(
44        core: Arc<OssCore>,
45        path: &str,
46        recursive: bool,
47        limit: Option<usize>,
48        start_after: Option<&str>,
49    ) -> Self {
50        let delimiter = if recursive { "" } else { "/" };
51        Self {
52            core,
53            path: path.to_string(),
54            delimiter,
55            limit,
56            start_after: start_after.map(String::from),
57        }
58    }
59}
60
61impl oio::PageList for OssLister {
62    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
63        let resp = self
64            .core
65            .oss_list_object(
66                &self.path,
67                &ctx.token,
68                self.delimiter,
69                self.limit,
70                if ctx.token.is_empty() {
71                    self.start_after.clone()
72                } else {
73                    None
74                },
75            )
76            .await?;
77
78        if resp.status() != http::StatusCode::OK {
79            return Err(parse_error(resp));
80        }
81
82        let bs = resp.into_body();
83
84        let output: ListObjectsOutput = de::from_reader(bs.reader())
85            .map_err(|e| Error::new(ErrorKind::Unexpected, "deserialize xml").set_source(e))?;
86
87        ctx.done = !output.is_truncated;
88        ctx.token = output.next_continuation_token.unwrap_or_default();
89
90        for prefix in output.common_prefixes {
91            let de = oio::Entry::new(
92                &build_rel_path(&self.core.root, &prefix.prefix),
93                Metadata::new(EntryMode::DIR),
94            );
95            ctx.entries.push_back(de);
96        }
97
98        for object in output.contents {
99            let mut path = build_rel_path(&self.core.root, &object.key);
100            if path.is_empty() {
101                path = "/".to_string();
102            }
103            if self.start_after.as_ref() == Some(&path) {
104                continue;
105            }
106
107            let mut meta = Metadata::new(EntryMode::from_path(&path));
108            meta.set_is_current(true);
109            meta.set_etag(&object.etag);
110            meta.set_content_md5(object.etag.trim_matches('"'));
111            meta.set_content_length(object.size);
112            meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);
113
114            let de = oio::Entry::with(path, meta);
115            ctx.entries.push_back(de);
116        }
117
118        Ok(())
119    }
120}
121
122/// refer: https://help.aliyun.com/zh/oss/developer-reference/listobjectversions?spm=a2c4g.11186623.help-menu-31815.d_3_1_1_5_5_2.53f67237GJlMPw&scm=20140722.H_112467._.OR_help-T_cn~zh-V_1
123pub struct OssObjectVersionsLister {
124    core: Arc<OssCore>,
125
126    prefix: String,
127    args: OpList,
128
129    delimiter: &'static str,
130    abs_start_after: Option<String>,
131}
132
133impl OssObjectVersionsLister {
134    pub fn new(core: Arc<OssCore>, path: &str, args: OpList) -> Self {
135        let delimiter = if args.recursive() { "" } else { "/" };
136        let abs_start_after = args
137            .start_after()
138            .map(|start_after| build_abs_path(&core.root, start_after));
139
140        Self {
141            core,
142            prefix: path.to_string(),
143            args,
144            delimiter,
145            abs_start_after,
146        }
147    }
148}
149
150impl oio::PageList for OssObjectVersionsLister {
151    async fn next_page(&self, ctx: &mut PageContext) -> Result<()> {
152        let markers = ctx.token.rsplit_once(" ");
153        let (key_marker, version_id_marker) = if let Some(data) = markers {
154            data
155        } else if let Some(start_after) = &self.abs_start_after {
156            (start_after.as_str(), "")
157        } else {
158            ("", "")
159        };
160
161        let resp = self
162            .core
163            .oss_list_object_versions(
164                &self.prefix,
165                self.delimiter,
166                self.args.limit(),
167                key_marker,
168                version_id_marker,
169            )
170            .await?;
171        if resp.status() != http::StatusCode::OK {
172            return Err(parse_error(resp));
173        }
174
175        let body = resp.into_body();
176        let output: ListObjectVersionsOutput = de::from_reader(body.reader())
177            .map_err(new_xml_deserialize_error)
178            // Allow Cos list to retry on XML deserialization errors.
179            //
180            // This is because the Cos list API may return incomplete XML data under high load.
181            // We are confident that our XML decoding logic is correct. When this error occurs,
182            // we allow retries to obtain the correct data.
183            .map_err(Error::set_temporary)?;
184
185        ctx.done = if let Some(is_truncated) = output.is_truncated {
186            !is_truncated
187        } else {
188            false
189        };
190        ctx.token = format!(
191            "{} {}",
192            output.next_key_marker.unwrap_or_default(),
193            output.next_version_id_marker.unwrap_or_default()
194        );
195
196        for prefix in output.common_prefixes {
197            let de = oio::Entry::new(
198                &build_rel_path(&self.core.root, &prefix.prefix),
199                Metadata::new(EntryMode::DIR),
200            );
201            ctx.entries.push_back(de);
202        }
203
204        for version_object in output.version {
205            // `list` must be additive, so we need to include the latest version object
206            // even if `versions` is not enabled.
207            //
208            // Here we skip all non-latest version objects if `versions` is not enabled.
209            if !(self.args.versions() || version_object.is_latest) {
210                continue;
211            }
212
213            let mut path = build_rel_path(&self.core.root, &version_object.key);
214            if path.is_empty() {
215                path = "/".to_owned();
216            }
217
218            let mut meta = Metadata::new(EntryMode::from_path(&path));
219            meta.set_version(&version_object.version_id);
220            meta.set_is_current(version_object.is_latest);
221            meta.set_content_length(version_object.size);
222            meta.set_last_modified(parse_datetime_from_rfc3339(
223                version_object.last_modified.as_str(),
224            )?);
225            if let Some(etag) = version_object.etag {
226                meta.set_etag(&etag);
227                meta.set_content_md5(etag.trim_matches('"'));
228            }
229
230            let entry = oio::Entry::new(&path, meta);
231            ctx.entries.push_back(entry);
232        }
233
234        if self.args.deleted() {
235            for delete_marker in output.delete_marker {
236                let mut path = build_rel_path(&self.core.root, &delete_marker.key);
237                if path.is_empty() {
238                    path = "/".to_owned();
239                }
240
241                let mut meta = Metadata::new(EntryMode::FILE);
242                meta.set_version(&delete_marker.version_id);
243                meta.set_is_deleted(true);
244                meta.set_is_current(delete_marker.is_latest);
245                meta.set_last_modified(parse_datetime_from_rfc3339(
246                    delete_marker.last_modified.as_str(),
247                )?);
248
249                let entry = oio::Entry::new(&path, meta);
250                ctx.entries.push_back(entry);
251            }
252        }
253
254        Ok(())
255    }
256}