opendal/layers/immutable_index.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashSet;
19use std::fmt::Debug;
20use std::vec::IntoIter;
21
22use crate::raw::*;
23use crate::*;
24
/// Add an immutable in-memory index for underlying storage services.
///
/// Especially useful for services that cannot list on their own, such as HTTP:
/// once this layer is applied, list requests are answered from the keys
/// registered here instead of from the storage service.
///
/// Keys ending with `/` are listed as directories; every other key is listed
/// as a file. Keys are matched against the listed path by plain string prefix.
///
/// The keys are copied into every accessor this layer builds, so keys inserted
/// after [`Layer::layer`] has been called are not visible to that accessor.
///
/// # Examples
///
/// ```rust, no_run
/// # use std::collections::HashMap;
/// # use opendal::layers::ImmutableIndexLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
/// # fn main() -> Result<()> {
/// let mut iil = ImmutableIndexLayer::default();
/// iil.extend_iter(
///     ["file", "dir/", "dir/file", "dir_without_prefix/file"]
///         .into_iter()
///         .map(String::from),
/// );
///
/// let op = Operator::from_iter::<services::Memory>(HashMap::<_, _>::default())?
///     .layer(iil)
///     .finish();
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug, Default)]
pub struct ImmutableIndexLayer {
    vec: Vec<String>,
}
56
57impl ImmutableIndexLayer {
58    /// Insert a key into index.
59    pub fn insert(&mut self, key: String) {
60        self.vec.push(key);
61    }
62
63    /// Insert keys from iter.
64    pub fn extend_iter<I>(&mut self, iter: I)
65    where
66        I: IntoIterator<Item = String>,
67    {
68        self.vec.extend(iter);
69    }
70}
71
72impl<A: Access> Layer<A> for ImmutableIndexLayer {
73    type LayeredAccess = ImmutableIndexAccessor<A>;
74
75    fn layer(&self, inner: A) -> Self::LayeredAccess {
76        let info = inner.info();
77        info.update_full_capability(|mut cap| {
78            cap.list = true;
79            cap.list_with_recursive = true;
80            cap
81        });
82
83        ImmutableIndexAccessor {
84            vec: self.vec.clone(),
85            inner,
86        }
87    }
88}
89
90#[derive(Debug, Clone)]
91pub struct ImmutableIndexAccessor<A: Access> {
92    inner: A,
93    vec: Vec<String>,
94}
95
96impl<A: Access> ImmutableIndexAccessor<A> {
97    fn children_flat(&self, path: &str) -> Vec<String> {
98        self.vec
99            .iter()
100            .filter(|v| v.starts_with(path) && v.as_str() != path)
101            .cloned()
102            .collect()
103    }
104
105    fn children_hierarchy(&self, path: &str) -> Vec<String> {
106        let mut res = HashSet::new();
107
108        for i in self.vec.iter() {
109            // `/xyz` should not belong to `/abc`
110            if !i.starts_with(path) {
111                continue;
112            }
113
114            // remove `/abc` if self
115            if i == path {
116                continue;
117            }
118
119            match i[path.len()..].find('/') {
120                // File `/abc/def.csv` must belong to `/abc`
121                None => {
122                    res.insert(i.to_string());
123                }
124                Some(idx) => {
125                    // The index of first `/` after `/abc`.
126                    let dir_idx = idx + 1 + path.len();
127
128                    if dir_idx == i.len() {
129                        // Dir `/abc/def/` belongs to `/abc/`
130                        res.insert(i.to_string());
131                    } else {
132                        // File/Dir `/abc/def/xyz` doesn't belong to `/abc`.
133                        // But we need to list `/abc/def` out so that we can walk down.
134                        res.insert(i[..dir_idx].to_string());
135                    }
136                }
137            }
138        }
139
140        res.into_iter().collect()
141    }
142}
143
144impl<A: Access> LayeredAccess for ImmutableIndexAccessor<A> {
145    type Inner = A;
146    type Reader = A::Reader;
147    type Writer = A::Writer;
148    type Lister = ImmutableDir;
149    type Deleter = A::Deleter;
150    type BlockingReader = A::BlockingReader;
151    type BlockingWriter = A::BlockingWriter;
152    type BlockingLister = ImmutableDir;
153    type BlockingDeleter = A::BlockingDeleter;
154
155    fn inner(&self) -> &Self::Inner {
156        &self.inner
157    }
158
159    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
160        self.inner.read(path, args).await
161    }
162
163    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
164        self.inner.write(path, args).await
165    }
166
167    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
168        let mut path = path;
169        if path == "/" {
170            path = ""
171        }
172
173        let idx = if args.recursive() {
174            self.children_flat(path)
175        } else {
176            self.children_hierarchy(path)
177        };
178
179        Ok((RpList::default(), ImmutableDir::new(idx)))
180    }
181
182    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
183        self.inner.delete().await
184    }
185
186    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
187        self.inner.blocking_read(path, args)
188    }
189
190    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
191        self.inner.blocking_write(path, args)
192    }
193
194    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
195        let mut path = path;
196        if path == "/" {
197            path = ""
198        }
199
200        let idx = if args.recursive() {
201            self.children_flat(path)
202        } else {
203            self.children_hierarchy(path)
204        };
205
206        Ok((RpList::default(), ImmutableDir::new(idx)))
207    }
208
209    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
210        self.inner.blocking_delete()
211    }
212}
213
/// Lister over a precomputed, in-memory list of index keys.
///
/// Used as both the async and the blocking lister of `ImmutableIndexAccessor`;
/// it holds only the keys to yield and no handle to the underlying storage.
/// Derives `Debug` like the other public types in this module.
#[derive(Debug)]
pub struct ImmutableDir {
    /// Keys still to be yielded, in the order they were collected.
    idx: IntoIter<String>,
}
217
218impl ImmutableDir {
219    fn new(idx: Vec<String>) -> Self {
220        Self {
221            idx: idx.into_iter(),
222        }
223    }
224
225    fn inner_next(&mut self) -> Option<oio::Entry> {
226        self.idx.next().map(|v| {
227            let mode = if v.ends_with('/') {
228                EntryMode::DIR
229            } else {
230                EntryMode::FILE
231            };
232            let meta = Metadata::new(mode);
233            oio::Entry::with(v, meta)
234        })
235    }
236}
237
238impl oio::List for ImmutableDir {
239    async fn next(&mut self) -> Result<Option<oio::Entry>> {
240        Ok(self.inner_next())
241    }
242}
243
244impl oio::BlockingList for ImmutableDir {
245    fn next(&mut self) -> Result<Option<oio::Entry>> {
246        Ok(self.inner_next())
247    }
248}
249
#[cfg(test)]
#[cfg(feature = "services-http")]
mod tests {
    use std::collections::HashMap;

