opendal/raw/oio/delete/
batch_delete.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::raw::*;
19use crate::*;
20use std::collections::HashSet;
21use std::future::Future;
22
/// BatchDelete is used to implement [`oio::Delete`] based on batch delete operation.
///
/// [`BatchDeleter`] buffers the requested deletions and performs the actual delete
/// operations while calling `flush`, dispatching to one of the two methods below
/// depending on how many entries are buffered.
pub trait BatchDelete: Send + Sync + Unpin + 'static {
    /// delete_once deletes a single path.
    ///
    /// Implementations should make sure that the data is deleted correctly at once.
    ///
    /// [`BatchDeleter`] calls this method when there is exactly one path to delete.
    fn delete_once(
        &self,
        path: String,
        args: OpDelete,
    ) -> impl Future<Output = Result<()>> + MaybeSend;

    /// delete_batch deletes multiple paths at once.
    ///
    /// - Implementations should make sure that every entry of `batch` is reported exactly
    ///   once in the returned result, so that the number of succeeded plus failed entries
    ///   equals the length of `batch`.
    /// - Implementations should return an error if no path is deleted.
    fn delete_batch(
        &self,
        batch: Vec<(String, OpDelete)>,
    ) -> impl Future<Output = Result<BatchDeleteResult>> + MaybeSend;
}
47
/// BatchDeleteResult is the result of a batch delete operation.
///
/// It is returned by [`BatchDelete::delete_batch`] and splits the submitted entries into
/// those that were deleted and those that were not. The derived `Default` value has both
/// collections empty.
#[derive(Default)]
pub struct BatchDeleteResult {
    /// Collection of successful deletions, containing tuples of (path, args).
    pub succeeded: Vec<(String, OpDelete)>,
    /// Collection of failed deletions, containing tuples of (path, args, error).
    pub failed: Vec<(String, OpDelete, Error)>,
}
56
/// BatchDeleter is used to implement [`oio::Delete`] based on batch delete.
///
/// Deletions are collected in an in-memory buffer and only sent to the wrapped
/// [`BatchDelete`] implementation when `flush` is called.
pub struct BatchDeleter<D: BatchDelete> {
    /// The underlying implementation that performs the actual delete requests.
    inner: D,
    /// Pending `(path, args)` entries queued by `delete`. Being a set, identical entries
    /// are stored only once; entries are removed once `flush` sees them succeed.
    buffer: HashSet<(String, OpDelete)>,
}
62
63impl<D: BatchDelete> BatchDeleter<D> {
64    /// Create a new batch deleter.
65    pub fn new(inner: D) -> Self {
66        Self {
67            inner,
68            buffer: HashSet::default(),
69        }
70    }
71}
72
73impl<D: BatchDelete> oio::Delete for BatchDeleter<D> {
74    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
75        self.buffer.insert((path.to_string(), args));
76        Ok(())
77    }
78
79    async fn flush(&mut self) -> Result<usize> {
80        if self.buffer.is_empty() {
81            return Ok(0);
82        }
83        if self.buffer.len() == 1 {
84            let (path, args) = self
85                .buffer
86                .iter()
87                .next()
88                .expect("the delete buffer size must be 1")
89                .clone();
90            self.inner.delete_once(path, args).await?;
91            self.buffer.clear();
92            return Ok(1);
93        }
94
95        let batch = self.buffer.iter().cloned().collect();
96        let result = self.inner.delete_batch(batch).await?;
97        debug_assert!(
98            !result.succeeded.is_empty(),
99            "the number of succeeded operations must be greater than 0"
100        );
101        debug_assert_eq!(
102            result.succeeded.len() + result.failed.len(),
103            self.buffer.len(),
104            "the number of succeeded and failed operations must be equal to the input batch size"
105        );
106
107        // Remove all succeeded operations from the buffer.
108        let deleted = result.succeeded.len();
109        for i in result.succeeded {
110            self.buffer.remove(&i);
111        }
112
113        // Return directly if there are non-temporary errors.
114        for (path, op, err) in result.failed {
115            if !err.is_temporary() {
116                return Err(err
117                    .with_context("path", path)
118                    .with_context("version", op.version().unwrap_or("<latest>")));
119            }
120        }
121
122        // Return the number of succeeded operations to allow users to decide whether
123        // to retry or push more delete operations.
124        Ok(deleted)
125    }
126}