1use std::future::Future;
19use std::sync::Arc;
20use std::time::Duration;
21
22use crate::raw::*;
23use crate::*;
24
/// Layer that puts an upper bound on how long operations of the wrapped
/// accessor are allowed to run.
///
/// Two independent limits are carried:
///
/// - `timeout` bounds whole, non-streaming operations: `create_dir`, `copy`,
///   `rename`, `stat`, `delete` and `presign`.
/// - `io_timeout` bounds opening a reader, writer or lister, every
///   subsequent IO call made on the returned reader, writer, lister or
///   deleter, and is also handed to the executor installed by this layer.
///
/// Blocking operations are forwarded to the inner accessor without any limit.
///
/// The [`Default`] implementation uses 60 seconds for `timeout` and
/// 10 seconds for `io_timeout`.
#[derive(Debug, Clone)]
pub struct TimeoutLayer {
    /// Limit applied to non-IO operations.
    timeout: Duration,
    /// Limit applied to each individual IO operation.
    io_timeout: Duration,
}
120
121impl Default for TimeoutLayer {
122 fn default() -> Self {
123 Self {
124 timeout: Duration::from_secs(60),
125 io_timeout: Duration::from_secs(10),
126 }
127 }
128}
129
130impl TimeoutLayer {
131 pub fn new() -> Self {
133 Self::default()
134 }
135
136 pub fn with_timeout(mut self, timeout: Duration) -> Self {
140 self.timeout = timeout;
141 self
142 }
143
144 pub fn with_io_timeout(mut self, timeout: Duration) -> Self {
148 self.io_timeout = timeout;
149 self
150 }
151
152 #[deprecated(note = "with speed is not supported anymore, please use with_io_timeout instead")]
163 pub fn with_speed(self, _: u64) -> Self {
164 self
165 }
166}
167
168impl<A: Access> Layer<A> for TimeoutLayer {
169 type LayeredAccess = TimeoutAccessor<A>;
170
171 fn layer(&self, inner: A) -> Self::LayeredAccess {
172 let info = inner.info();
173 info.update_executor(|exec| {
174 Executor::with(TimeoutExecutor::new(exec.into_inner(), self.io_timeout))
175 });
176
177 TimeoutAccessor {
178 inner,
179
180 timeout: self.timeout,
181 io_timeout: self.io_timeout,
182 }
183 }
184}
185
186#[derive(Debug, Clone)]
187pub struct TimeoutAccessor<A: Access> {
188 inner: A,
189
190 timeout: Duration,
191 io_timeout: Duration,
192}
193
194impl<A: Access> TimeoutAccessor<A> {
195 async fn timeout<F: Future<Output = Result<T>>, T>(&self, op: Operation, fut: F) -> Result<T> {
196 tokio::time::timeout(self.timeout, fut).await.map_err(|_| {
197 Error::new(ErrorKind::Unexpected, "operation timeout reached")
198 .with_operation(op)
199 .with_context("timeout", self.timeout.as_secs_f64().to_string())
200 .set_temporary()
201 })?
202 }
203
204 async fn io_timeout<F: Future<Output = Result<T>>, T>(
205 &self,
206 op: Operation,
207 fut: F,
208 ) -> Result<T> {
209 tokio::time::timeout(self.io_timeout, fut)
210 .await
211 .map_err(|_| {
212 Error::new(ErrorKind::Unexpected, "io timeout reached")
213 .with_operation(op)
214 .with_context("timeout", self.io_timeout.as_secs_f64().to_string())
215 .set_temporary()
216 })?
217 }
218}
219
220impl<A: Access> LayeredAccess for TimeoutAccessor<A> {
221 type Inner = A;
222 type Reader = TimeoutWrapper<A::Reader>;
223 type BlockingReader = A::BlockingReader;
224 type Writer = TimeoutWrapper<A::Writer>;
225 type BlockingWriter = A::BlockingWriter;
226 type Lister = TimeoutWrapper<A::Lister>;
227 type BlockingLister = A::BlockingLister;
228 type Deleter = TimeoutWrapper<A::Deleter>;
229 type BlockingDeleter = A::BlockingDeleter;
230
231 fn inner(&self) -> &Self::Inner {
232 &self.inner
233 }
234
235 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
236 self.timeout(Operation::CreateDir, self.inner.create_dir(path, args))
237 .await
238 }
239
240 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
241 self.io_timeout(Operation::Read, self.inner.read(path, args))
242 .await
243 .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
244 }
245
246 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
247 self.io_timeout(Operation::Write, self.inner.write(path, args))
248 .await
249 .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
250 }
251
252 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
253 self.timeout(Operation::Copy, self.inner.copy(from, to, args))
254 .await
255 }
256
257 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
258 self.timeout(Operation::Rename, self.inner.rename(from, to, args))
259 .await
260 }
261
262 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
263 self.timeout(Operation::Stat, self.inner.stat(path, args))
264 .await
265 }
266
267 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
268 self.timeout(Operation::Delete, self.inner.delete())
269 .await
270 .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
271 }
272
273 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
274 self.io_timeout(Operation::List, self.inner.list(path, args))
275 .await
276 .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
277 }
278
279 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
280 self.timeout(Operation::Presign, self.inner.presign(path, args))
281 .await
282 }
283
284 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
285 self.inner.blocking_read(path, args)
286 }
287
288 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
289 self.inner.blocking_write(path, args)
290 }
291
292 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
293 self.inner.blocking_list(path, args)
294 }
295
296 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
297 self.inner.blocking_delete()
298 }
299}
300
301pub struct TimeoutExecutor {
302 exec: Arc<dyn Execute>,
303 timeout: Duration,
304}
305
306impl TimeoutExecutor {
307 pub fn new(exec: Arc<dyn Execute>, timeout: Duration) -> Self {
308 Self { exec, timeout }
309 }
310}
311
312impl Execute for TimeoutExecutor {
313 fn execute(&self, f: BoxedStaticFuture<()>) {
314 self.exec.execute(f)
315 }
316
317 fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
318 Some(Box::pin(tokio::time::sleep(self.timeout)))
319 }
320}
321
/// Pairs a value with the time limit that is applied to the asynchronous IO
/// calls made on it.
pub struct TimeoutWrapper<R> {
    /// The wrapped value that calls are forwarded to.
    inner: R,

