opendal/services/cos/
lister.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use quick_xml::de;
22
23use super::core::*;
24use super::error::parse_error;
25use crate::raw::oio::PageContext;
26use crate::raw::*;
27use crate::EntryMode;
28use crate::Error;
29use crate::Metadata;
30use crate::Result;
31
32pub type CosListers = TwoWays<oio::PageLister<CosLister>, oio::PageLister<CosObjectVersionsLister>>;
33
34pub struct CosLister {
35    core: Arc<CosCore>,
36    path: String,
37    delimiter: &'static str,
38    limit: Option<usize>,
39}
40
41impl CosLister {
42    pub fn new(core: Arc<CosCore>, path: &str, recursive: bool, limit: Option<usize>) -> Self {
43        let delimiter = if recursive { "" } else { "/" };
44        Self {
45            core,
46            path: path.to_string(),
47            delimiter,
48            limit,
49        }
50    }
51}
52
53impl oio::PageList for CosLister {
54    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
55        let resp = self
56            .core
57            .cos_list_objects(&self.path, &ctx.token, self.delimiter, self.limit)
58            .await?;
59
60        if resp.status() != http::StatusCode::OK {
61            return Err(parse_error(resp));
62        }
63
64        let bs = resp.into_body();
65
66        let output: ListObjectsOutput =
67            de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
68
69        // Try our best to check whether this list is done.
70        //
71        // - Check `next_marker`
72        ctx.done = match output.next_marker.as_ref() {
73            None => true,
74            Some(next_marker) => next_marker.is_empty(),
75        };
76        ctx.token = output.next_marker.clone().unwrap_or_default();
77
78        for prefix in output.common_prefixes {
79            let de = oio::Entry::new(
80                &build_rel_path(&self.core.root, &prefix.prefix),
81                Metadata::new(EntryMode::DIR),
82            );
83
84            ctx.entries.push_back(de);
85        }
86
87        for object in output.contents {
88            let mut path = build_rel_path(&self.core.root, &object.key);
89            if path.is_empty() {
90                path = "/".to_string();
91            }
92
93            let meta = Metadata::new(EntryMode::from_path(&path)).with_content_length(object.size);
94
95            let de = oio::Entry::with(path, meta);
96            ctx.entries.push_back(de);
97        }
98
99        Ok(())
100    }
101}
102
103/// refer: https://cloud.tencent.com/document/product/436/35521
104pub struct CosObjectVersionsLister {
105    core: Arc<CosCore>,
106
107    prefix: String,
108    args: OpList,
109
110    delimiter: &'static str,
111    abs_start_after: Option<String>,
112}
113
114impl CosObjectVersionsLister {
115    pub fn new(core: Arc<CosCore>, path: &str, args: OpList) -> Self {
116        let delimiter = if args.recursive() { "" } else { "/" };
117        let abs_start_after = args
118            .start_after()
119            .map(|start_after| build_abs_path(&core.root, start_after));
120
121        Self {
122            core,
123            prefix: path.to_string(),
124            args,
125            delimiter,
126            abs_start_after,
127        }
128    }
129}
130
131impl oio::PageList for CosObjectVersionsLister {
132    async fn next_page(&self, ctx: &mut PageContext) -> Result<()> {
133        let markers = ctx.token.rsplit_once(" ");
134        let (key_marker, version_id_marker) = if let Some(data) = markers {
135            data
136        } else if let Some(start_after) = &self.abs_start_after {
137            (start_after.as_str(), "")
138        } else {
139            ("", "")
140        };
141
142        let resp = self
143            .core
144            .cos_list_object_versions(
145                &self.prefix,
146                self.delimiter,
147                self.args.limit(),
148                key_marker,
149                version_id_marker,
150            )
151            .await?;
152        if resp.status() != http::StatusCode::OK {
153            return Err(parse_error(resp));
154        }
155
156        let body = resp.into_body();
157        let output: ListObjectVersionsOutput = de::from_reader(body.reader())
158            .map_err(new_xml_deserialize_error)
159            // Allow Cos list to retry on XML deserialization errors.
160            //
161            // This is because the Cos list API may return incomplete XML data under high load.
162            // We are confident that our XML decoding logic is correct. When this error occurs,
163            // we allow retries to obtain the correct data.
164            .map_err(Error::set_temporary)?;
165
166        ctx.done = if let Some(is_truncated) = output.is_truncated {
167            !is_truncated
168        } else {
169            false
170        };
171        ctx.token = format!(
172            "{} {}",
173            output.next_key_marker.unwrap_or_default(),
174            output.next_version_id_marker.unwrap_or_default()
175        );
176
177        for prefix in output.common_prefixes {
178            let de = oio::Entry::new(
179                &build_rel_path(&self.core.root, &prefix.prefix),
180                Metadata::new(EntryMode::DIR),
181            );
182            ctx.entries.push_back(de);
183        }
184
185        for version_object in output.version {
186            // `list` must be additive, so we need to include the latest version object
187            // even if `versions` is not enabled.
188            //
189            // Here we skip all non-latest version objects if `versions` is not enabled.
190            if !(self.args.versions() || version_object.is_latest) {
191                continue;
192            }
193
194            let mut path = build_rel_path(&self.core.root, &version_object.key);
195            if path.is_empty() {
196                path = "/".to_owned();
197            }
198
199            let mut meta = Metadata::new(EntryMode::from_path(&path));
200            meta.set_version(&version_object.version_id);
201            meta.set_is_current(version_object.is_latest);
202            meta.set_content_length(version_object.size);
203            meta.set_last_modified(parse_datetime_from_rfc3339(
204                version_object.last_modified.as_str(),
205            )?);
206            if let Some(etag) = version_object.etag {
207                meta.set_etag(&etag);
208                meta.set_content_md5(etag.trim_matches('"'));
209            }
210
211            let entry = oio::Entry::new(&path, meta);
212            ctx.entries.push_back(entry);
213        }
214
215        if self.args.deleted() {
216            for delete_marker in output.delete_marker {
217                let mut path = build_rel_path(&self.core.root, &delete_marker.key);
218                if path.is_empty() {
219                    path = "/".to_owned();
220                }
221
222                let mut meta = Metadata::new(EntryMode::FILE);
223                meta.set_version(&delete_marker.version_id);
224                meta.set_is_deleted(true);
225                meta.set_is_current(delete_marker.is_latest);
226                meta.set_last_modified(parse_datetime_from_rfc3339(
227                    delete_marker.last_modified.as_str(),
228                )?);
229
230                let entry = oio::Entry::new(&path, meta);
231                ctx.entries.push_back(entry);
232            }
233        }
234
235        Ok(())
236    }
237}