opendal/types/delete/
blocking_deleter.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::raw::*;
19use crate::*;
20
/// BlockingDeleter is designed to continuously remove content from storage.
///
/// It leverages batch deletion capabilities provided by storage services for efficient removal.
///
/// Paths are handed to the underlying deleter one at a time; once the number of
/// outstanding paths reaches `max_size`, the next `delete` call flushes before
/// queueing anything new.
pub struct BlockingDeleter {
    /// Underlying raw deleter that receives every `delete` and `flush` call.
    deleter: oio::BlockingDeleter,

    /// Threshold of outstanding paths at which `delete` flushes first.
    /// Taken from the service's `delete_max_size` capability, defaulting to `1`.
    max_size: usize,
    /// Number of paths handed to `deleter` that no flush has reported as deleted yet.
    cur_size: usize,
}
30
31impl BlockingDeleter {
32    pub(crate) fn create(acc: Accessor) -> Result<Self> {
33        let max_size = acc.info().full_capability().delete_max_size.unwrap_or(1);
34        let (_, deleter) = acc.blocking_delete()?;
35
36        Ok(Self {
37            deleter,
38            max_size,
39            cur_size: 0,
40        })
41    }
42
43    /// Delete a path.
44    pub fn delete(&mut self, input: impl IntoDeleteInput) -> Result<()> {
45        if self.cur_size >= self.max_size {
46            let deleted = self.deleter.flush()?;
47            self.cur_size -= deleted;
48        }
49
50        let input = input.into_delete_input();
51        let mut op = OpDelete::default();
52        if let Some(version) = &input.version {
53            op = op.with_version(version);
54        }
55
56        self.deleter.delete(&input.path, op)?;
57        self.cur_size += 1;
58        Ok(())
59    }
60
61    /// Delete an infallible iterator of paths.
62    ///
63    /// Also see:
64    ///
65    /// - [`BlockingDeleter::delete_try_iter`]: delete an fallible iterator of paths.
66    pub fn delete_iter<I, D>(&mut self, iter: I) -> Result<()>
67    where
68        I: IntoIterator<Item = D>,
69        D: IntoDeleteInput,
70    {
71        for entry in iter {
72            self.delete(entry)?;
73        }
74
75        Ok(())
76    }
77
78    /// Delete an fallible iterator of paths.
79    ///
80    /// Also see:
81    ///
82    /// - [`BlockingDeleter::delete_iter`]: delete an infallible iterator of paths.
83    pub fn delete_try_iter<I, D>(&mut self, try_iter: I) -> Result<()>
84    where
85        I: IntoIterator<Item = Result<D>>,
86        D: IntoDeleteInput,
87    {
88        for entry in try_iter {
89            self.delete(entry?)?;
90        }
91
92        Ok(())
93    }
94
95    /// Flush the deleter, returns the number of deleted paths.
96    pub fn flush(&mut self) -> Result<usize> {
97        let deleted = self.deleter.flush()?;
98        self.cur_size -= deleted;
99        Ok(deleted)
100    }
101
102    /// Close the deleter, this will flush the deleter and wait until all paths are deleted.
103    pub fn close(&mut self) -> Result<()> {
104        loop {
105            self.flush()?;
106            if self.cur_size == 0 {
107                break;
108            }
109        }
110        Ok(())
111    }
112}