opendal/layers/
concurrent_limit.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use tokio::sync::OwnedSemaphorePermit;
22use tokio::sync::Semaphore;
23
24use crate::raw::*;
25use crate::*;
26
/// Add concurrent request limit.
///
/// # Notes
///
/// Users can control how many concurrent connections could be established
/// between OpenDAL and underlying storage services.
///
/// Every accessor produced by this layer owns a semaphore created with the
/// configured number of permits.
///
/// # Examples
///
/// ```no_run
/// # use opendal::layers::ConcurrentLimitLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
///
/// # fn main() -> Result<()> {
/// let _ = Operator::new(services::Memory::default())?
///     .layer(ConcurrentLimitLayer::new(1024))
///     .finish();
/// Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct ConcurrentLimitLayer {
    /// Number of permits handed to the semaphore of each layered accessor.
    permits: usize,
}
54
55impl ConcurrentLimitLayer {
56    /// Create a new ConcurrentLimitLayer will specify permits
57    pub fn new(permits: usize) -> Self {
58        Self { permits }
59    }
60}
61
62impl<A: Access> Layer<A> for ConcurrentLimitLayer {
63    type LayeredAccess = ConcurrentLimitAccessor<A>;
64
65    fn layer(&self, inner: A) -> Self::LayeredAccess {
66        ConcurrentLimitAccessor {
67            inner,
68            semaphore: Arc::new(Semaphore::new(self.permits)),
69        }
70    }
71}
72
73#[derive(Debug, Clone)]
74pub struct ConcurrentLimitAccessor<A: Access> {
75    inner: A,
76    semaphore: Arc<Semaphore>,
77}
78
79impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
80    type Inner = A;
81    type Reader = ConcurrentLimitWrapper<A::Reader>;
82    type BlockingReader = ConcurrentLimitWrapper<A::BlockingReader>;
83    type Writer = ConcurrentLimitWrapper<A::Writer>;
84    type BlockingWriter = ConcurrentLimitWrapper<A::BlockingWriter>;
85    type Lister = ConcurrentLimitWrapper<A::Lister>;
86    type BlockingLister = ConcurrentLimitWrapper<A::BlockingLister>;
87    type Deleter = ConcurrentLimitWrapper<A::Deleter>;
88    type BlockingDeleter = ConcurrentLimitWrapper<A::BlockingDeleter>;
89
90    fn inner(&self) -> &Self::Inner {
91        &self.inner
92    }
93
94    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
95        let _permit = self
96            .semaphore
97            .acquire()
98            .await
99            .expect("semaphore must be valid");
100
101        self.inner.create_dir(path, args).await
102    }
103
104    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
105        let permit = self
106            .semaphore
107            .clone()
108            .acquire_owned()
109            .await
110            .expect("semaphore must be valid");
111
112        self.inner
113            .read(path, args)
114            .await
115            .map(|(rp, r)| (rp, ConcurrentLimitWrapper::new(r, permit)))
116    }
117
118    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
119        let permit = self
120            .semaphore
121            .clone()
122            .acquire_owned()
123            .await
124            .expect("semaphore must be valid");
125
126        self.inner
127            .write(path, args)
128            .await
129            .map(|(rp, w)| (rp, ConcurrentLimitWrapper::new(w, permit)))
130    }
131
132    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
133        let _permit = self
134            .semaphore
135            .acquire()
136            .await
137            .expect("semaphore must be valid");
138
139        self.inner.stat(path, args).await
140    }
141
142    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
143        let permit = self
144            .semaphore
145            .clone()
146            .acquire_owned()
147            .await
148            .expect("semaphore must be valid");
149
150        self.inner
151            .delete()
152            .await
153            .map(|(rp, w)| (rp, ConcurrentLimitWrapper::new(w, permit)))
154    }
155
156    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
157        let permit = self
158            .semaphore
159            .clone()
160            .acquire_owned()
161            .await
162            .expect("semaphore must be valid");
163
164        self.inner
165            .list(path, args)
166            .await
167            .map(|(rp, s)| (rp, ConcurrentLimitWrapper::new(s, permit)))
168    }
169
170    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
171        let _permit = self
172            .semaphore
173            .try_acquire()
174            .expect("semaphore must be valid");
175
176        self.inner.blocking_create_dir(path, args)
177    }
178
179    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
180        let permit = self
181            .semaphore
182            .clone()
183            .try_acquire_owned()
184            .expect("semaphore must be valid");
185
186        self.inner
187            .blocking_read(path, args)
188            .map(|(rp, r)| (rp, ConcurrentLimitWrapper::new(r, permit)))
189    }
190
191    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
192        let permit = self
193            .semaphore
194            .clone()
195            .try_acquire_owned()
196            .expect("semaphore must be valid");
197
198        self.inner
199            .blocking_write(path, args)
200            .map(|(rp, w)| (rp, ConcurrentLimitWrapper::new(w, permit)))
201    }
202
203    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
204        let _permit = self
205            .semaphore
206            .try_acquire()
207            .expect("semaphore must be valid");
208
209        self.inner.blocking_stat(path, args)
210    }
211
212    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
213        let permit = self
214            .semaphore
215            .clone()
216            .try_acquire_owned()
217            .expect("semaphore must be valid");
218
219        self.inner
220            .blocking_delete()
221            .map(|(rp, w)| (rp, ConcurrentLimitWrapper::new(w, permit)))
222    }
223
224    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
225        let permit = self
226            .semaphore
227            .clone()
228            .try_acquire_owned()
229            .expect("semaphore must be valid");
230
231        self.inner
232            .blocking_list(path, args)
233            .map(|(rp, it)| (rp, ConcurrentLimitWrapper::new(it, permit)))
234    }
235}
236
237pub struct ConcurrentLimitWrapper<R> {
238    inner: R,
239
240    // Hold on this permit until this reader has been dropped.
241    _permit: OwnedSemaphorePermit,
242}
243
244impl<R> ConcurrentLimitWrapper<R> {
245    fn new(inner: R, permit: OwnedSemaphorePermit) -> Self {
246        Self {
247            inner,
248            _permit: permit,
249        }
250    }
251}
252
253impl<R: oio::Read> oio::Read for ConcurrentLimitWrapper<R> {
254    async fn read(&mut self) -> Result<Buffer> {
255        self.inner.read().await
256    }
257}
258
259impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> {
260    fn read(&mut self) -> Result<Buffer> {
261        self.inner.read()
262    }
263}
264
265impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
266    async fn write(&mut self, bs: Buffer) -> Result<()> {
267        self.inner.write(bs).await
268    }
269
270    async fn close(&mut self) -> Result<Metadata> {
271        self.inner.close().await
272    }
273
274    async fn abort(&mut self) -> Result<()> {
275        self.inner.abort().await
276    }
277}
278
279impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
280    fn write(&mut self, bs: Buffer) -> Result<()> {
281        self.inner.write(bs)
282    }
283
284    fn close(&mut self) -> Result<Metadata> {
285        self.inner.close()
286    }
287}
288
289impl<R: oio::List> oio::List for ConcurrentLimitWrapper<R> {
290    async fn next(&mut self) -> Result<Option<oio::Entry>> {
291        self.inner.next().await
292    }
293}
294
295impl<R: oio::BlockingList> oio::BlockingList for ConcurrentLimitWrapper<R> {
296    fn next(&mut self) -> Result<Option<oio::Entry>> {
297        self.inner.next()
298    }
299}
300
301impl<R: oio::Delete> oio::Delete for ConcurrentLimitWrapper<R> {
302    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
303        self.inner.delete(path, args)
304    }
305
306    async fn flush(&mut self) -> Result<usize> {
307        self.inner.flush().await
308    }
309}
310
311impl<R: oio::BlockingDelete> oio::BlockingDelete for ConcurrentLimitWrapper<R> {
312    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
313        self.inner.delete(path, args)
314    }
315
316    fn flush(&mut self) -> Result<usize> {
317        self.inner.flush()
318    }
319}