// opendal/layers/throttle.rs
use std::num::NonZeroU32;
19use std::sync::Arc;
20
21use governor::Quota;
22use governor::RateLimiter;
23use governor::clock::DefaultClock;
24use governor::middleware::NoOpMiddleware;
25use governor::state::InMemoryState;
26use governor::state::NotKeyed;
27
28use crate::raw::*;
29use crate::*;
30
/// A layer that rate-limits writes going through the wrapped accessor.
///
/// Both limits are stored as [`NonZeroU32`], so a zero quota cannot be
/// represented once the layer has been constructed. When the layer is applied,
/// `bandwidth` becomes the per-second quota and `burst` the maximum burst size
/// of the rate limiter that is created.
#[derive(Debug, Clone)]
pub struct ThrottleLayer {
    /// Quota replenished every second.
    bandwidth: NonZeroU32,
    /// Largest amount that may be consumed in one burst.
    burst: NonZeroU32,
}

impl ThrottleLayer {
    /// Creates a `ThrottleLayer` from a per-second `bandwidth` and a `burst` size.
    ///
    /// # Panics
    ///
    /// Panics if `bandwidth` or `burst` is `0`; `bandwidth` is checked first.
    pub fn new(bandwidth: u32, burst: u32) -> Self {
        // `NonZeroU32::new` already rejects zero, so it is the single point of
        // validation; `expect` supplies a message that names the bad argument.
        Self {
            bandwidth: NonZeroU32::new(bandwidth).expect("bandwidth must be greater than 0"),
            burst: NonZeroU32::new(burst).expect("burst must be greater than 0"),
        }
    }
}
85
86impl<A: Access> Layer<A> for ThrottleLayer {
87 type LayeredAccess = ThrottleAccessor<A>;
88
89 fn layer(&self, accessor: A) -> Self::LayeredAccess {
90 let rate_limiter = Arc::new(RateLimiter::direct(
91 Quota::per_second(self.bandwidth).allow_burst(self.burst),
92 ));
93 ThrottleAccessor {
94 inner: accessor,
95 rate_limiter,
96 }
97 }
98}
99
100type SharedRateLimiter = Arc<RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>>;
104
105#[derive(Debug, Clone)]
106pub struct ThrottleAccessor<A: Access> {
107 inner: A,
108 rate_limiter: SharedRateLimiter,
109}
110
111impl<A: Access> LayeredAccess for ThrottleAccessor<A> {
112 type Inner = A;
113 type Reader = ThrottleWrapper<A::Reader>;
114 type Writer = ThrottleWrapper<A::Writer>;
115 type Lister = A::Lister;
116 type Deleter = A::Deleter;
117
118 fn inner(&self) -> &Self::Inner {
119 &self.inner
120 }
121
122 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
123 let limiter = self.rate_limiter.clone();
124
125 self.inner
126 .read(path, args)
127 .await
128 .map(|(rp, r)| (rp, ThrottleWrapper::new(r, limiter)))
129 }
130
131 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
132 let limiter = self.rate_limiter.clone();
133
134 self.inner
135 .write(path, args)
136 .await
137 .map(|(rp, w)| (rp, ThrottleWrapper::new(w, limiter)))
138 }
139
140 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
141 self.inner.delete().await
142 }
143
144 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
145 self.inner.list(path, args).await
146 }
147}
148
149pub struct ThrottleWrapper<R> {
150 inner: R,
151 limiter: SharedRateLimiter,
152}
153
154impl<R> ThrottleWrapper<R> {
155 pub fn new(inner: R, rate_limiter: SharedRateLimiter) -> Self {
156 Self {
157 inner,
158 limiter: rate_limiter,
159 }
160 }
161}
162
163impl<R: oio::Read> oio::Read for ThrottleWrapper<R> {
164 async fn read(&mut self) -> Result<Buffer> {
165 self.inner.read().await
166 }
167}
168
169impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
170 async fn write(&mut self, bs: Buffer) -> Result<()> {
171 let len = bs.len();
172 if len == 0 {
173 return self.inner.write(bs).await;
174 }
175
176 if len > u32::MAX as usize {
177 return Err(Error::new(
178 ErrorKind::RateLimited,
179 "request size exceeds throttle quota capacity",
180 ));
181 }
182
183 let buf_length =
184 NonZeroU32::new(len as u32).expect("len is non-zero so NonZeroU32 must exist");
185
186 self.limiter.until_n_ready(buf_length).await.map_err(|_| {
187 Error::new(
188 ErrorKind::RateLimited,
189 "burst size is smaller than the request size",
190 )
191 })?;
192
193 self.inner.write(bs).await
194 }
195
196 async fn abort(&mut self) -> Result<()> {
197 self.inner.abort().await
198 }
199
200 async fn close(&mut self) -> Result<Metadata> {
201 self.inner.close().await
202 }
203}