    use anyhow::Result;
    use futures::TryStreamExt;
    use log::debug;

    use super::*;
    use crate::layers::LoggingLayer;
    use crate::services::HttpConfig;
    use crate::EntryMode;
    use crate::Lister;
    use crate::Operator;

    /// Build an HTTP-backed operator whose listing comes from an
    /// `ImmutableIndexLayer` holding `keys`.
    ///
    /// These tests only issue list requests. The index layer answers those
    /// itself, without calling the inner HTTP accessor.
    fn new_operator(keys: &[&str]) -> Result<Operator> {
        let mut iil = ImmutableIndexLayer::default();
        iil.extend_iter(keys.iter().map(|key| key.to_string()));

        let op = HttpConfig::from_iter(HashMap::from([(
            "endpoint".to_string(),
            "https://xuanwo.io".to_string(),
        )]))
        .and_then(Operator::from_config)?
        .layer(LoggingLayer::default())
        .layer(iil)
        .finish();
        Ok(op)
    }

    /// Drain `lister` into a map from entry path to entry mode, failing the
    /// test if any path is yielded more than once.
    async fn collect_modes(mut lister: Lister) -> Result<HashMap<String, EntryMode>> {
        let mut modes = HashMap::new();
        while let Some(entry) = lister.try_next().await? {
            debug!("got entry: {}", entry.path());
            let previous = modes.insert(entry.path().to_string(), entry.metadata().mode());
            assert!(previous.is_none(), "duplicated value: {}", entry.path());
        }
        debug!("current files: {:?}", modes);
        Ok(modes)
    }

