// opendal/layers/timeout.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

use crate::raw::*;
use crate::*;

/// Add timeout for every operation to avoid slow or unexpectedly hanging operations.
///
/// For example, a dead connection could hang a database SQL query. `TimeoutLayer`
/// will abandon such an operation and return an error, so users can handle it by
/// retrying or reporting it to their users.
///
/// # Notes
///
/// `TimeoutLayer` divides operations into two kinds:
///
/// - Non-IO operations like `stat` and `delete` operate on a single file. They are
///   controlled by `timeout`.
/// - IO operations like `read`, `Reader::read` and `Writer::write` operate on data
///   directly. They are controlled by `io_timeout`, which applies to each individual call.
///
/// Blocking operations (`blocking_read`, `blocking_write`, `blocking_list` and
/// `blocking_delete`) are forwarded to the inner accessor without any timeout.
///
/// # Default
///
/// - timeout: 60 seconds
/// - io_timeout: 10 seconds
///
/// # Panics
///
/// `TimeoutLayer` drops the future once the timeout is reached, which might leave the
/// internal state behind that future broken. If the underlying future has moved ownership
/// of some state into itself, that state is dropped together with the future and is never
/// given back, so later use of the same object may panic.
///
/// For example, when using `TimeoutLayer` together with `RetryLayer`, make sure the timeout
/// layer is added before the retry layer.
///
/// ```no_run
/// # use std::time::Duration;
///
/// # use opendal::layers::RetryLayer;
/// # use opendal::layers::TimeoutLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
///
/// # fn main() -> Result<()> {
/// let op = Operator::new(services::Memory::default())?
///     // This is fine, since the timeout happens inside each retry attempt.
///     .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
///     .layer(RetryLayer::new())
///     // This is wrong, since the timeout layer will drop the future and leave the
///     // retry layer in a bad state.
///     .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
///     .finish();
/// Ok(())
/// # }
/// ```
///
/// # Examples
///
/// The following example creates a timeout layer with a 10 seconds timeout for all non-IO
/// operations and a 3 seconds timeout for all IO operations.
///
/// ```no_run
/// # use std::time::Duration;
///
/// # use opendal::layers::TimeoutLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
///
/// # fn main() -> Result<()> {
/// let _ = Operator::new(services::Memory::default())?
///     .layer(
///         TimeoutLayer::default()
///             .with_timeout(Duration::from_secs(10))
///             .with_io_timeout(Duration::from_secs(3)),
///     )
///     .finish();
/// Ok(())
/// # }
/// ```
///
/// # Implementation Notes
///
/// `TimeoutLayer` uses [`tokio::time::timeout`] to implement timeouts for operations. IO
/// calls made on the returned readers, writers, listers and deleters are wrapped in
/// [`tokio::time::timeout`] as well, one call at a time.
///
/// This might introduce a bit of overhead for IO operations, but it's the only way to
/// implement timeouts correctly. We used to implement the timeout layer in a zero-cost way
/// that only stored a [`std::time::Instant`] and checked the timeout by comparing the instant
/// with the current time. However, that doesn't work in all cases.
///
/// For example, a user's TCP connection could be in the
/// [Busy ESTAB](https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die) state. In this
/// state, no IO event will be emitted, so the runtime will never poll our future again. From
/// the application side, this future hangs forever, until the TCP connection is closed after
/// reaching the linux [net.ipv4.tcp_retries2](https://man7.org/linux/man-pages/man7/tcp.7.html)
/// retry limit.
#[derive(Clone, Debug)]
pub struct TimeoutLayer {
    /// Timeout for non-IO operations such as `stat` and `delete`.
    timeout: Duration,
    /// Timeout for IO operations, applied to each individual IO call.
    io_timeout: Duration,
}

