opendal/layers/throttle.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::num::NonZeroU32;
use std::sync::Arc;
use std::thread;

use governor::clock::Clock;
use governor::clock::DefaultClock;
use governor::middleware::NoOpMiddleware;
use governor::state::InMemoryState;
use governor::state::NotKeyed;
use governor::Quota;
use governor::RateLimiter;

use crate::raw::*;
use crate::*;

/// Add a bandwidth rate limiter to the underlying services.
///
/// # Throttle
///
/// There are several algorithms when it comes to rate limiting techniques.
/// This throttle layer uses the Generic Cell Rate Algorithm (GCRA) provided by
/// [Governor](https://docs.rs/governor/latest/governor/index.html).
/// By setting `bandwidth` and `burst`, we can control the byte flow rate of the underlying services.
///
/// # Note
///
/// When configuring the ThrottleLayer, set the burst size to at least the largest possible single operation,
/// as **any request larger than the burst size can never pass through the limiter**.
///
/// Read more about [Quota](https://docs.rs/governor/latest/governor/struct.Quota.html#examples).
///
/// # Examples
///
/// This example limits bandwidth to 10 KiB/s and burst size to 10 MiB.
///
/// ```no_run
/// # use opendal::layers::ThrottleLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
/// # use opendal::Scheme;
///
/// # fn main() -> Result<()> {
/// let _ = Operator::new(services::Memory::default())
///     .expect("must init")
///     .layer(ThrottleLayer::new(10 * 1024, 10 * 1024 * 1024))
///     .finish();
/// Ok(())
/// # }
/// ```
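///
/// As a further illustration (a sketch following the note above, not an exhaustive guide),
/// a single write larger than the configured burst can never be admitted and fails with
/// `ErrorKind::RateLimited`:
///
/// ```no_run
/// # use opendal::layers::ThrottleLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
/// # async fn example() -> Result<()> {
/// let op = Operator::new(services::Memory::default())
///     .expect("must init")
///     // bandwidth of 10 KiB/s, but a burst of only 1 KiB per request.
///     .layer(ThrottleLayer::new(10 * 1024, 1024))
///     .finish();
/// // A 4 KiB write exceeds the 1 KiB burst, so the limiter rejects it outright.
/// assert!(op.write("demo_path", vec![0u8; 4096]).await.is_err());
/// # Ok(())
/// # }
/// ```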
#[derive(Clone)]
pub struct ThrottleLayer {
    bandwidth: NonZeroU32,
    burst: NonZeroU32,
}

impl ThrottleLayer {
    /// Create a new `ThrottleLayer` with the given bandwidth and burst.
    ///
    /// - bandwidth: the maximum number of bytes allowed to pass through per second.
    /// - burst: the maximum number of bytes allowed to pass through at once.
    pub fn new(bandwidth: u32, burst: u32) -> Self {
        assert!(bandwidth > 0);
        assert!(burst > 0);
        Self {
            bandwidth: NonZeroU32::new(bandwidth).unwrap(),
            burst: NonZeroU32::new(burst).unwrap(),
        }
    }
}

impl<A: Access> Layer<A> for ThrottleLayer {
    type LayeredAccess = ThrottleAccessor<A>;

    fn layer(&self, accessor: A) -> Self::LayeredAccess {
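        // GCRA quota semantics (as documented by governor, noted here for clarity):
        // cells (bytes) replenish at `bandwidth` per second and may accumulate up to
        // `burst` unused cells, so no single request larger than `burst` can ever pass.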
        let rate_limiter = Arc::new(RateLimiter::direct(
            Quota::per_second(self.bandwidth).allow_burst(self.burst),
        ));
        ThrottleAccessor {
            inner: accessor,
            rate_limiter,
        }
    }
}

/// Share an atomic RateLimiter instance across all threads in one operator.
/// To add more observability in the future, replace the default NoOpMiddleware with another middleware type.
/// Read more about [Middleware](https://docs.rs/governor/latest/governor/middleware/index.html).
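///
/// For example (an illustrative sketch only; not what this layer uses), governor's
/// `StateInformationMiddleware` would make each successful check also report the
/// remaining burst capacity:
///
/// ```ignore
/// use governor::middleware::StateInformationMiddleware;
///
/// type ObservedRateLimiter =
///     Arc<RateLimiter<NotKeyed, InMemoryState, DefaultClock, StateInformationMiddleware>>;
/// ```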
type SharedRateLimiter = Arc<RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>>;

#[derive(Debug, Clone)]
pub struct ThrottleAccessor<A: Access> {
    inner: A,
    rate_limiter: SharedRateLimiter,
}

impl<A: Access> LayeredAccess for ThrottleAccessor<A> {
    type Inner = A;
    type Reader = ThrottleWrapper<A::Reader>;
    type Writer = ThrottleWrapper<A::Writer>;
    type Lister = A::Lister;
    type Deleter = A::Deleter;
    type BlockingReader = ThrottleWrapper<A::BlockingReader>;
    type BlockingWriter = ThrottleWrapper<A::BlockingWriter>;
    type BlockingLister = A::BlockingLister;
    type BlockingDeleter = A::BlockingDeleter;

    fn inner(&self) -> &Self::Inner {
        &self.inner
    }

    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        let limiter = self.rate_limiter.clone();

        self.inner
            .read(path, args)
            .await
            .map(|(rp, r)| (rp, ThrottleWrapper::new(r, limiter)))
    }

    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        let limiter = self.rate_limiter.clone();

        self.inner
            .write(path, args)
            .await
            .map(|(rp, w)| (rp, ThrottleWrapper::new(w, limiter)))
    }

    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        self.inner.delete().await
    }

    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
        self.inner.list(path, args).await
    }

    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
        let limiter = self.rate_limiter.clone();

        self.inner
            .blocking_read(path, args)
            .map(|(rp, r)| (rp, ThrottleWrapper::new(r, limiter)))
    }

    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
        let limiter = self.rate_limiter.clone();

        self.inner
            .blocking_write(path, args)
            .map(|(rp, w)| (rp, ThrottleWrapper::new(w, limiter)))
    }

    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
        self.inner.blocking_delete()
    }

    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
        self.inner.blocking_list(path, args)
    }
}

pub struct ThrottleWrapper<R> {
    inner: R,
    limiter: SharedRateLimiter,
}

impl<R> ThrottleWrapper<R> {
    pub fn new(inner: R, rate_limiter: SharedRateLimiter) -> Self {
        Self {
            inner,
            limiter: rate_limiter,
        }
    }
}

// NOTE: reads are forwarded as-is and do not consume rate-limit quota; only writes are throttled.
impl<R: oio::Read> oio::Read for ThrottleWrapper<R> {
    async fn read(&mut self) -> Result<Buffer> {
        self.inner.read().await
    }
}

impl<R: oio::BlockingRead> oio::BlockingRead for ThrottleWrapper<R> {
    fn read(&mut self) -> Result<Buffer> {
        self.inner.read()
    }
}

impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
    async fn write(&mut self, bs: Buffer) -> Result<()> {
        let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();

        loop {
            match self.limiter.check_n(buf_length) {
                Ok(res) => match res {
                    Ok(_) => return self.inner.write(bs).await,
                    // The query is valid but the rate limiter cannot accommodate it yet.
                    Err(not_until) => {
                        // TODO: Should we lock the limiter and wait for the wait_time, or let other small requests go first?
                        let wait_time = not_until.wait_time_from(DefaultClock::default().now());
                        tokio::time::sleep(wait_time).await;
                    }
                },
                // The query is invalid: the rate limit parameters can never accommodate the number of cells requested.
                Err(_) => return Err(Error::new(
                    ErrorKind::RateLimited,
                    "InsufficientCapacity due to burst size being smaller than the request size",
                )),
            }
        }
    }

    async fn abort(&mut self) -> Result<()> {
        self.inner.abort().await
    }

    async fn close(&mut self) -> Result<Metadata> {
        self.inner.close().await
    }
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for ThrottleWrapper<R> {
    fn write(&mut self, bs: Buffer) -> Result<()> {
        let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();

        loop {
            match self.limiter.check_n(buf_length) {
                Ok(res) => match res {
                    Ok(_) => return self.inner.write(bs),
                    // The query is valid but the rate limiter cannot accommodate it yet.
                    Err(not_until) => {
                        let wait_time = not_until.wait_time_from(DefaultClock::default().now());
                        thread::sleep(wait_time);
                    }
                },
                // The query is invalid: the rate limit parameters can never accommodate the number of cells requested.
                Err(_) => return Err(Error::new(
                    ErrorKind::RateLimited,
                    "InsufficientCapacity due to burst size being smaller than the request size",
                )),
            }
        }
    }

    fn close(&mut self) -> Result<Metadata> {
        self.inner.close()
    }
}
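
// A minimal, illustrative test sketch: construction of the layer is the one piece that can
// be checked without a real backend, so only `ThrottleLayer::new` is exercised here.
#[cfg(test)]
mod tests {
    use super::*;

    // The layer should keep exactly the bandwidth and burst values it was given.
    #[test]
    fn test_throttle_layer_new_keeps_values() {
        let layer = ThrottleLayer::new(10 * 1024, 10 * 1024 * 1024);
        assert_eq!(layer.bandwidth.get(), 10 * 1024);
        assert_eq!(layer.burst.get(), 10 * 1024 * 1024);
    }

    // Constructing with a zero bandwidth must panic, as enforced by the asserts in `new`.
    #[test]
    #[should_panic]
    fn test_throttle_layer_new_rejects_zero_bandwidth() {
        let _ = ThrottleLayer::new(0, 1024);
    }
}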