    #[tokio::test]
    async fn test_list() -> Result<()> {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let op = new_operator(&["file", "dir/", "dir/file", "dir_without_prefix/file"])?;
        let map = collect_modes(op.lister("").await?).await?;

        // Only direct children of the root: `dir/file` folds into `dir/`, and
        // `dir_without_prefix/file` surfaces as its parent directory.
        assert_eq!(map.len(), 3);
        assert_eq!(map["file"], EntryMode::FILE);
        assert_eq!(map["dir/"], EntryMode::DIR);
        assert_eq!(map["dir_without_prefix/"], EntryMode::DIR);
        Ok(())
    }

    #[tokio::test]
    async fn test_scan() -> Result<()> {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let op = new_operator(&["file", "dir/", "dir/file", "dir_without_prefix/file"])?;
        let map = collect_modes(op.lister_with("/").recursive(true).await?).await?;

        // A recursive listing of the root returns every indexed key.
        assert_eq!(map.len(), 4);
        assert_eq!(map["file"], EntryMode::FILE);
        assert_eq!(map["dir/"], EntryMode::DIR);
        assert_eq!(map["dir/file"], EntryMode::FILE);
        assert_eq!(map["dir_without_prefix/file"], EntryMode::FILE);
        Ok(())
    }

    #[tokio::test]
    async fn test_list_dir() -> Result<()> {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let op = new_operator(&[
            "dataset/stateful/ontime_2007_200.csv",
            "dataset/stateful/ontime_2008_200.csv",
            "dataset/stateful/ontime_2009_200.csv",
        ])?;

        // List /: only the top-level directory is visible.
        let map = collect_modes(op.lister("/").await?).await?;
        assert_eq!(map.len(), 1);
        assert_eq!(map["dataset/"], EntryMode::DIR);

        // List dataset/: only the intermediate directory is visible.
        let map = collect_modes(op.lister("dataset/").await?).await?;
        assert_eq!(map.len(), 1);
        assert_eq!(map["dataset/stateful/"], EntryMode::DIR);

        // List dataset/stateful/: every file is visible.
        let map = collect_modes(op.lister("dataset/stateful/").await?).await?;
        assert_eq!(map.len(), 3);
        assert_eq!(map["dataset/stateful/ontime_2007_200.csv"], EntryMode::FILE);
        assert_eq!(map["dataset/stateful/ontime_2008_200.csv"], EntryMode::FILE);
        assert_eq!(map["dataset/stateful/ontime_2009_200.csv"], EntryMode::FILE);
        Ok(())
    }

    #[tokio::test]
    async fn test_walk_top_down_dir() -> Result<()> {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let op = new_operator(&[
            "dataset/stateful/ontime_2007_200.csv",
            "dataset/stateful/ontime_2008_200.csv",
            "dataset/stateful/ontime_2009_200.csv",
        ])?;
        let map = collect_modes(op.lister_with("/").recursive(true).await?).await?;

        assert_eq!(map.len(), 3);
        assert_eq!(map["dataset/stateful/ontime_2007_200.csv"], EntryMode::FILE);
        assert_eq!(map["dataset/stateful/ontime_2008_200.csv"], EntryMode::FILE);
        assert_eq!(map["dataset/stateful/ontime_2009_200.csv"], EntryMode::FILE);
        Ok(())
    }
}