121impl Default for TimeoutLayer {
122    fn default() -> Self {
123        Self {
124            timeout: Duration::from_secs(60),
125            io_timeout: Duration::from_secs(10),
126        }
127    }
128}
129
130impl TimeoutLayer {
131    /// Create a new `TimeoutLayer` with default settings.
132    pub fn new() -> Self {
133        Self::default()
134    }
135
136    /// Set timeout for TimeoutLayer with given value.
137    ///
138    /// This timeout is for all non-io operations like `stat`, `delete`.
139    pub fn with_timeout(mut self, timeout: Duration) -> Self {
140        self.timeout = timeout;
141        self
142    }
143
144    /// Set io timeout for TimeoutLayer with given value.
145    ///
146    /// This timeout is for all io operations like `read`, `Reader::read` and `Writer::write`.
147    pub fn with_io_timeout(mut self, timeout: Duration) -> Self {
148        self.io_timeout = timeout;
149        self
150    }
151
152    /// Set speed for TimeoutLayer with given value.
153    ///
154    /// # Notes
155    ///
156    /// The speed should be the lower bound of the IO speed. Set this value too
157    /// large could result in all write operations failing.
158    ///
159    /// # Panics
160    ///
161    /// This function will panic if speed is 0.
162    #[deprecated(note = "with speed is not supported anymore, please use with_io_timeout instead")]
163    pub fn with_speed(self, _: u64) -> Self {
164        self
165    }
166}
167
168impl<A: Access> Layer<A> for TimeoutLayer {
169    type LayeredAccess = TimeoutAccessor<A>;
170
171    fn layer(&self, inner: A) -> Self::LayeredAccess {
172        let info = inner.info();
173        info.update_executor(|exec| {
174            Executor::with(TimeoutExecutor::new(exec.into_inner(), self.io_timeout))
175        });
176
177        TimeoutAccessor {
178            inner,
179
180            timeout: self.timeout,
181            io_timeout: self.io_timeout,
182        }
183    }
184}
185
186#[derive(Debug, Clone)]
187pub struct TimeoutAccessor<A: Access> {
188    inner: A,
189
190    timeout: Duration,
191    io_timeout: Duration,
192}
193
194impl<A: Access> TimeoutAccessor<A> {
195    async fn timeout<F: Future<Output = Result<T>>, T>(&self, op: Operation, fut: F) -> Result<T> {
196        tokio::time::timeout(self.timeout, fut).await.map_err(|_| {
197            Error::new(ErrorKind::Unexpected, "operation timeout reached")
198                .with_operation(op)
199                .with_context("timeout", self.timeout.as_secs_f64().to_string())
200                .set_temporary()
201        })?
202    }
203
204    async fn io_timeout<F: Future<Output = Result<T>>, T>(
205        &self,
206        op: Operation,
207        fut: F,
208    ) -> Result<T> {
209        tokio::time::timeout(self.io_timeout, fut)
210            .await
211            .map_err(|_| {
212                Error::new(ErrorKind::Unexpected, "io timeout reached")
213                    .with_operation(op)
214                    .with_context("timeout", self.io_timeout.as_secs_f64().to_string())
215                    .set_temporary()
216            })?
217    }
218}
219
220impl<A: Access> LayeredAccess for TimeoutAccessor<A> {
221    type Inner = A;
222    type Reader = TimeoutWrapper<A::Reader>;
223    type BlockingReader = A::BlockingReader;
224    type Writer = TimeoutWrapper<A::Writer>;
225    type BlockingWriter = A::BlockingWriter;
226    type Lister = TimeoutWrapper<A::Lister>;
227    type BlockingLister = A::BlockingLister;
228    type Deleter = TimeoutWrapper<A::Deleter>;
229    type BlockingDeleter = A::BlockingDeleter;
230
231    fn inner(&self) -> &Self::Inner {
232        &self.inner
233    }
234
235    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
236        self.timeout(Operation::CreateDir, self.inner.create_dir(path, args))
237            .await
238    }
239
240    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
241        self.io_timeout(Operation::Read, self.inner.read(path, args))
242            .await
243            .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
244    }
245
246    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
247        self.io_timeout(Operation::Write, self.inner.write(path, args))
248            .await
249            .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
250    }
251
252    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
253        self.timeout(Operation::Copy, self.inner.copy(from, to, args))
254            .await
255    }
256
257    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
258        self.timeout(Operation::Rename, self.inner.rename(from, to, args))
259            .await
260    }
261
262    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
263        self.timeout(Operation::Stat, self.inner.stat(path, args))
264            .await
265    }
266
267    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
268        self.timeout(Operation::Delete, self.inner.delete())
269            .await
270            .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
271    }
272
273    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
274        self.io_timeout(Operation::List, self.inner.list(path, args))
275            .await
276            .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
277    }
278
279    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
280        self.timeout(Operation::Presign, self.inner.presign(path, args))
281            .await
282    }
283
284    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
285        self.inner.blocking_read(path, args)
286    }
287
288    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
289        self.inner.blocking_write(path, args)
290    }
291
292    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
293        self.inner.blocking_list(path, args)
294    }
295
296    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
297        self.inner.blocking_delete()
298    }
299}
300
301pub struct TimeoutExecutor {
302    exec: Arc<dyn Execute>,
303    timeout: Duration,
304}
305
306impl TimeoutExecutor {
307    pub fn new(exec: Arc<dyn Execute>, timeout: Duration) -> Self {
308        Self { exec, timeout }
309    }
310}
311
312impl Execute for TimeoutExecutor {
313    fn execute(&self, f: BoxedStaticFuture<()>) {
314        self.exec.execute(f)
315    }
316
317    fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
318        Some(Box::pin(tokio::time::sleep(self.timeout)))
319    }
320}
321
/// Wrapper that bounds each async call (`read`, `write`, `close`, `abort`, `next`, `flush`)
/// on the wrapped reader, writer, lister or deleter with the same `timeout`.
pub struct TimeoutWrapper<R> {
    /// The wrapped IO object.
    inner: R,
    /// Limit applied to each individual call on `inner`.
    timeout: Duration,
}

