opendal/layers/
concurrent_limit.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use tokio::sync::OwnedSemaphorePermit;
22use tokio::sync::Semaphore;
23
24use crate::raw::*;
25use crate::*;
26
/// A layer that limits how many operations run concurrently against the
/// accessor it wraps.
///
/// The layer itself only stores the permit count; every accessor produced by
/// `layer` receives its own semaphore created with that many permits.
#[derive(Clone, Debug)]
pub struct ConcurrentLimitLayer {
    /// Number of permits given to the semaphore of each layered accessor.
    permits: usize,
}
54
55impl ConcurrentLimitLayer {
56 pub fn new(permits: usize) -> Self {
58 Self { permits }
59 }
60}
61
62impl<A: Access> Layer<A> for ConcurrentLimitLayer {
63 type LayeredAccess = ConcurrentLimitAccessor<A>;
64
65 fn layer(&self, inner: A) -> Self::LayeredAccess {
66 ConcurrentLimitAccessor {
67 inner,
68 semaphore: Arc::new(Semaphore::new(self.permits)),
69 }
70 }
71}
72
73#[derive(Debug, Clone)]
74pub struct ConcurrentLimitAccessor<A: Access> {
75 inner: A,
76 semaphore: Arc<Semaphore>,
77}
78
79impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
80 type Inner = A;
81 type Reader = ConcurrentLimitWrapper<A::Reader>;
82 type BlockingReader = ConcurrentLimitWrapper<A::BlockingReader>;
83 type Writer = ConcurrentLimitWrapper<A::Writer>;
84 type BlockingWriter = ConcurrentLimitWrapper<A::BlockingWriter>;
85 type Lister = ConcurrentLimitWrapper<A::Lister>;
86 type BlockingLister = ConcurrentLimitWrapper<A::BlockingLister>;
87 type Deleter = ConcurrentLimitWrapper<A::Deleter>;
88 type BlockingDeleter = ConcurrentLimitWrapper<A::BlockingDeleter>;
89
90 fn inner(&self) -> &Self::Inner {
91 &self.inner
92 }
93
94 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
95 let _permit = self
96 .semaphore
97 .acquire()
98 .await
99 .expect("semaphore must be valid");
100
101 self.inner.create_dir(path, args).await
102 }
103
104 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
105 let permit = self
106 .semaphore
107 .clone()
108 .acquire_owned()
109 .await
110 .expect("semaphore must be valid");
111
112 self.inner
113 .read(path, args)
114 .await
115 .map(|(rp, r)| (rp, ConcurrentLimitWrapper::new(r, permit)))
116 }
117
118 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
119 let permit = self
120 .semaphore
121 .clone()
122 .acquire_owned()
123 .await
124 .expect("semaphore must be valid");
125
126 self.inner
127 .write(path, args)
128 .await
129 .map(|(rp, w)| (rp, ConcurrentLimitWrapper::new(w, permit)))
130 }
131
132 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
133 let _permit = self
134 .semaphore
135 .acquire()
136 .await
137 .expect("semaphore must be valid");
138
139 self.inner.stat(path, args).await
140 }
141
142 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
143 let permit = self
144 .semaphore
145 .clone()
146 .acquire_owned()
147 .await
148 .expect("semaphore must be valid");
149
150 self.inner
151 .delete()
152 .await
153 .map(|(rp, w)| (rp, ConcurrentLimitWrapper::new(w, permit)))
154 }
155
156 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
157 let permit = self
158 .semaphore
159 .clone()
160 .acquire_owned()
161 .await
162 .expect("semaphore must be valid");
163
164 self.inner
165 .list(path, args)
166 .await
167 .map(|(rp, s)| (rp, ConcurrentLimitWrapper::new(s, permit)))
168 }
169
170 fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
171 let _permit = self
172 .semaphore
173 .try_acquire()
174 .expect("semaphore must be valid");
175
176 self.inner.blocking_create_dir(path, args)
177 }
178
179 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
180 let permit = self
181 .semaphore
182 .clone()
183 .try_acquire_owned()
184 .expect("semaphore must be valid");
185
186 self.inner
187 .blocking_read(path, args)
188 .map(|(rp, r)| (rp, ConcurrentLimitWrapper::new(r, permit)))
189 }
190
191 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
192 let permit = self
193 .semaphore
194 .clone()
195 .try_acquire_owned()
196 .expect("semaphore must be valid");
197
198 self.inner
199 .blocking_write(path, args)
200 .map(|(rp, w)| (rp, ConcurrentLimitWrapper::new(w, permit)))
201 }
202
203 fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
204 let _permit = self
205 .semaphore
206 .try_acquire()
207 .expect("semaphore must be valid");
208
209 self.inner.blocking_stat(path, args)
210 }
211
212 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
213 let permit = self
214 .semaphore
215 .clone()
216 .try_acquire_owned()
217 .expect("semaphore must be valid");
218
219 self.inner
220 .blocking_delete()
221 .map(|(rp, w)| (rp, ConcurrentLimitWrapper::new(w, permit)))
222 }
223
224 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
225 let permit = self
226 .semaphore
227 .clone()
228 .try_acquire_owned()
229 .expect("semaphore must be valid");
230
231 self.inner
232 .blocking_list(path, args)
233 .map(|(rp, it)| (rp, ConcurrentLimitWrapper::new(it, permit)))
234 }
235}
236
237pub struct ConcurrentLimitWrapper<R> {
238 inner: R,
239
240 _permit: OwnedSemaphorePermit,
242}
243
244impl<R> ConcurrentLimitWrapper<R> {
245 fn new(inner: R, permit: OwnedSemaphorePermit) -> Self {
246 Self {
247 inner,
248 _permit: permit,
249 }
250 }
251}
252
253impl<R: oio::Read> oio::Read for ConcurrentLimitWrapper<R> {
254 async fn read(&mut self) -> Result<Buffer> {
255 self.inner.read().await
256 }
257}
258
259impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> {
260 fn read(&mut self) -> Result<Buffer> {
261 self.inner.read()
262 }
263}
264
265impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
266 async fn write(&mut self, bs: Buffer) -> Result<()> {
267 self.inner.write(bs).await
268 }
269
270 async fn close(&mut self) -> Result<Metadata> {
271 self.inner.close().await
272 }
273
274 async fn abort(&mut self) -> Result<()> {
275 self.inner.abort().await
276 }
277}
278
279impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
280 fn write(&mut self, bs: Buffer) -> Result<()> {
281 self.inner.write(bs)
282 }
283
284 fn close(&mut self) -> Result<Metadata> {
285 self.inner.close()
286 }
287}
288
289impl<R: oio::List> oio::List for ConcurrentLimitWrapper<R> {
290 async fn next(&mut self) -> Result<Option<oio::Entry>> {
291 self.inner.next().await
292 }
293}
294
295impl<R: oio::BlockingList> oio::BlockingList for ConcurrentLimitWrapper<R> {
296 fn next(&mut self) -> Result<Option<oio::Entry>> {
297 self.inner.next()
298 }
299}
300
301impl<R: oio::Delete> oio::Delete for ConcurrentLimitWrapper<R> {
302 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
303 self.inner.delete(path, args)
304 }
305
306 async fn flush(&mut self) -> Result<usize> {
307 self.inner.flush().await
308 }
309}
310
311impl<R: oio::BlockingDelete> oio::BlockingDelete for ConcurrentLimitWrapper<R> {
312 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
313 self.inner.delete(path, args)
314 }
315
316 fn flush(&mut self) -> Result<usize> {
317 self.inner.flush()
318 }
319}