    /// Limit applied to each asynchronous IO call.
    timeout: Duration,
}
327
328impl<R> TimeoutWrapper<R> {
329 fn new(inner: R, timeout: Duration) -> Self {
330 Self { inner, timeout }
331 }
332
333 #[inline]
334 async fn io_timeout<F: Future<Output = Result<T>>, T>(
335 timeout: Duration,
336 op: &'static str,
337 fut: F,
338 ) -> Result<T> {
339 tokio::time::timeout(timeout, fut).await.map_err(|_| {
340 Error::new(ErrorKind::Unexpected, "io operation timeout reached")
341 .with_operation(op)
342 .with_context("timeout", timeout.as_secs_f64().to_string())
343 .set_temporary()
344 })?
345 }
346}
347
348impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
349 async fn read(&mut self) -> Result<Buffer> {
350 let fut = self.inner.read();
351 Self::io_timeout(self.timeout, Operation::Read.into_static(), fut).await
352 }
353}
354
355impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
356 async fn write(&mut self, bs: Buffer) -> Result<()> {
357 let fut = self.inner.write(bs);
358 Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
359 }
360
361 async fn close(&mut self) -> Result<Metadata> {
362 let fut = self.inner.close();
363 Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
364 }
365
366 async fn abort(&mut self) -> Result<()> {
367 let fut = self.inner.abort();
368 Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
369 }
370}
371
372impl<R: oio::List> oio::List for TimeoutWrapper<R> {
373 async fn next(&mut self) -> Result<Option<oio::Entry>> {
374 let fut = self.inner.next();
375 Self::io_timeout(self.timeout, Operation::List.into_static(), fut).await
376 }
377}
378
379impl<R: oio::Delete> oio::Delete for TimeoutWrapper<R> {
380 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
381 self.inner.delete(path, args)
382 }
383
384 async fn flush(&mut self) -> Result<usize> {
385 let fut = self.inner.flush();
386 Self::io_timeout(self.timeout, Operation::Delete.into_static(), fut).await
387 }
388}
389
#[cfg(test)]
mod tests {
    use std::future::pending;
    use std::future::Future;
    use std::sync::Arc;
    use std::time::Duration;