328impl<R> TimeoutWrapper<R> {
329    fn new(inner: R, timeout: Duration) -> Self {
330        Self { inner, timeout }
331    }
332
333    #[inline]
334    async fn io_timeout<F: Future<Output = Result<T>>, T>(
335        timeout: Duration,
336        op: &'static str,
337        fut: F,
338    ) -> Result<T> {
339        tokio::time::timeout(timeout, fut).await.map_err(|_| {
340            Error::new(ErrorKind::Unexpected, "io operation timeout reached")
341                .with_operation(op)
342                .with_context("timeout", timeout.as_secs_f64().to_string())
343                .set_temporary()
344        })?
345    }
346}
347
348impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
349    async fn read(&mut self) -> Result<Buffer> {
350        let fut = self.inner.read();
351        Self::io_timeout(self.timeout, Operation::Read.into_static(), fut).await
352    }
353}
354
355impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
356    async fn write(&mut self, bs: Buffer) -> Result<()> {
357        let fut = self.inner.write(bs);
358        Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
359    }
360
361    async fn close(&mut self) -> Result<Metadata> {
362        let fut = self.inner.close();
363        Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
364    }
365
366    async fn abort(&mut self) -> Result<()> {
367        let fut = self.inner.abort();
368        Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
369    }
370}
371
372impl<R: oio::List> oio::List for TimeoutWrapper<R> {
373    async fn next(&mut self) -> Result<Option<oio::Entry>> {
374        let fut = self.inner.next();
375        Self::io_timeout(self.timeout, Operation::List.into_static(), fut).await
376    }
377}
378
379impl<R: oio::Delete> oio::Delete for TimeoutWrapper<R> {
380    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
381        self.inner.delete(path, args)
382    }
383
384    async fn flush(&mut self) -> Result<usize> {
385        let fut = self.inner.flush();
386        Self::io_timeout(self.timeout, Operation::Delete.into_static(), fut).await
387    }
388}
389
#[cfg(test)]
mod tests {
    use std::future::pending;
    use std::future::Future;
    use std::sync::Arc;
    use std::time::Duration;

    use futures::StreamExt;
    use tokio::time::sleep;
    use tokio::time::timeout;

    use crate::layers::TimeoutLayer;
    use crate::layers::TypeEraseLayer;
    use crate::raw::*;
    use crate::*;

    /// Hard upper bound for each test.
    ///
    /// Every timeout configured below is 1 second, so a working layer finishes well before
    /// this. If the layer regresses, the test fails instead of hanging forever on a mock
    /// future that never completes.
    const TEST_DEADLINE: Duration = Duration::from_secs(2);

    #[derive(Debug, Clone, Default)]
    struct MockService;

