opendal/layers/
throttle.rs1use std::num::NonZeroU32;
19use std::sync::Arc;
20use std::thread;
21
22use governor::clock::Clock;
23use governor::clock::DefaultClock;
24use governor::middleware::NoOpMiddleware;
25use governor::state::InMemoryState;
26use governor::state::NotKeyed;
27use governor::Quota;
28use governor::RateLimiter;
29
30use crate::raw::*;
31use crate::*;
32
/// Layer that attaches a bandwidth rate limiter to an accessor.
///
/// The limits are counted in limiter cells; the writers produced by the
/// layered accessor request one cell per byte of every buffer they write,
/// so both values are effectively byte counts.
#[derive(Clone)]
pub struct ThrottleLayer {
    /// Number of cells replenished per second (passed to `Quota::per_second`).
    bandwidth: NonZeroU32,
    /// Maximum number of cells that may be consumed at once (passed to
    /// `Quota::allow_burst`). A single write larger than this is rejected.
    burst: NonZeroU32,
}

impl ThrottleLayer {
    /// Creates a layer with the given steady `bandwidth` and `burst` size.
    ///
    /// # Panics
    ///
    /// Panics if `bandwidth` or `burst` is zero.
    pub fn new(bandwidth: u32, burst: u32) -> Self {
        assert!(bandwidth > 0);
        assert!(burst > 0);

        // Zero has been rejected above, so neither conversion can fail.
        let [bandwidth, burst] = [bandwidth, burst].map(|value| NonZeroU32::new(value).unwrap());
        Self { bandwidth, burst }
    }
}
88
89impl<A: Access> Layer<A> for ThrottleLayer {
90 type LayeredAccess = ThrottleAccessor<A>;
91
92 fn layer(&self, accessor: A) -> Self::LayeredAccess {
93 let rate_limiter = Arc::new(RateLimiter::direct(
94 Quota::per_second(self.bandwidth).allow_burst(self.burst),
95 ));
96 ThrottleAccessor {
97 inner: accessor,
98 rate_limiter,
99 }
100 }
101}
102
103type SharedRateLimiter = Arc<RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>>;
107
108#[derive(Debug, Clone)]
109pub struct ThrottleAccessor<A: Access> {
110 inner: A,
111 rate_limiter: SharedRateLimiter,
112}
113
114impl<A: Access> LayeredAccess for ThrottleAccessor<A> {
115 type Inner = A;
116 type Reader = ThrottleWrapper<A::Reader>;
117 type Writer = ThrottleWrapper<A::Writer>;
118 type Lister = A::Lister;
119 type Deleter = A::Deleter;
120 type BlockingReader = ThrottleWrapper<A::BlockingReader>;
121 type BlockingWriter = ThrottleWrapper<A::BlockingWriter>;
122 type BlockingLister = A::BlockingLister;
123 type BlockingDeleter = A::BlockingDeleter;
124
125 fn inner(&self) -> &Self::Inner {
126 &self.inner
127 }
128
129 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
130 let limiter = self.rate_limiter.clone();
131
132 self.inner
133 .read(path, args)
134 .await
135 .map(|(rp, r)| (rp, ThrottleWrapper::new(r, limiter)))
136 }
137
138 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
139 let limiter = self.rate_limiter.clone();
140
141 self.inner
142 .write(path, args)
143 .await
144 .map(|(rp, w)| (rp, ThrottleWrapper::new(w, limiter)))
145 }
146
147 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
148 self.inner.delete().await
149 }
150
151 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
152 self.inner.list(path, args).await
153 }
154
155 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
156 let limiter = self.rate_limiter.clone();
157
158 self.inner
159 .blocking_read(path, args)
160 .map(|(rp, r)| (rp, ThrottleWrapper::new(r, limiter)))
161 }
162
163 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
164 let limiter = self.rate_limiter.clone();
165
166 self.inner
167 .blocking_write(path, args)
168 .map(|(rp, w)| (rp, ThrottleWrapper::new(w, limiter)))
169 }
170
171 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
172 self.inner.blocking_delete()
173 }
174
175 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
176 self.inner.blocking_list(path, args)
177 }
178}
179
180pub struct ThrottleWrapper<R> {
181 inner: R,
182 limiter: SharedRateLimiter,
183}
184
185impl<R> ThrottleWrapper<R> {
186 pub fn new(inner: R, rate_limiter: SharedRateLimiter) -> Self {
187 Self {
188 inner,
189 limiter: rate_limiter,
190 }
191 }
192}
193
194impl<R: oio::Read> oio::Read for ThrottleWrapper<R> {
195 async fn read(&mut self) -> Result<Buffer> {
196 self.inner.read().await
197 }
198}
199
200impl<R: oio::BlockingRead> oio::BlockingRead for ThrottleWrapper<R> {
201 fn read(&mut self) -> Result<Buffer> {
202 self.inner.read()
203 }
204}
205
206impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
207 async fn write(&mut self, bs: Buffer) -> Result<()> {
208 let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
209
210 loop {
211 match self.limiter.check_n(buf_length) {
212 Ok(res) => match res {
213 Ok(_) => return self.inner.write(bs).await,
214 Err(not_until) => {
216 let _ = not_until.wait_time_from(DefaultClock::default().now());
217 }
222 },
223 Err(_) => return Err(Error::new(
225 ErrorKind::RateLimited,
226 "InsufficientCapacity due to burst size being smaller than the request size",
227 )),
228 }
229 }
230 }
231
232 async fn abort(&mut self) -> Result<()> {
233 self.inner.abort().await
234 }
235
236 async fn close(&mut self) -> Result<Metadata> {
237 self.inner.close().await
238 }
239}
240
241impl<R: oio::BlockingWrite> oio::BlockingWrite for ThrottleWrapper<R> {
242 fn write(&mut self, bs: Buffer) -> Result<()> {
243 let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
244
245 loop {
246 match self.limiter.check_n(buf_length) {
247 Ok(res) => match res {
248 Ok(_) => return self.inner.write(bs),
249 Err(not_until) => {
251 let wait_time = not_until.wait_time_from(DefaultClock::default().now());
252 thread::sleep(wait_time);
253 }
254 },
255 Err(_) => return Err(Error::new(
257 ErrorKind::RateLimited,
258 "InsufficientCapacity due to burst size being smaller than the request size",
259 )),
260 }
261 }
262 }
263
264 fn close(&mut self) -> Result<Metadata> {
265 self.inner.close()
266 }
267}