1use std::collections::HashSet;
19use std::fmt::Debug;
20use std::vec::IntoIter;
21
22use crate::raw::*;
23use crate::*;
24
/// A layer that holds an in-memory list of keys used to answer list requests.
///
/// Keys are kept in the order they were added; no de-duplication is performed
/// when keys are inserted.
#[derive(Default, Debug, Clone)]
pub struct ImmutableIndexLayer {
    /// Indexed keys, in insertion order.
    vec: Vec<String>,
}

impl ImmutableIndexLayer {
    /// Append a single `key` to the index.
    pub fn insert(&mut self, key: String) {
        // A one-element iterator lets both entry points share the same append path.
        self.extend_iter(std::iter::once(key));
    }

    /// Append every key yielded by `iter` to the index, preserving iteration order.
    pub fn extend_iter<I: IntoIterator<Item = String>>(&mut self, iter: I) {
        self.vec.extend(iter);
    }
}
71
72impl<A: Access> Layer<A> for ImmutableIndexLayer {
73 type LayeredAccess = ImmutableIndexAccessor<A>;
74
75 fn layer(&self, inner: A) -> Self::LayeredAccess {
76 let info = inner.info();
77 info.update_full_capability(|mut cap| {
78 cap.list = true;
79 cap.list_with_recursive = true;
80 cap
81 });
82
83 ImmutableIndexAccessor {
84 vec: self.vec.clone(),
85 inner,
86 }
87 }
88}
89
90#[derive(Debug, Clone)]
91pub struct ImmutableIndexAccessor<A: Access> {
92 inner: A,
93 vec: Vec<String>,
94}
95
96impl<A: Access> ImmutableIndexAccessor<A> {
97 fn children_flat(&self, path: &str) -> Vec<String> {
98 self.vec
99 .iter()
100 .filter(|v| v.starts_with(path) && v.as_str() != path)
101 .cloned()
102 .collect()
103 }
104
105 fn children_hierarchy(&self, path: &str) -> Vec<String> {
106 let mut res = HashSet::new();
107
108 for i in self.vec.iter() {
109 if !i.starts_with(path) {
111 continue;
112 }
113
114 if i == path {
116 continue;
117 }
118
119 match i[path.len()..].find('/') {
120 None => {
122 res.insert(i.to_string());
123 }
124 Some(idx) => {
125 let dir_idx = idx + 1 + path.len();
127
128 if dir_idx == i.len() {
129 res.insert(i.to_string());
131 } else {
132 res.insert(i[..dir_idx].to_string());
135 }
136 }
137 }
138 }
139
140 res.into_iter().collect()
141 }
142}
143
144impl<A: Access> LayeredAccess for ImmutableIndexAccessor<A> {
145 type Inner = A;
146 type Reader = A::Reader;
147 type Writer = A::Writer;
148 type Lister = ImmutableDir;
149 type Deleter = A::Deleter;
150 type BlockingReader = A::BlockingReader;
151 type BlockingWriter = A::BlockingWriter;
152 type BlockingLister = ImmutableDir;
153 type BlockingDeleter = A::BlockingDeleter;
154
155 fn inner(&self) -> &Self::Inner {
156 &self.inner
157 }
158
159 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
160 self.inner.read(path, args).await
161 }
162
163 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
164 self.inner.write(path, args).await
165 }
166
167 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
168 let mut path = path;
169 if path == "/" {
170 path = ""
171 }
172
173 let idx = if args.recursive() {
174 self.children_flat(path)
175 } else {
176 self.children_hierarchy(path)
177 };
178
179 Ok((RpList::default(), ImmutableDir::new(idx)))
180 }
181
182 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
183 self.inner.delete().await
184 }
185
186 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
187 self.inner.blocking_read(path, args)
188 }
189
190 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
191 self.inner.blocking_write(path, args)
192 }
193
194 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
195 let mut path = path;
196 if path == "/" {
197 path = ""
198 }
199
200 let idx = if args.recursive() {
201 self.children_flat(path)
202 } else {
203 self.children_hierarchy(path)
204 };
205
206 Ok((RpList::default(), ImmutableDir::new(idx)))
207 }
208
209 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
210 self.inner.blocking_delete()
211 }
212}
213
214pub struct ImmutableDir {
215 idx: IntoIter<String>,
216}
217
218impl ImmutableDir {
219 fn new(idx: Vec<String>) -> Self {
220 Self {
221 idx: idx.into_iter(),
222 }
223 }
224
225 fn inner_next(&mut self) -> Option<oio::Entry> {
226 self.idx.next().map(|v| {
227 let mode = if v.ends_with('/') {
228 EntryMode::DIR
229 } else {
230 EntryMode::FILE
231 };
232 let meta = Metadata::new(mode);
233 oio::Entry::with(v, meta)
234 })
235 }
236}
237
238impl oio::List for ImmutableDir {
239 async fn next(&mut self) -> Result<Option<oio::Entry>> {
240 Ok(self.inner_next())
241 }
242}
243
244impl oio::BlockingList for ImmutableDir {
245 fn next(&mut self) -> Result<Option<oio::Entry>> {
246 Ok(self.inner_next())
247 }
248}
249
#[cfg(test)]
#[cfg(feature = "services-http")]
mod tests {
    use std::collections::HashMap;
    use std::collections::HashSet;

    use anyhow::Result;
    use futures::TryStreamExt;
    use log::debug;