    impl Access for MockService {
        type Reader = MockReader;
        type Writer = ();
        type Lister = MockLister;
        type BlockingReader = ();
        type BlockingWriter = ();
        type BlockingLister = ();
        type Deleter = ();
        type BlockingDeleter = ();

        fn info(&self) -> Arc<AccessorInfo> {
            let am = AccessorInfo::default();
            am.set_native_capability(Capability {
                read: true,
                delete: true,
                ..Default::default()
            });

            am.into()
        }

        /// Builds a reader whose `read` never completes.
        async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
            Ok((RpRead::new(), MockReader))
        }

        /// Effectively never returns: it sleeps for `u64::MAX` seconds before producing a
        /// deleter.
        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            sleep(Duration::from_secs(u64::MAX)).await;

            Ok((RpDelete::default(), ()))
        }

        /// Builds a lister whose `next` never completes.
        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            Ok((RpList::default(), MockLister))
        }
    }

    /// Reader whose `read` future stays pending forever.
    #[derive(Debug, Clone, Default)]
    struct MockReader;

    impl oio::Read for MockReader {
        fn read(&mut self) -> impl Future<Output = Result<Buffer>> {
            pending()
        }
    }

    /// Lister whose `next` future stays pending forever.
    #[derive(Debug, Clone, Default)]
    struct MockLister;

    impl oio::List for MockLister {
        fn next(&mut self) -> impl Future<Output = Result<Option<oio::Entry>>> {
            pending()
        }
    }

    #[tokio::test]
    async fn test_operation_timeout() {
        let acc = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor;
        let op = Operator::from_inner(acc)
            .layer(TimeoutLayer::new().with_timeout(Duration::from_secs(1)));

        let fut = async {
            let res = op.delete("test").await;
            assert!(res.is_err());
            let err = res.unwrap_err();
            assert_eq!(err.kind(), ErrorKind::Unexpected);
            assert!(err.to_string().contains("timeout"))
        };

        timeout(TEST_DEADLINE, fut)
            .await
            .expect("this test should not exceed 2 seconds")
    }

    #[tokio::test]
    async fn test_io_timeout() {
        let acc = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor;
        let op = Operator::from_inner(acc)
            .layer(TimeoutLayer::new().with_io_timeout(Duration::from_secs(1)));

        let fut = async {
            let reader = op.reader("test").await.unwrap();

            let res = reader.read(0..4).await;
            assert!(res.is_err());
            let err = res.unwrap_err();
            assert_eq!(err.kind(), ErrorKind::Unexpected);
            assert!(err.to_string().contains("timeout"))
        };

        timeout(TEST_DEADLINE, fut)
            .await
            .expect("this test should not exceed 2 seconds")
    }

    #[tokio::test]
    async fn test_list_timeout() {
        let acc = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor;
        let op = Operator::from_inner(acc).layer(
            TimeoutLayer::new()
                .with_timeout(Duration::from_secs(1))
                .with_io_timeout(Duration::from_secs(1)),
        );

        let fut = async {
            let mut lister = op.lister("test").await.unwrap();

            let res = lister.next().await.unwrap();
            assert!(res.is_err());
            let err = res.unwrap_err();
            assert_eq!(err.kind(), ErrorKind::Unexpected);
            assert!(err.to_string().contains("timeout"))
        };

        timeout(TEST_DEADLINE, fut)
            .await
            .expect("this test should not exceed 2 seconds")
    }

    #[tokio::test]
    async fn test_list_timeout_raw() {
        use oio::List;

        let acc = MockService;
        let timeout_layer = TimeoutLayer::new()
            .with_timeout(Duration::from_secs(1))
            .with_io_timeout(Duration::from_secs(1));
        let timeout_acc = timeout_layer.layer(acc);

        let fut = async {
            let (_, mut lister) = Access::list(&timeout_acc, "test", OpList::default())
                .await
                .unwrap();

            let res = lister.next().await;
            assert!(res.is_err());
            let err = res.unwrap_err();
            assert_eq!(err.kind(), ErrorKind::Unexpected);
            assert!(err.to_string().contains("timeout"));
        };

        timeout(TEST_DEADLINE, fut)
            .await
            .expect("this test should not exceed 2 seconds")
    }
}