opendal/raw/oio/delete/
batch_delete.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashSet;
19use std::future::Future;
20
21use crate::raw::*;
22use crate::*;
23
24/// BatchDelete is used to implement [`oio::Delete`] based on batch delete operation.
25///
26/// OneShotDeleter will perform delete operation while calling `flush`.
27pub trait BatchDelete: Send + Sync + Unpin + 'static {
28    /// delete_once delete one path at once.
29    ///
30    /// Implementations should make sure that the data is deleted correctly at once.
31    ///
32    /// BatchDeleter may call this method while there are only one path to delete.
33    fn delete_once(
34        &self,
35        path: String,
36        args: OpDelete,
37    ) -> impl Future<Output = Result<()>> + MaybeSend;
38
39    /// delete_batch delete multiple paths at once.
40    ///
41    /// - Implementations should make sure that the length of `batch` equals to the return result's length.
42    /// - Implementations should return error no path is deleted.
43    fn delete_batch(
44        &self,
45        batch: Vec<(String, OpDelete)>,
46    ) -> impl Future<Output = Result<BatchDeleteResult>> + MaybeSend;
47}
48
49/// BatchDeleteResult is the result of batch delete operation.
50#[derive(Default)]
51pub struct BatchDeleteResult {
52    /// Collection of successful deletions, containing tuples of (path, args)
53    pub succeeded: Vec<(String, OpDelete)>,
54    /// Collection of failed deletions, containing tuples of (path, args, error)
55    pub failed: Vec<(String, OpDelete, Error)>,
56}
57
58/// BatchDeleter is used to implement [`oio::Delete`] based on batch delete.
59pub struct BatchDeleter<D: BatchDelete> {
60    inner: D,
61    buffer: HashSet<(String, OpDelete)>,
62}
63
64impl<D: BatchDelete> BatchDeleter<D> {
65    /// Create a new batch deleter.
66    pub fn new(inner: D) -> Self {
67        Self {
68            inner,
69            buffer: HashSet::default(),
70        }
71    }
72}
73
74impl<D: BatchDelete> oio::Delete for BatchDeleter<D> {
75    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
76        self.buffer.insert((path.to_string(), args));
77        Ok(())
78    }
79
80    async fn flush(&mut self) -> Result<usize> {
81        if self.buffer.is_empty() {
82            return Ok(0);
83        }
84        if self.buffer.len() == 1 {
85            let (path, args) = self
86                .buffer
87                .iter()
88                .next()
89                .expect("the delete buffer size must be 1")
90                .clone();
91            self.inner.delete_once(path, args).await?;
92            self.buffer.clear();
93            return Ok(1);
94        }
95
96        let batch = self.buffer.iter().cloned().collect();
97        let result = self.inner.delete_batch(batch).await?;
98        debug_assert!(
99            !result.succeeded.is_empty(),
100            "the number of succeeded operations must be greater than 0"
101        );
102        debug_assert_eq!(
103            result.succeeded.len() + result.failed.len(),
104            self.buffer.len(),
105            "the number of succeeded and failed operations must be equal to the input batch size"
106        );
107
108        // Remove all succeeded operations from the buffer.
109        let deleted = result.succeeded.len();
110        for i in result.succeeded {
111            self.buffer.remove(&i);
112        }
113
114        // Return directly if there are non-temporary errors.
115        for (path, op, err) in result.failed {
116            if !err.is_temporary() {
117                return Err(err
118                    .with_context("path", path)
119                    .with_context("version", op.version().unwrap_or("<latest>")));
120            }
121        }
122
123        // Return the number of succeeded operations to allow users to decide whether
124        // to retry or push more delete operations.
125        Ok(deleted)
126    }
127}