opendal/layers/
throttle.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::num::NonZeroU32;
19use std::sync::Arc;
20
21use governor::clock::Clock;
22use governor::clock::DefaultClock;
23use governor::middleware::NoOpMiddleware;
24use governor::state::InMemoryState;
25use governor::state::NotKeyed;
26use governor::Quota;
27use governor::RateLimiter;
28
29use crate::raw::*;
30use crate::*;
31
/// Add a bandwidth rate limiter to the underlying services.
///
/// # Throttle
///
/// There are several algorithms when it comes to rate limiting techniques.
/// This throttle layer uses the Generic Cell Rate Algorithm (GCRA) provided by
/// [Governor](https://docs.rs/governor/latest/governor/index.html).
/// By setting `bandwidth` and `burst`, we can control the byte flow rate of underlying services.
///
/// Only data passed to writers is throttled: every buffer written consumes as many
/// cells as it has bytes. Readers, listers and deleters are passed through without
/// consulting the limiter.
///
/// # Note
///
/// When setting the ThrottleLayer, always consider the largest possible write size as the burst size,
/// as **the burst size must be no smaller than any single write for that write to pass through**.
/// A write longer than the burst size can never be admitted and fails with
/// [`ErrorKind::RateLimited`].
///
/// Read more about [Quota](https://docs.rs/governor/latest/governor/struct.Quota.html#examples)
///
/// # Examples
///
/// This example limits bandwidth to 10 KiB/s and burst size to 10 MiB.
///
/// ```no_run
/// # use opendal::layers::ThrottleLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
///
/// # fn main() -> Result<()> {
/// let _ = Operator::new(services::Memory::default())
///     .expect("must init")
///     .layer(ThrottleLayer::new(10 * 1024, 10 * 1024 * 1024))
///     .finish();
/// Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct ThrottleLayer {
    /// Maximum number of bytes allowed to pass through per second.
    bandwidth: NonZeroU32,
    /// Maximum number of bytes allowed to pass through at once.
    burst: NonZeroU32,
}
72
73impl ThrottleLayer {
74    /// Create a new `ThrottleLayer` with given bandwidth and burst.
75    ///
76    /// - bandwidth: the maximum number of bytes allowed to pass through per second.
77    /// - burst: the maximum number of bytes allowed to pass through at once.
78    pub fn new(bandwidth: u32, burst: u32) -> Self {
79        assert!(bandwidth > 0);
80        assert!(burst > 0);
81        Self {
82            bandwidth: NonZeroU32::new(bandwidth).unwrap(),
83            burst: NonZeroU32::new(burst).unwrap(),
84        }
85    }
86}
87
88impl<A: Access> Layer<A> for ThrottleLayer {
89    type LayeredAccess = ThrottleAccessor<A>;
90
91    fn layer(&self, accessor: A) -> Self::LayeredAccess {
92        let rate_limiter = Arc::new(RateLimiter::direct(
93            Quota::per_second(self.bandwidth).allow_burst(self.burst),
94        ));
95        ThrottleAccessor {
96            inner: accessor,
97            rate_limiter,
98        }
99    }
100}
101
102/// Share an atomic RateLimiter instance across all threads in one operator.
103/// If want to add more observability in the future, replace the default NoOpMiddleware with other middleware types.
104/// Read more about [Middleware](https://docs.rs/governor/latest/governor/middleware/index.html)
105type SharedRateLimiter = Arc<RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>>;
106
107#[derive(Debug, Clone)]
108pub struct ThrottleAccessor<A: Access> {
109    inner: A,
110    rate_limiter: SharedRateLimiter,
111}
112
113impl<A: Access> LayeredAccess for ThrottleAccessor<A> {
114    type Inner = A;
115    type Reader = ThrottleWrapper<A::Reader>;
116    type Writer = ThrottleWrapper<A::Writer>;
117    type Lister = A::Lister;
118    type Deleter = A::Deleter;
119
120    fn inner(&self) -> &Self::Inner {
121        &self.inner
122    }
123
124    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
125        let limiter = self.rate_limiter.clone();
126
127        self.inner
128            .read(path, args)
129            .await
130            .map(|(rp, r)| (rp, ThrottleWrapper::new(r, limiter)))
131    }
132
133    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
134        let limiter = self.rate_limiter.clone();
135
136        self.inner
137            .write(path, args)
138            .await
139            .map(|(rp, w)| (rp, ThrottleWrapper::new(w, limiter)))
140    }
141
142    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
143        self.inner.delete().await
144    }
145
146    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
147        self.inner.list(path, args).await
148    }
149}
150
151pub struct ThrottleWrapper<R> {
152    inner: R,
153    limiter: SharedRateLimiter,
154}
155
156impl<R> ThrottleWrapper<R> {
157    pub fn new(inner: R, rate_limiter: SharedRateLimiter) -> Self {
158        Self {
159            inner,
160            limiter: rate_limiter,
161        }
162    }
163}
164
165impl<R: oio::Read> oio::Read for ThrottleWrapper<R> {
166    async fn read(&mut self) -> Result<Buffer> {
167        self.inner.read().await
168    }
169}
170
171impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
172    async fn write(&mut self, bs: Buffer) -> Result<()> {
173        let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
174
175        loop {
176            match self.limiter.check_n(buf_length) {
177                Ok(res) => match res {
178                    Ok(_) => return self.inner.write(bs).await,
179                    // the query is valid but the Decider can not accommodate them.
180                    Err(not_until) => {
181                        let _ = not_until.wait_time_from(DefaultClock::default().now());
182                        // TODO: Should lock the limiter and wait for the wait_time, or should let other small requests go first?
183
184                        // FIXME: we should sleep here.
185                        // tokio::time::sleep(wait_time).await;
186                    }
187                },
188                // the query was invalid as the rate limit parameters can "never" accommodate the number of cells queried for.
189                Err(_) => return Err(Error::new(
190                    ErrorKind::RateLimited,
191                    "InsufficientCapacity due to burst size being smaller than the request size",
192                )),
193            }
194        }
195    }
196
197    async fn abort(&mut self) -> Result<()> {
198        self.inner.abort().await
199    }
200
201    async fn close(&mut self) -> Result<Metadata> {
202        self.inner.close().await
203    }
204}