1use std::future::Future;
19use std::sync::Arc;
20use std::time::Duration;
21
22use crate::raw::*;
23use crate::*;
24
/// Layer that puts an upper bound on how long operations of the wrapped
/// accessor may run.
///
/// Two independent limits are kept:
///
/// - `timeout` bounds non-IO operations such as `stat`, `delete`, `copy`,
///   `rename`, `create_dir` and `presign`. Defaults to 60 seconds.
/// - `io_timeout` bounds opening a reader, writer or lister as well as every
///   IO call made through the wrapper that is handed back. Defaults to
///   10 seconds.
///
/// When a limit is exceeded the operation fails with a temporary
/// `ErrorKind::Unexpected` error.
#[derive(Debug, Clone)]
pub struct TimeoutLayer {
    timeout: Duration,
    io_timeout: Duration,
}

impl Default for TimeoutLayer {
    /// Returns a layer with a 60 second operation timeout and a 10 second
    /// IO timeout.
    fn default() -> Self {
        Self {
            timeout: Duration::from_secs(60),
            io_timeout: Duration::from_secs(10),
        }
    }
}

impl TimeoutLayer {
    /// Creates a layer with the default limits; identical to
    /// [`TimeoutLayer::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Replaces the limit applied to non-IO operations and returns the
    /// updated layer.
    #[must_use]
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Replaces the limit applied to IO operations and returns the updated
    /// layer.
    #[must_use]
    pub fn with_io_timeout(mut self, timeout: Duration) -> Self {
        self.io_timeout = timeout;
        self
    }
}
151
152impl<A: Access> Layer<A> for TimeoutLayer {
153 type LayeredAccess = TimeoutAccessor<A>;
154
155 fn layer(&self, inner: A) -> Self::LayeredAccess {
156 let info = inner.info();
157 info.update_executor(|exec| {
158 Executor::with(TimeoutExecutor::new(exec.into_inner(), self.io_timeout))
159 });
160
161 TimeoutAccessor {
162 inner,
163
164 timeout: self.timeout,
165 io_timeout: self.io_timeout,
166 }
167 }
168}
169
170#[derive(Debug, Clone)]
171pub struct TimeoutAccessor<A: Access> {
172 inner: A,
173
174 timeout: Duration,
175 io_timeout: Duration,
176}
177
178impl<A: Access> TimeoutAccessor<A> {
179 async fn timeout<F: Future<Output = Result<T>>, T>(&self, op: Operation, fut: F) -> Result<T> {
180 tokio::time::timeout(self.timeout, fut).await.map_err(|_| {
181 Error::new(ErrorKind::Unexpected, "operation timeout reached")
182 .with_operation(op)
183 .with_context("timeout", self.timeout.as_secs_f64().to_string())
184 .set_temporary()
185 })?
186 }
187
188 async fn io_timeout<F: Future<Output = Result<T>>, T>(
189 &self,
190 op: Operation,
191 fut: F,
192 ) -> Result<T> {
193 tokio::time::timeout(self.io_timeout, fut)
194 .await
195 .map_err(|_| {
196 Error::new(ErrorKind::Unexpected, "io timeout reached")
197 .with_operation(op)
198 .with_context("timeout", self.io_timeout.as_secs_f64().to_string())
199 .set_temporary()
200 })?
201 }
202}
203
204impl<A: Access> LayeredAccess for TimeoutAccessor<A> {
205 type Inner = A;
206 type Reader = TimeoutWrapper<A::Reader>;
207 type Writer = TimeoutWrapper<A::Writer>;
208 type Lister = TimeoutWrapper<A::Lister>;
209 type Deleter = TimeoutWrapper<A::Deleter>;
210
211 fn inner(&self) -> &Self::Inner {
212 &self.inner
213 }
214
215 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
216 self.timeout(Operation::CreateDir, self.inner.create_dir(path, args))
217 .await
218 }
219
220 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
221 self.io_timeout(Operation::Read, self.inner.read(path, args))
222 .await
223 .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
224 }
225
226 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
227 self.io_timeout(Operation::Write, self.inner.write(path, args))
228 .await
229 .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
230 }
231
232 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
233 self.timeout(Operation::Copy, self.inner.copy(from, to, args))
234 .await
235 }
236
237 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
238 self.timeout(Operation::Rename, self.inner.rename(from, to, args))
239 .await
240 }
241
242 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
243 self.timeout(Operation::Stat, self.inner.stat(path, args))
244 .await
245 }
246
247 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
248 self.timeout(Operation::Delete, self.inner.delete())
249 .await
250 .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
251 }
252
253 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
254 self.io_timeout(Operation::List, self.inner.list(path, args))
255 .await
256 .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
257 }
258
259 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
260 self.timeout(Operation::Presign, self.inner.presign(path, args))
261 .await
262 }
263}
264
265pub struct TimeoutExecutor {
266 exec: Arc<dyn Execute>,
267 timeout: Duration,
268}
269
270impl TimeoutExecutor {
271 pub fn new(exec: Arc<dyn Execute>, timeout: Duration) -> Self {
272 Self { exec, timeout }
273 }
274}
275
276impl Execute for TimeoutExecutor {
277 fn execute(&self, f: BoxedStaticFuture<()>) {
278 self.exec.execute(f)
279 }
280
281 fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
282 Some(Box::pin(tokio::time::sleep(self.timeout)))
283 }
284}
285
/// Pairs an IO object (reader, writer, lister or deleter) with the time limit
/// applied to each of its asynchronous calls.
pub struct TimeoutWrapper<R> {
    /// The wrapped IO object.
    inner: R,
    /// Limit enforced on every asynchronous call forwarded to `inner`.
    timeout: Duration,
}
291
292impl<R> TimeoutWrapper<R> {
293 fn new(inner: R, timeout: Duration) -> Self {
294 Self { inner, timeout }
295 }
296
297 #[inline]
298 async fn io_timeout<F: Future<Output = Result<T>>, T>(
299 timeout: Duration,
300 op: &'static str,
301 fut: F,
302 ) -> Result<T> {
303 tokio::time::timeout(timeout, fut).await.map_err(|_| {
304 Error::new(ErrorKind::Unexpected, "io operation timeout reached")
305 .with_operation(op)
306 .with_context("timeout", timeout.as_secs_f64().to_string())
307 .set_temporary()
308 })?
309 }
310}
311
312impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
313 async fn read(&mut self) -> Result<Buffer> {
314 let fut = self.inner.read();
315 Self::io_timeout(self.timeout, Operation::Read.into_static(), fut).await
316 }
317}
318
319impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
320 async fn write(&mut self, bs: Buffer) -> Result<()> {
321 let fut = self.inner.write(bs);
322 Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
323 }
324
325 async fn close(&mut self) -> Result<Metadata> {
326 let fut = self.inner.close();
327 Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
328 }
329
330 async fn abort(&mut self) -> Result<()> {
331 let fut = self.inner.abort();
332 Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
333 }
334}
335
336impl<R: oio::List> oio::List for TimeoutWrapper<R> {
337 async fn next(&mut self) -> Result<Option<oio::Entry>> {
338 let fut = self.inner.next();
339 Self::io_timeout(self.timeout, Operation::List.into_static(), fut).await
340 }
341}
342
343impl<R: oio::Delete> oio::Delete for TimeoutWrapper<R> {
344 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
345 self.inner.delete(path, args)
346 }
347
348 async fn flush(&mut self) -> Result<usize> {
349 let fut = self.inner.flush();
350 Self::io_timeout(self.timeout, Operation::Delete.into_static(), fut).await
351 }
352}
353
#[cfg(test)]
mod tests {
    use std::future::pending;
    use std::future::Future;
    use std::sync::Arc;
    use std::time::Duration;

