opendal/types/delete/
deleter.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::raw::oio::DeleteDyn;
19use crate::raw::*;
20use crate::*;
21use futures::{Stream, StreamExt};
22use std::pin::pin;
23
24/// Deleter is designed to continuously remove content from storage.
25///
26/// It leverages batch deletion capabilities provided by storage services for efficient removal.
27///
28/// # Usage
29///
30/// [`Deleter`] provides several ways to delete files:
31///
32/// ## Direct Deletion
33///
34/// Use the `delete` method to remove a single file:
35///
36/// ```rust
37/// use opendal::Operator;
38/// use opendal::Result;
39///
40/// async fn example(op: Operator) -> Result<()> {
41///     let mut d = op.deleter().await?;
42///     d.delete("path/to/file").await?;
43///     d.close().await?;
44///     Ok(())
45/// }
46/// ```
47///
48/// Delete multiple files via a stream:
49///
50/// ```rust
51/// use opendal::Operator;
52/// use opendal::Result;
53/// use futures::stream;
54///
55/// async fn example(op: Operator) -> Result<()> {
56///     let mut d = op.deleter().await?;
57///     d.delete_stream(stream::iter(vec!["path/to/file"])).await?;
58///     d.close().await?;
59///     Ok(())
60/// }
61/// ```
62///
63/// ## Using as a Sink
64///
65/// Deleter can be used as a Sink for file deletion:
66///
67/// ```rust
68/// use opendal::Operator;
69/// use opendal::Result;
70/// use futures::{stream, Sink};
71/// use futures::SinkExt;
72///
73/// async fn example(op: Operator) -> Result<()> {
74///     let mut sink = op.deleter().await?.into_sink();
75///     sink.send("path/to/file").await?;
76///     sink.close().await?;
77///     Ok(())
78/// }
79/// ```
80pub struct Deleter {
81    deleter: oio::Deleter,
82
83    max_size: usize,
84    cur_size: usize,
85}
86
87impl Deleter {
88    pub(crate) async fn create(acc: Accessor) -> Result<Self> {
89        let max_size = acc.info().full_capability().delete_max_size.unwrap_or(1);
90        let (_, deleter) = acc.delete().await?;
91
92        Ok(Self {
93            deleter,
94            max_size,
95            cur_size: 0,
96        })
97    }
98
99    /// Delete a path.
100    pub async fn delete(&mut self, input: impl IntoDeleteInput) -> Result<()> {
101        if self.cur_size >= self.max_size {
102            let deleted = self.deleter.flush_dyn().await?;
103            self.cur_size -= deleted;
104        }
105
106        let input = input.into_delete_input();
107        let mut op = OpDelete::default();
108        if let Some(version) = &input.version {
109            op = op.with_version(version);
110        }
111
112        self.deleter.delete_dyn(&input.path, op)?;
113        self.cur_size += 1;
114        Ok(())
115    }
116
117    /// Delete an infallible iterator of paths.
118    ///
119    /// Also see:
120    ///
121    /// - [`Deleter::delete_try_iter`]: delete a fallible iterator of paths.
122    /// - [`Deleter::delete_stream`]: delete an infallible stream of paths.
123    /// - [`Deleter::delete_try_stream`]: delete a fallible stream of paths.
124    pub async fn delete_iter<I, D>(&mut self, iter: I) -> Result<()>
125    where
126        I: IntoIterator<Item = D>,
127        D: IntoDeleteInput,
128    {
129        for entry in iter {
130            self.delete(entry).await?;
131        }
132        Ok(())
133    }
134
135    /// Delete a fallible iterator of paths.
136    ///
137    /// Also see:
138    ///
139    /// - [`Deleter::delete_iter`]: delete an infallible iterator of paths.
140    /// - [`Deleter::delete_stream`]: delete an infallible stream of paths.
141    /// - [`Deleter::delete_try_stream`]: delete a fallible stream of paths.
142    pub async fn delete_try_iter<I, D>(&mut self, try_iter: I) -> Result<()>
143    where
144        I: IntoIterator<Item = Result<D>>,
145        D: IntoDeleteInput,
146    {
147        for entry in try_iter {
148            self.delete(entry?).await?;
149        }
150
151        Ok(())
152    }
153
154    /// Delete an infallible stream of paths.
155    ///
156    /// Also see:
157    ///
158    /// - [`Deleter::delete_iter`]: delete an infallible iterator of paths.
159    /// - [`Deleter::delete_try_iter`]: delete a fallible iterator of paths.
160    /// - [`Deleter::delete_try_stream`]: delete a fallible stream of paths.
161    pub async fn delete_stream<S, D>(&mut self, mut stream: S) -> Result<()>
162    where
163        S: Stream<Item = D>,
164        D: IntoDeleteInput,
165    {
166        let mut stream = pin!(stream);
167        while let Some(entry) = stream.next().await {
168            self.delete(entry).await?;
169        }
170
171        Ok(())
172    }
173
174    /// Delete a fallible stream of paths.
175    ///
176    /// Also see:
177    ///
178    /// - [`Deleter::delete_iter`]: delete an infallible iterator of paths.
179    /// - [`Deleter::delete_try_iter`]: delete a fallible iterator of paths.
180    /// - [`Deleter::delete_stream`]: delete an infallible stream of paths.
181    pub async fn delete_try_stream<S, D>(&mut self, mut try_stream: S) -> Result<()>
182    where
183        S: Stream<Item = Result<D>>,
184        D: IntoDeleteInput,
185    {
186        let mut stream = pin!(try_stream);
187        while let Some(entry) = stream.next().await.transpose()? {
188            self.delete(entry).await?;
189        }
190
191        Ok(())
192    }
193
194    /// Flush the deleter, returns the number of deleted paths.
195    pub async fn flush(&mut self) -> Result<usize> {
196        let deleted = self.deleter.flush_dyn().await?;
197        self.cur_size -= deleted;
198        Ok(deleted)
199    }
200
201    /// Close the deleter, this will flush the deleter and wait until all paths are deleted.
202    pub async fn close(&mut self) -> Result<()> {
203        loop {
204            self.flush().await?;
205            if self.cur_size == 0 {
206                break;
207            }
208        }
209        Ok(())
210    }
211
212    /// Convert the deleter into a sink.
213    pub fn into_sink<T: IntoDeleteInput>(self) -> FuturesDeleteSink<T> {
214        FuturesDeleteSink::new(self)
215    }
216}