opendal/types/delete/blocking_deleter.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::raw::*;
19use crate::*;
20
21/// BlockingDeleter is designed to continuously remove content from storage.
22///
23/// It leverages batch deletion capabilities provided by storage services for efficient removal.
24pub struct BlockingDeleter {
25 deleter: oio::BlockingDeleter,
26
27 max_size: usize,
28 cur_size: usize,
29}
30
31impl BlockingDeleter {
32 pub(crate) fn create(acc: Accessor) -> Result<Self> {
33 let max_size = acc.info().full_capability().delete_max_size.unwrap_or(1);
34 let (_, deleter) = acc.blocking_delete()?;
35
36 Ok(Self {
37 deleter,
38 max_size,
39 cur_size: 0,
40 })
41 }
42
43 /// Delete a path.
44 pub fn delete(&mut self, input: impl IntoDeleteInput) -> Result<()> {
45 if self.cur_size >= self.max_size {
46 let deleted = self.deleter.flush()?;
47 self.cur_size -= deleted;
48 }
49
50 let input = input.into_delete_input();
51 let mut op = OpDelete::default();
52 if let Some(version) = &input.version {
53 op = op.with_version(version);
54 }
55
56 self.deleter.delete(&input.path, op)?;
57 self.cur_size += 1;
58 Ok(())
59 }
60
61 /// Delete an infallible iterator of paths.
62 ///
63 /// Also see:
64 ///
65 /// - [`BlockingDeleter::delete_try_iter`]: delete an fallible iterator of paths.
66 pub fn delete_iter<I, D>(&mut self, iter: I) -> Result<()>
67 where
68 I: IntoIterator<Item = D>,
69 D: IntoDeleteInput,
70 {
71 for entry in iter {
72 self.delete(entry)?;
73 }
74
75 Ok(())
76 }
77
78 /// Delete an fallible iterator of paths.
79 ///
80 /// Also see:
81 ///
82 /// - [`BlockingDeleter::delete_iter`]: delete an infallible iterator of paths.
83 pub fn delete_try_iter<I, D>(&mut self, try_iter: I) -> Result<()>
84 where
85 I: IntoIterator<Item = Result<D>>,
86 D: IntoDeleteInput,
87 {
88 for entry in try_iter {
89 self.delete(entry?)?;
90 }
91
92 Ok(())
93 }
94
95 /// Flush the deleter, returns the number of deleted paths.
96 pub fn flush(&mut self) -> Result<usize> {
97 let deleted = self.deleter.flush()?;
98 self.cur_size -= deleted;
99 Ok(deleted)
100 }
101
102 /// Close the deleter, this will flush the deleter and wait until all paths are deleted.
103 pub fn close(&mut self) -> Result<()> {
104 loop {
105 self.flush()?;
106 if self.cur_size == 0 {
107 break;
108 }
109 }
110 Ok(())
111 }
112}