    use futures::StreamExt;
    use tokio::time::sleep;
    use tokio::time::timeout;

    use crate::layers::TimeoutLayer;
    use crate::layers::TypeEraseLayer;
    use crate::raw::*;
    use crate::*;

    /// Service whose `delete` sleeps practically forever and whose reader and
    /// lister never yield, so every exercised call has to hit a timeout.
    #[derive(Debug, Clone, Default)]
    struct MockService;

    impl Access for MockService {
        type Reader = MockReader;
        type Writer = ();
        type Lister = MockLister;
        type Deleter = ();

        fn info(&self) -> Arc<AccessorInfo> {
            let capability = Capability {
                read: true,
                delete: true,
                ..Default::default()
            };

            let info = AccessorInfo::default();
            info.set_native_capability(capability);
            Arc::new(info)
        }

        /// Opening the reader succeeds immediately.
        async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
            Ok((RpRead::new(), MockReader))
        }

        /// Sleeps for the longest representable number of seconds before
        /// answering.
        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            sleep(Duration::from_secs(u64::MAX)).await;

            Ok((RpDelete::default(), ()))
        }

        /// Opening the lister succeeds immediately.
        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            Ok((RpList::default(), MockLister))
        }
    }

    /// Reader whose `read` future never completes.
    #[derive(Debug, Clone, Default)]
    struct MockReader;

    impl oio::Read for MockReader {
        fn read(&mut self) -> impl Future<Output = Result<Buffer>> {
            pending()
        }
    }

    /// Lister whose `next` future never completes.
    #[derive(Debug, Clone, Default)]
    struct MockLister;

    impl oio::List for MockLister {
        fn next(&mut self) -> impl Future<Output = Result<Option<oio::Entry>>> {
            pending()
        }
    }

    /// Builds an operator over the type-erased mock service with `layer`
    /// applied on top.
    fn operator_with(layer: TimeoutLayer) -> Operator {
        let acc = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor;
        Operator::from_inner(acc).layer(layer)
    }

    /// Asserts that `res` failed with the error produced by a timeout.
    #[track_caller]
    fn assert_timed_out<T: std::fmt::Debug>(res: Result<T>) {
        assert!(res.is_err());
        let err = res.unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Unexpected);
        assert!(err.to_string().contains("timeout"));
    }

    #[tokio::test]
    async fn test_operation_timeout() {
        let op = operator_with(TimeoutLayer::new().with_timeout(Duration::from_secs(1)));

        let check = async {
            assert_timed_out(op.delete("test").await);
        };

        timeout(Duration::from_secs(2), check)
            .await
            .expect("this test should not exceed 2 seconds")
    }

    #[tokio::test]
    async fn test_io_timeout() {
        let op = operator_with(TimeoutLayer::new().with_io_timeout(Duration::from_secs(1)));

        let reader = op.reader("test").await.unwrap();

        assert_timed_out(reader.read(0..4).await);
    }

    #[tokio::test]
    async fn test_list_timeout() {
        let layer = TimeoutLayer::new()
            .with_timeout(Duration::from_secs(1))
            .with_io_timeout(Duration::from_secs(1));
        let op = operator_with(layer);

        let mut lister = op.lister("test").await.unwrap();

        assert_timed_out(lister.next().await.unwrap());
    }

    #[tokio::test]
    async fn test_list_timeout_raw() {
        use oio::List;

        let layer = TimeoutLayer::new()
            .with_timeout(Duration::from_secs(1))
            .with_io_timeout(Duration::from_secs(1));
        let timeout_acc = layer.layer(MockService);

        let (_, mut lister) = Access::list(&timeout_acc, "test", OpList::default())
            .await
            .unwrap();

        assert_timed_out(lister.next().await);
    }
}