opendal/raw/oio/delete/batch_delete.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::raw::*;
19use crate::*;
20use std::collections::HashSet;
21use std::future::Future;
22
23/// BatchDelete is the trait an implementor provides so that [`BatchDeleter`] can offer [`oio::Delete`] on top of a batch delete operation.
24///
25/// [`BatchDeleter`] buffers the requested paths and performs the delete operation while calling `flush`.
26pub trait BatchDelete: Send + Sync + Unpin + 'static {
27 /// delete_once deletes a single path.
28 ///
29 /// Implementations should make sure that the data is deleted correctly by this single call.
30 ///
31 /// [`BatchDeleter`] calls this method instead of `delete_batch` when there is exactly one path to delete.
32 fn delete_once(
33 &self,
34 path: String,
35 args: OpDelete,
36 ) -> impl Future<Output = Result<()>> + MaybeSend;
37
38 /// delete_batch deletes multiple paths at once.
39 ///
40 /// - Implementations should make sure that the number of `succeeded` plus `failed` entries in the
 ///   returned [`BatchDeleteResult`] equals the length of `batch`.
41 /// - Implementations should return an error if no path is deleted.
 /// - Entries in `succeeded` should be the `(path, args)` tuples exactly as they were passed in,
 ///   because [`BatchDeleter`] removes them from its buffer by value.
42 fn delete_batch(
43 &self,
44 batch: Vec<(String, OpDelete)>,
45 ) -> impl Future<Output = Result<BatchDeleteResult>> + MaybeSend;
46}
47
48/// BatchDeleteResult is the per-entry outcome of one [`BatchDelete::delete_batch`] call.
 ///
 /// [`BatchDeleter`] expects `succeeded.len() + failed.len()` to equal the size of the submitted
 /// batch and checks this with a debug assertion.
49#[derive(Default)]
50pub struct BatchDeleteResult {
51 /// Collection of successful deletions, containing tuples of (path, args).
 ///
 /// [`BatchDeleter`] removes each of these entries from its buffer.
52 pub succeeded: Vec<(String, OpDelete)>,
53 /// Collection of failed deletions, containing tuples of (path, args, error).
 ///
 /// These entries stay in the [`BatchDeleter`] buffer; the first error that is not
 /// temporary is returned from `flush`.
54 pub failed: Vec<(String, OpDelete, Error)>,
55}
56
57/// BatchDeleter is used to implement [`oio::Delete`] based on batch delete.
 ///
 /// `delete` only records the entry in an in-memory set; the wrapped [`BatchDelete`] is not
 /// called until `flush`.
58pub struct BatchDeleter<D: BatchDelete> {
 /// The [`BatchDelete`] implementation that performs the actual deletions.
59 inner: D,
 /// Entries queued by `delete` and not yet confirmed as deleted. Being a set, an identical
 /// `(path, args)` pair is stored only once.
60 buffer: HashSet<(String, OpDelete)>,
61}
62
63impl<D: BatchDelete> BatchDeleter<D> {
64 /// Create a new batch deleter.
65 pub fn new(inner: D) -> Self {
66 Self {
67 inner,
68 buffer: HashSet::default(),
69 }
70 }
71}
72
73impl<D: BatchDelete> oio::Delete for BatchDeleter<D> {
74 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
75 self.buffer.insert((path.to_string(), args));
76 Ok(())
77 }
78
79 async fn flush(&mut self) -> Result<usize> {
80 if self.buffer.is_empty() {
81 return Ok(0);
82 }
83 if self.buffer.len() == 1 {
84 let (path, args) = self
85 .buffer
86 .iter()
87 .next()
88 .expect("the delete buffer size must be 1")
89 .clone();
90 self.inner.delete_once(path, args).await?;
91 self.buffer.clear();
92 return Ok(1);
93 }
94
95 let batch = self.buffer.iter().cloned().collect();
96 let result = self.inner.delete_batch(batch).await?;
97 debug_assert!(
98 !result.succeeded.is_empty(),
99 "the number of succeeded operations must be greater than 0"
100 );
101 debug_assert_eq!(
102 result.succeeded.len() + result.failed.len(),
103 self.buffer.len(),
104 "the number of succeeded and failed operations must be equal to the input batch size"
105 );
106
107 // Remove all succeeded operations from the buffer.
108 let deleted = result.succeeded.len();
109 for i in result.succeeded {
110 self.buffer.remove(&i);
111 }
112
113 // Return directly if there are non-temporary errors.
114 for (path, op, err) in result.failed {
115 if !err.is_temporary() {
116 return Err(err
117 .with_context("path", path)
118 .with_context("version", op.version().unwrap_or("<latest>")));
119 }
120 }
121
122 // Return the number of succeeded operations to allow users to decide whether
123 // to retry or push more delete operations.
124 Ok(deleted)
125 }
126}