    use super::*;
    use crate::layers::LoggingLayer;
    use crate::services::HttpConfig;
    use crate::EntryMode;
    use crate::Operator;

    /// Index mixing a top-level file, an explicit directory and nested files.
    const MIXED_KEYS: [&str; 4] = ["file", "dir/", "dir/file", "dir_without_prefix/file"];

    /// Index containing only deeply nested files.
    const DATASET_KEYS: [&str; 3] = [
        "dataset/stateful/ontime_2007_200.csv",
        "dataset/stateful/ontime_2008_200.csv",
        "dataset/stateful/ontime_2009_200.csv",
    ];

    /// Build an operator from an `HttpConfig`, layered with logging and an
    /// `ImmutableIndexLayer` seeded with `keys`.
    fn operator_with_index(keys: &[&str]) -> Result<Operator> {
        let mut iil = ImmutableIndexLayer::default();
        for key in keys {
            iil.insert(key.to_string())
        }

        let config = HashMap::from([("endpoint".to_string(), "https://xuanwo.io".to_string())]);
        let op = HttpConfig::from_iter(config)
            .and_then(Operator::from_config)?
            .layer(LoggingLayer::default())
            .layer(iil)
            .finish();

        Ok(op)
    }

    /// Non-recursive listing of the empty path yields only top-level children.
    #[tokio::test]
    async fn test_list() -> Result<()> {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let op = operator_with_index(&MIXED_KEYS)?;

        let mut modes = HashMap::new();
        let mut seen = HashSet::new();
        let mut lister = op.lister("").await?;
        while let Some(entry) = lister.try_next().await? {
            debug!("got entry: {}", entry.path());
            assert!(
                seen.insert(entry.path().to_string()),
                "duplicated value: {}",
                entry.path()
            );
            modes.insert(entry.path().to_string(), entry.metadata().mode());
        }

        assert_eq!(modes["file"], EntryMode::FILE);
        assert_eq!(modes["dir/"], EntryMode::DIR);
        assert_eq!(modes["dir_without_prefix/"], EntryMode::DIR);
        Ok(())
    }

    /// Recursive listing from the root yields nested keys as well.
    #[tokio::test]
    async fn test_scan() -> Result<()> {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let op = operator_with_index(&MIXED_KEYS)?;

        let mut lister = op.lister_with("/").recursive(true).await?;
        let mut seen = HashSet::new();
        let mut modes = HashMap::new();
        while let Some(entry) = lister.try_next().await? {
            debug!("got entry: {}", entry.path());
            assert!(
                seen.insert(entry.path().to_string()),
                "duplicated value: {}",
                entry.path()
            );
            modes.insert(entry.path().to_string(), entry.metadata().mode());
        }

        debug!("current files: {:?}", modes);

        assert_eq!(modes["file"], EntryMode::FILE);
        assert_eq!(modes["dir/"], EntryMode::DIR);
        assert_eq!(modes["dir_without_prefix/file"], EntryMode::FILE);
        Ok(())
    }

    /// Non-recursive listing collapses nested keys into a single directory
    /// entry at the root, and returns the files when listing their parent.
    #[tokio::test]
    async fn test_list_dir() -> Result<()> {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let op = operator_with_index(&DATASET_KEYS)?;

        // List the root.
        let mut modes = HashMap::new();
        let mut seen = HashSet::new();
        let mut lister = op.lister("/").await?;
        while let Some(entry) = lister.try_next().await? {
            assert!(
                seen.insert(entry.path().to_string()),
                "duplicated value: {}",
                entry.path()
            );
            modes.insert(entry.path().to_string(), entry.metadata().mode());
        }

        assert_eq!(modes.len(), 1);
        assert_eq!(modes["dataset/"], EntryMode::DIR);

        // List the directory that directly contains the files.
        let mut modes = HashMap::new();
        let mut seen = HashSet::new();
        let mut lister = op.lister("dataset/stateful/").await?;
        while let Some(entry) = lister.try_next().await? {
            assert!(
                seen.insert(entry.path().to_string()),
                "duplicated value: {}",
                entry.path()
            );
            modes.insert(entry.path().to_string(), entry.metadata().mode());
        }

        assert_eq!(modes["dataset/stateful/ontime_2007_200.csv"], EntryMode::FILE);
        assert_eq!(modes["dataset/stateful/ontime_2008_200.csv"], EntryMode::FILE);
        assert_eq!(modes["dataset/stateful/ontime_2009_200.csv"], EntryMode::FILE);
        Ok(())
    }

    /// Recursive listing from the root reaches the deeply nested files.
    #[tokio::test]
    async fn test_walk_top_down_dir() -> Result<()> {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let op = operator_with_index(&DATASET_KEYS)?;

        let mut lister = op.lister_with("/").recursive(true).await?;

        let mut modes = HashMap::new();
        let mut seen = HashSet::new();
        while let Some(entry) = lister.try_next().await? {
            assert!(
                seen.insert(entry.path().to_string()),
                "duplicated value: {}",
                entry.path()
            );
            modes.insert(entry.path().to_string(), entry.metadata().mode());
        }

        debug!("current files: {:?}", modes);

        assert_eq!(modes["dataset/stateful/ontime_2007_200.csv"], EntryMode::FILE);
        assert_eq!(modes["dataset/stateful/ontime_2008_200.csv"], EntryMode::FILE);
        assert_eq!(modes["dataset/stateful/ontime_2009_200.csv"], EntryMode::FILE);
        Ok(())
    }
}