opendal/services/oss/
lister.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use quick_xml::de;
22
23use super::core::*;
24use super::error::parse_error;
25use crate::raw::oio::PageContext;
26use crate::raw::*;
27use crate::*;
28
29pub type OssListers = TwoWays<oio::PageLister<OssLister>, oio::PageLister<OssObjectVersionsLister>>;
30
31pub struct OssLister {
32 core: Arc<OssCore>,
33
34 path: String,
35 delimiter: &'static str,
36 limit: Option<usize>,
37 start_after: Option<String>,
40}
41
42impl OssLister {
43 pub fn new(
44 core: Arc<OssCore>,
45 path: &str,
46 recursive: bool,
47 limit: Option<usize>,
48 start_after: Option<&str>,
49 ) -> Self {
50 let delimiter = if recursive { "" } else { "/" };
51 Self {
52 core,
53 path: path.to_string(),
54 delimiter,
55 limit,
56 start_after: start_after.map(String::from),
57 }
58 }
59}
60
61impl oio::PageList for OssLister {
62 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
63 let resp = self
64 .core
65 .oss_list_object(
66 &self.path,
67 &ctx.token,
68 self.delimiter,
69 self.limit,
70 if ctx.token.is_empty() {
71 self.start_after.clone()
72 } else {
73 None
74 },
75 )
76 .await?;
77
78 if resp.status() != http::StatusCode::OK {
79 return Err(parse_error(resp));
80 }
81
82 let bs = resp.into_body();
83
84 let output: ListObjectsOutput = de::from_reader(bs.reader())
85 .map_err(|e| Error::new(ErrorKind::Unexpected, "deserialize xml").set_source(e))?;
86
87 ctx.done = !output.is_truncated;
88 ctx.token = output.next_continuation_token.unwrap_or_default();
89
90 for prefix in output.common_prefixes {
91 let de = oio::Entry::new(
92 &build_rel_path(&self.core.root, &prefix.prefix),
93 Metadata::new(EntryMode::DIR),
94 );
95 ctx.entries.push_back(de);
96 }
97
98 for object in output.contents {
99 let mut path = build_rel_path(&self.core.root, &object.key);
100 if path.is_empty() {
101 path = "/".to_string();
102 }
103 if self.start_after.as_ref() == Some(&path) {
104 continue;
105 }
106
107 let mut meta = Metadata::new(EntryMode::from_path(&path));
108 meta.set_is_current(true);
109 meta.set_etag(&object.etag);
110 meta.set_content_md5(object.etag.trim_matches('"'));
111 meta.set_content_length(object.size);
112 meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);
113
114 let de = oio::Entry::with(path, meta);
115 ctx.entries.push_back(de);
116 }
117
118 Ok(())
119 }
120}
121
122pub struct OssObjectVersionsLister {
124 core: Arc<OssCore>,
125
126 prefix: String,
127 args: OpList,
128
129 delimiter: &'static str,
130 abs_start_after: Option<String>,
131}
132
133impl OssObjectVersionsLister {
134 pub fn new(core: Arc<OssCore>, path: &str, args: OpList) -> Self {
135 let delimiter = if args.recursive() { "" } else { "/" };
136 let abs_start_after = args
137 .start_after()
138 .map(|start_after| build_abs_path(&core.root, start_after));
139
140 Self {
141 core,
142 prefix: path.to_string(),
143 args,
144 delimiter,
145 abs_start_after,
146 }
147 }
148}
149
150impl oio::PageList for OssObjectVersionsLister {
151 async fn next_page(&self, ctx: &mut PageContext) -> Result<()> {
152 let markers = ctx.token.rsplit_once(" ");
153 let (key_marker, version_id_marker) = if let Some(data) = markers {
154 data
155 } else if let Some(start_after) = &self.abs_start_after {
156 (start_after.as_str(), "")
157 } else {
158 ("", "")
159 };
160
161 let resp = self
162 .core
163 .oss_list_object_versions(
164 &self.prefix,
165 self.delimiter,
166 self.args.limit(),
167 key_marker,
168 version_id_marker,
169 )
170 .await?;
171 if resp.status() != http::StatusCode::OK {
172 return Err(parse_error(resp));
173 }
174
175 let body = resp.into_body();
176 let output: ListObjectVersionsOutput = de::from_reader(body.reader())
177 .map_err(new_xml_deserialize_error)
178 .map_err(Error::set_temporary)?;
184
185 ctx.done = if let Some(is_truncated) = output.is_truncated {
186 !is_truncated
187 } else {
188 false
189 };
190 ctx.token = format!(
191 "{} {}",
192 output.next_key_marker.unwrap_or_default(),
193 output.next_version_id_marker.unwrap_or_default()
194 );
195
196 for prefix in output.common_prefixes {
197 let de = oio::Entry::new(
198 &build_rel_path(&self.core.root, &prefix.prefix),
199 Metadata::new(EntryMode::DIR),
200 );
201 ctx.entries.push_back(de);
202 }
203
204 for version_object in output.version {
205 if !(self.args.versions() || version_object.is_latest) {
210 continue;
211 }
212
213 let mut path = build_rel_path(&self.core.root, &version_object.key);
214 if path.is_empty() {
215 path = "/".to_owned();
216 }
217
218 let mut meta = Metadata::new(EntryMode::from_path(&path));
219 meta.set_version(&version_object.version_id);
220 meta.set_is_current(version_object.is_latest);
221 meta.set_content_length(version_object.size);
222 meta.set_last_modified(parse_datetime_from_rfc3339(
223 version_object.last_modified.as_str(),
224 )?);
225 if let Some(etag) = version_object.etag {
226 meta.set_etag(&etag);
227 meta.set_content_md5(etag.trim_matches('"'));
228 }
229
230 let entry = oio::Entry::new(&path, meta);
231 ctx.entries.push_back(entry);
232 }
233
234 if self.args.deleted() {
235 for delete_marker in output.delete_marker {
236 let mut path = build_rel_path(&self.core.root, &delete_marker.key);
237 if path.is_empty() {
238 path = "/".to_owned();
239 }
240
241 let mut meta = Metadata::new(EntryMode::FILE);
242 meta.set_version(&delete_marker.version_id);
243 meta.set_is_deleted(true);
244 meta.set_is_current(delete_marker.is_latest);
245 meta.set_last_modified(parse_datetime_from_rfc3339(
246 delete_marker.last_modified.as_str(),
247 )?);
248
249 let entry = oio::Entry::new(&path, meta);
250 ctx.entries.push_back(entry);
251 }
252 }
253
254 Ok(())
255 }
256}