opendal/layers/
throttle.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::num::NonZeroU32;
19use std::sync::Arc;
20
21use governor::Quota;
22use governor::RateLimiter;
23use governor::clock::DefaultClock;
24use governor::middleware::NoOpMiddleware;
25use governor::state::InMemoryState;
26use governor::state::NotKeyed;
27
28use crate::raw::*;
29use crate::*;
30
31/// Add a bandwidth rate limiter to the underlying services.
32///
33/// # Throttle
34///
35/// There are several algorithms when it come to rate limiting techniques.
36/// This throttle layer uses Generic Cell Rate Algorithm (GCRA) provided by
37/// [Governor](https://docs.rs/governor/latest/governor/index.html).
38/// By setting the `bandwidth` and `burst`, we can control the byte flow rate of underlying services.
39///
40/// # Note
41///
42/// When setting the ThrottleLayer, always consider the largest possible operation size as the burst size,
43/// as **the burst size should be larger than any possible byte length to allow it to pass through**.
44///
45/// Read more about [Quota](https://docs.rs/governor/latest/governor/struct.Quota.html#examples)
46///
47/// # Examples
48///
49/// This example limits bandwidth to 10 KiB/s and burst size to 10 MiB.
50///
51/// ```no_run
52/// # use opendal::layers::ThrottleLayer;
53/// # use opendal::services;
54/// # use opendal::Operator;
55/// # use opendal::Result;
56///
57/// # fn main() -> Result<()> {
58/// let _ = Operator::new(services::Memory::default())
59///     .expect("must init")
60///     .layer(ThrottleLayer::new(10 * 1024, 10000 * 1024))
61///     .finish();
62/// Ok(())
63/// # }
64/// ```
65#[derive(Clone)]
66pub struct ThrottleLayer {
67    bandwidth: NonZeroU32,
68    burst: NonZeroU32,
69}
70
71impl ThrottleLayer {
72    /// Create a new `ThrottleLayer` with given bandwidth and burst.
73    ///
74    /// - bandwidth: the maximum number of bytes allowed to pass through per second.
75    /// - burst: the maximum number of bytes allowed to pass through at once.
76    pub fn new(bandwidth: u32, burst: u32) -> Self {
77        assert!(bandwidth > 0);
78        assert!(burst > 0);
79        Self {
80            bandwidth: NonZeroU32::new(bandwidth).unwrap(),
81            burst: NonZeroU32::new(burst).unwrap(),
82        }
83    }
84}
85
86impl<A: Access> Layer<A> for ThrottleLayer {
87    type LayeredAccess = ThrottleAccessor<A>;
88
89    fn layer(&self, accessor: A) -> Self::LayeredAccess {
90        let rate_limiter = Arc::new(RateLimiter::direct(
91            Quota::per_second(self.bandwidth).allow_burst(self.burst),
92        ));
93        ThrottleAccessor {
94            inner: accessor,
95            rate_limiter,
96        }
97    }
98}
99
100/// Share an atomic RateLimiter instance across all threads in one operator.
101/// If want to add more observability in the future, replace the default NoOpMiddleware with other middleware types.
102/// Read more about [Middleware](https://docs.rs/governor/latest/governor/middleware/index.html)
103type SharedRateLimiter = Arc<RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>>;
104
105#[derive(Debug, Clone)]
106pub struct ThrottleAccessor<A: Access> {
107    inner: A,
108    rate_limiter: SharedRateLimiter,
109}
110
111impl<A: Access> LayeredAccess for ThrottleAccessor<A> {
112    type Inner = A;
113    type Reader = ThrottleWrapper<A::Reader>;
114    type Writer = ThrottleWrapper<A::Writer>;
115    type Lister = A::Lister;
116    type Deleter = A::Deleter;
117
118    fn inner(&self) -> &Self::Inner {
119        &self.inner
120    }
121
122    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
123        let limiter = self.rate_limiter.clone();
124
125        self.inner
126            .read(path, args)
127            .await
128            .map(|(rp, r)| (rp, ThrottleWrapper::new(r, limiter)))
129    }
130
131    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
132        let limiter = self.rate_limiter.clone();
133
134        self.inner
135            .write(path, args)
136            .await
137            .map(|(rp, w)| (rp, ThrottleWrapper::new(w, limiter)))
138    }
139
140    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
141        self.inner.delete().await
142    }
143
144    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
145        self.inner.list(path, args).await
146    }
147}
148
149pub struct ThrottleWrapper<R> {
150    inner: R,
151    limiter: SharedRateLimiter,
152}
153
154impl<R> ThrottleWrapper<R> {
155    pub fn new(inner: R, rate_limiter: SharedRateLimiter) -> Self {
156        Self {
157            inner,
158            limiter: rate_limiter,
159        }
160    }
161}
162
163impl<R: oio::Read> oio::Read for ThrottleWrapper<R> {
164    async fn read(&mut self) -> Result<Buffer> {
165        self.inner.read().await
166    }
167}
168
169impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
170    async fn write(&mut self, bs: Buffer) -> Result<()> {
171        let len = bs.len();
172        if len == 0 {
173            return self.inner.write(bs).await;
174        }
175
176        if len > u32::MAX as usize {
177            return Err(Error::new(
178                ErrorKind::RateLimited,
179                "request size exceeds throttle quota capacity",
180            ));
181        }
182
183        let buf_length =
184            NonZeroU32::new(len as u32).expect("len is non-zero so NonZeroU32 must exist");
185
186        self.limiter.until_n_ready(buf_length).await.map_err(|_| {
187            Error::new(
188                ErrorKind::RateLimited,
189                "burst size is smaller than the request size",
190            )
191        })?;
192
193        self.inner.write(bs).await
194    }
195
196    async fn abort(&mut self) -> Result<()> {
197        self.inner.abort().await
198    }
199
200    async fn close(&mut self) -> Result<Metadata> {
201        self.inner.close().await
202    }
203}