opendal/layers/throttle.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::num::NonZeroU32;
19use std::sync::Arc;
20
21use governor::clock::Clock;
22use governor::clock::DefaultClock;
23use governor::middleware::NoOpMiddleware;
24use governor::state::InMemoryState;
25use governor::state::NotKeyed;
26use governor::Quota;
27use governor::RateLimiter;
28
29use crate::raw::*;
30use crate::*;
31
/// Add a bandwidth rate limiter to the underlying services.
///
/// # Throttle
///
/// There are several algorithms when it comes to rate limiting techniques.
/// This throttle layer uses the Generic Cell Rate Algorithm (GCRA) provided by
/// [Governor](https://docs.rs/governor/latest/governor/index.html).
/// By setting the `bandwidth` and `burst`, we can control the byte flow rate of underlying services.
///
/// Only writes are currently throttled: every byte handed to a writer is charged against the
/// limiter, while reads are forwarded to the underlying service unchanged.
///
/// # Note
///
/// When setting the ThrottleLayer, always consider the largest possible operation size as the burst size,
/// as **the burst size must be at least as large as any single write to allow it to pass through**.
/// A write larger than the burst size is rejected with `ErrorKind::RateLimited`.
///
/// Read more about [Quota](https://docs.rs/governor/latest/governor/struct.Quota.html#examples)
///
/// # Examples
///
/// This example limits bandwidth to 10 KiB/s and burst size to 10 MiB.
///
/// ```no_run
/// # use opendal::layers::ThrottleLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
///
/// # fn main() -> Result<()> {
/// let _ = Operator::new(services::Memory::default())
///     .expect("must init")
///     .layer(ThrottleLayer::new(10 * 1024, 10 * 1024 * 1024))
///     .finish();
/// Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct ThrottleLayer {
    /// Sustained rate, in bytes per second.
    bandwidth: NonZeroU32,
    /// Largest number of bytes that may be admitted at once.
    burst: NonZeroU32,
}

impl ThrottleLayer {
    /// Create a new `ThrottleLayer` with given bandwidth and burst.
    ///
    /// - bandwidth: the maximum number of bytes allowed to pass through per second.
    /// - burst: the maximum number of bytes allowed to pass through at once.
    ///
    /// # Panics
    ///
    /// Panics if `bandwidth` or `burst` is zero.
    pub fn new(bandwidth: u32, burst: u32) -> Self {
        Self {
            bandwidth: NonZeroU32::new(bandwidth).expect("bandwidth must be greater than 0"),
            burst: NonZeroU32::new(burst).expect("burst must be greater than 0"),
        }
    }
}
87
88impl<A: Access> Layer<A> for ThrottleLayer {
89 type LayeredAccess = ThrottleAccessor<A>;
90
91 fn layer(&self, accessor: A) -> Self::LayeredAccess {
92 let rate_limiter = Arc::new(RateLimiter::direct(
93 Quota::per_second(self.bandwidth).allow_burst(self.burst),
94 ));
95 ThrottleAccessor {
96 inner: accessor,
97 rate_limiter,
98 }
99 }
100}
101
/// Share an atomic RateLimiter instance across all threads in one operator.
/// To add more observability in the future, replace the default `NoOpMiddleware` with another middleware type.
/// Read more about [Middleware](https://docs.rs/governor/latest/governor/middleware/index.html)
type SharedRateLimiter = Arc<RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>>;
106
/// Accessor produced by [`ThrottleLayer`].
///
/// Readers and writers opened through it are wrapped together with a handle to the
/// shared rate limiter; list and delete are forwarded to the inner accessor untouched.
#[derive(Debug, Clone)]
pub struct ThrottleAccessor<A: Access> {
    /// The wrapped accessor.
    inner: A,
    /// Limiter cloned into every reader/writer created by this accessor.
    rate_limiter: SharedRateLimiter,
}
112
113impl<A: Access> LayeredAccess for ThrottleAccessor<A> {
114 type Inner = A;
115 type Reader = ThrottleWrapper<A::Reader>;
116 type Writer = ThrottleWrapper<A::Writer>;
117 type Lister = A::Lister;
118 type Deleter = A::Deleter;
119
120 fn inner(&self) -> &Self::Inner {
121 &self.inner
122 }
123
124 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
125 let limiter = self.rate_limiter.clone();
126
127 self.inner
128 .read(path, args)
129 .await
130 .map(|(rp, r)| (rp, ThrottleWrapper::new(r, limiter)))
131 }
132
133 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
134 let limiter = self.rate_limiter.clone();
135
136 self.inner
137 .write(path, args)
138 .await
139 .map(|(rp, w)| (rp, ThrottleWrapper::new(w, limiter)))
140 }
141
142 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
143 self.inner.delete().await
144 }
145
146 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
147 self.inner.list(path, args).await
148 }
149}
150
/// Wraps a reader or writer of the inner accessor together with the shared rate limiter.
pub struct ThrottleWrapper<R> {
    /// The wrapped reader or writer.
    inner: R,
    /// Rate limiter cloned from the accessor that created this wrapper.
    limiter: SharedRateLimiter,
}
155
156impl<R> ThrottleWrapper<R> {
157 pub fn new(inner: R, rate_limiter: SharedRateLimiter) -> Self {
158 Self {
159 inner,
160 limiter: rate_limiter,
161 }
162 }
163}
164
165impl<R: oio::Read> oio::Read for ThrottleWrapper<R> {
166 async fn read(&mut self) -> Result<Buffer> {
167 self.inner.read().await
168 }
169}
170
171impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
172 async fn write(&mut self, bs: Buffer) -> Result<()> {
173 let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
174
175 loop {
176 match self.limiter.check_n(buf_length) {
177 Ok(res) => match res {
178 Ok(_) => return self.inner.write(bs).await,
179 // the query is valid but the Decider can not accommodate them.
180 Err(not_until) => {
181 let _ = not_until.wait_time_from(DefaultClock::default().now());
182 // TODO: Should lock the limiter and wait for the wait_time, or should let other small requests go first?
183
184 // FIXME: we should sleep here.
185 // tokio::time::sleep(wait_time).await;
186 }
187 },
188 // the query was invalid as the rate limit parameters can "never" accommodate the number of cells queried for.
189 Err(_) => return Err(Error::new(
190 ErrorKind::RateLimited,
191 "InsufficientCapacity due to burst size being smaller than the request size",
192 )),
193 }
194 }
195 }
196
197 async fn abort(&mut self) -> Result<()> {
198 self.inner.abort().await
199 }
200
201 async fn close(&mut self) -> Result<Metadata> {
202 self.inner.close().await
203 }
204}