    use futures::StreamExt;
    use tokio::time::sleep;
    use tokio::time::timeout;

    use crate::layers::TimeoutLayer;
    use crate::layers::TypeEraseLayer;
    use crate::raw::*;
    use crate::*;

    /// Asserts that `res` is an `ErrorKind::Unexpected` error whose message
    /// mentions "timeout".
    fn assert_timed_out<T: std::fmt::Debug>(res: Result<T>) {
        assert!(res.is_err());
        let err = res.unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Unexpected);
        assert!(err.to_string().contains("timeout"))
    }

    /// Service whose `delete` sleeps for `u64::MAX` seconds and whose reader
    /// and lister never yield.
    #[derive(Debug, Clone, Default)]
    struct MockService;

    impl Access for MockService {
        type Reader = MockReader;
        type Writer = ();
        type Lister = MockLister;
        type BlockingReader = ();
        type BlockingWriter = ();
        type BlockingLister = ();
        type Deleter = ();
        type BlockingDeleter = ();

        fn info(&self) -> Arc<AccessorInfo> {
            let info = AccessorInfo::default();
            let capability = Capability {
                read: true,
                delete: true,
                ..Default::default()
            };
            info.set_native_capability(capability);

            info.into()
        }

        /// Opening the reader succeeds immediately.
        async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
            Ok((RpRead::new(), MockReader))
        }

        /// Sleeps for `u64::MAX` seconds before answering.
        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            sleep(Duration::from_secs(u64::MAX)).await;

            Ok((RpDelete::default(), ()))
        }

        /// Opening the lister succeeds immediately.
        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            Ok((RpList::default(), MockLister))
        }
    }

    /// Reader whose `read` future never resolves.
    #[derive(Debug, Clone, Default)]
    struct MockReader;

    impl oio::Read for MockReader {
        fn read(&mut self) -> impl Future<Output = Result<Buffer>> {
            pending()
        }
    }

    /// Lister whose `next` future never resolves.
    #[derive(Debug, Clone, Default)]
    struct MockLister;

    impl oio::List for MockLister {
        fn next(&mut self) -> impl Future<Output = Result<Option<oio::Entry>>> {
            pending()
        }
    }

    #[tokio::test]
    async fn test_operation_timeout() {
        let layer = TimeoutLayer::new().with_timeout(Duration::from_secs(1));
        let accessor = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor;
        let op = Operator::from_inner(accessor).layer(layer);

        let check = async {
            assert_timed_out(op.delete("test").await);
        };

        // The one second layer timeout must fire well before this guard.
        timeout(Duration::from_secs(2), check)
            .await
            .expect("this test should not exceed 2 seconds")
    }

    #[tokio::test]
    async fn test_io_timeout() {
        let layer = TimeoutLayer::new().with_io_timeout(Duration::from_secs(1));
        let accessor = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor;
        let op = Operator::from_inner(accessor).layer(layer);

        let reader = op.reader("test").await.unwrap();

        assert_timed_out(reader.read(0..4).await);
    }

    #[tokio::test]
    async fn test_list_timeout() {
        let layer = TimeoutLayer::new()
            .with_timeout(Duration::from_secs(1))
            .with_io_timeout(Duration::from_secs(1));
        let accessor = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor;
        let op = Operator::from_inner(accessor).layer(layer);

        let mut lister = op.lister("test").await.unwrap();

        let first = lister.next().await.unwrap();
        assert_timed_out(first);
    }

    #[tokio::test]
    async fn test_list_timeout_raw() {
        use oio::List;

        let timeout_layer = TimeoutLayer::new()
            .with_timeout(Duration::from_secs(1))
            .with_io_timeout(Duration::from_secs(1));
        let timeout_acc = timeout_layer.layer(MockService);

        let (_, mut lister) = Access::list(&timeout_acc, "test", OpList::default())
            .await
            .unwrap();

        assert_timed_out(lister.next().await);
    }
}