opendal_core/layers/
timeout.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::future::Future;
19use std::sync::Arc;
20
21use crate::raw::*;
22use crate::*;
23
/// Add timeout for every operation to avoid slow or unexpectedly hanging operations.
///
/// For example, a dead connection could hang a database's SQL query. `TimeoutLayer`
/// will break this connection and return an error so users can handle it by
/// retrying or reporting it to their users.
///
/// # Notes
///
/// `TimeoutLayer` splits all operations into two kinds:
///
/// - Non-IO operations like `stat`, `delete`, `copy` and `rename` operate on a single file.
///   We control them by setting `timeout`.
/// - IO operations like `read`, `write`, `list`, `Reader::read` and `Writer::write` operate on
///   data directly. We control them by setting `io_timeout`.
///
/// # Default
///
/// - timeout: 60 seconds
/// - io_timeout: 10 seconds
///
/// # Caveats
///
/// `TimeoutLayer` drops the future once the timeout is reached. This might leave the internal
/// state of that future broken: if the underlying future has taken ownership of a value, the
/// value is dropped together with the future and will never be returned.
///
/// For example, while using `TimeoutLayer` together with `RetryLayer`, please make sure the
/// timeout layer is added before the retry layer.
///
/// ```no_run
/// # use std::time::Duration;
///
/// # use opendal_core::layers::RetryLayer;
/// # use opendal_core::layers::TimeoutLayer;
/// # use opendal_core::services;
/// # use opendal_core::Operator;
/// # use opendal_core::Result;
///
/// # fn main() -> Result<()> {
/// let op = Operator::new(services::Memory::default())?
///     // This is fine, since the timeout happens inside each retry attempt.
///     .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
///     .layer(RetryLayer::new())
///     // This is wrong, since the timeout layer will drop the retry future, leaving the
///     // retry layer in a bad state.
///     .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
///     .finish();
/// Ok(())
/// # }
/// ```
///
/// # Examples
///
/// The following example creates a timeout layer with a 10 second timeout for all non-io
/// operations and a 3 second timeout for all io operations.
///
/// ```no_run
/// # use std::time::Duration;
///
/// # use opendal_core::layers::TimeoutLayer;
/// # use opendal_core::services;
/// # use opendal_core::Operator;
/// # use opendal_core::Result;
///
/// # fn main() -> Result<()> {
/// let _ = Operator::new(services::Memory::default())?
///     .layer(
///         TimeoutLayer::default()
///             .with_timeout(Duration::from_secs(10))
///             .with_io_timeout(Duration::from_secs(3)),
///     )
///     .finish();
/// Ok(())
/// # }
/// ```
///
/// # Implementation Notes
///
/// `TimeoutLayer` uses [`tokio::time::timeout`] to implement timeouts for operations. Calls on
/// the returned readers, writers and listers, as well as closing a deleter, are likewise wrapped
/// in [`tokio::time::timeout`] on every call, so each call gets its own timer.
///
/// This might introduce a bit of overhead for IO operations, but it's the only way to implement
/// timeouts correctly. We used to implement the timeout layer in a zero-cost way that only stored
/// an [`Instant`](std::time::Instant) and checked the timeout by comparing it with the current
/// time. However, that doesn't work in all cases.
///
/// For example, a user's TCP connection could be in the
/// [Busy ESTAB](https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die) state. In this state
/// no IO event is emitted, so the runtime will never poll our future again. From the application
/// side, the future hangs forever until the TCP connection is closed after reaching the linux
/// [net.ipv4.tcp_retries2](https://man7.org/linux/man-pages/man7/tcp.7.html) limit.
#[derive(Clone, Debug)]
pub struct TimeoutLayer {
    /// Deadline for non-io operations (`stat`, `delete`, `copy`, ...).
    timeout: Duration,
    /// Deadline for io operations and for every call on the returned readers/writers/listers.
    io_timeout: Duration,
}

impl Default for TimeoutLayer {
    /// Returns a layer with a 60 second `timeout` and a 10 second `io_timeout`.
    fn default() -> Self {
        Self {
            timeout: Duration::from_secs(60),
            io_timeout: Duration::from_secs(10),
        }
    }
}

impl TimeoutLayer {
    /// Create a new `TimeoutLayer` with default settings.
    ///
    /// This is equivalent to [`TimeoutLayer::default`]: a 60 second `timeout` and a
    /// 10 second `io_timeout`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set timeout for `TimeoutLayer` with the given value.
    ///
    /// This timeout applies to all non-io operations like `stat`, `delete`, `copy` and
    /// `rename`.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Set io timeout for `TimeoutLayer` with the given value.
    ///
    /// This timeout applies to all io operations like `read`, `write`, `list`,
    /// `Reader::read` and `Writer::write`.
    pub fn with_io_timeout(mut self, timeout: Duration) -> Self {
        self.io_timeout = timeout;
        self
    }
}
150
151impl<A: Access> Layer<A> for TimeoutLayer {
152    type LayeredAccess = TimeoutAccessor<A>;
153
154    fn layer(&self, inner: A) -> Self::LayeredAccess {
155        let info = inner.info();
156        info.update_executor(|exec| {
157            Executor::with(TimeoutExecutor::new(exec.into_inner(), self.io_timeout))
158        });
159
160        TimeoutAccessor {
161            inner,
162
163            timeout: self.timeout,
164            io_timeout: self.io_timeout,
165        }
166    }
167}
168
169#[derive(Debug, Clone)]
170pub struct TimeoutAccessor<A: Access> {
171    inner: A,
172
173    timeout: Duration,
174    io_timeout: Duration,
175}
176
177impl<A: Access> TimeoutAccessor<A> {
178    async fn timeout<F: Future<Output = Result<T>>, T>(&self, op: Operation, fut: F) -> Result<T> {
179        tokio::time::timeout(self.timeout, fut).await.map_err(|_| {
180            Error::new(ErrorKind::Unexpected, "operation timeout reached")
181                .with_operation(op)
182                .with_context("timeout", self.timeout.as_secs_f64().to_string())
183                .set_temporary()
184        })?
185    }
186
187    async fn io_timeout<F: Future<Output = Result<T>>, T>(
188        &self,
189        op: Operation,
190        fut: F,
191    ) -> Result<T> {
192        tokio::time::timeout(self.io_timeout, fut)
193            .await
194            .map_err(|_| {
195                Error::new(ErrorKind::Unexpected, "io timeout reached")
196                    .with_operation(op)
197                    .with_context("timeout", self.io_timeout.as_secs_f64().to_string())
198                    .set_temporary()
199            })?
200    }
201}
202
203impl<A: Access> LayeredAccess for TimeoutAccessor<A> {
204    type Inner = A;
205    type Reader = TimeoutWrapper<A::Reader>;
206    type Writer = TimeoutWrapper<A::Writer>;
207    type Lister = TimeoutWrapper<A::Lister>;
208    type Deleter = TimeoutWrapper<A::Deleter>;
209
210    fn inner(&self) -> &Self::Inner {
211        &self.inner
212    }
213
214    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
215        self.timeout(Operation::CreateDir, self.inner.create_dir(path, args))
216            .await
217    }
218
219    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
220        self.io_timeout(Operation::Read, self.inner.read(path, args))
221            .await
222            .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
223    }
224
225    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
226        self.io_timeout(Operation::Write, self.inner.write(path, args))
227            .await
228            .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
229    }
230
231    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
232        self.timeout(Operation::Copy, self.inner.copy(from, to, args))
233            .await
234    }
235
236    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
237        self.timeout(Operation::Rename, self.inner.rename(from, to, args))
238            .await
239    }
240
241    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
242        self.timeout(Operation::Stat, self.inner.stat(path, args))
243            .await
244    }
245
246    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
247        self.timeout(Operation::Delete, self.inner.delete())
248            .await
249            .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
250    }
251
252    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
253        self.io_timeout(Operation::List, self.inner.list(path, args))
254            .await
255            .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
256    }
257
258    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
259        self.timeout(Operation::Presign, self.inner.presign(path, args))
260            .await
261    }
262}
263
264pub struct TimeoutExecutor {
265    exec: Arc<dyn Execute>,
266    timeout: Duration,
267}
268
269impl TimeoutExecutor {
270    pub fn new(exec: Arc<dyn Execute>, timeout: Duration) -> Self {
271        Self { exec, timeout }
272    }
273}
274
275impl Execute for TimeoutExecutor {
276    fn execute(&self, f: BoxedStaticFuture<()>) {
277        self.exec.execute(f)
278    }
279
280    fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
281        Some(Box::pin(tokio::time::sleep(self.timeout)))
282    }
283}
284
285pub struct TimeoutWrapper<R> {
286    inner: R,
287
288    timeout: Duration,
289}
290
291impl<R> TimeoutWrapper<R> {
292    fn new(inner: R, timeout: Duration) -> Self {
293        Self { inner, timeout }
294    }
295
296    #[inline]
297    async fn io_timeout<F: Future<Output = Result<T>>, T>(
298        timeout: Duration,
299        op: &'static str,
300        fut: F,
301    ) -> Result<T> {
302        tokio::time::timeout(timeout, fut).await.map_err(|_| {
303            Error::new(ErrorKind::Unexpected, "io operation timeout reached")
304                .with_operation(op)
305                .with_context("timeout", timeout.as_secs_f64().to_string())
306                .set_temporary()
307        })?
308    }
309}
310
311impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
312    async fn read(&mut self) -> Result<Buffer> {
313        let fut = self.inner.read();
314        Self::io_timeout(self.timeout, Operation::Read.into_static(), fut).await
315    }
316}
317
318impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
319    async fn write(&mut self, bs: Buffer) -> Result<()> {
320        let fut = self.inner.write(bs);
321        Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
322    }
323
324    async fn close(&mut self) -> Result<Metadata> {
325        let fut = self.inner.close();
326        Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
327    }
328
329    async fn abort(&mut self) -> Result<()> {
330        let fut = self.inner.abort();
331        Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
332    }
333}
334
335impl<R: oio::List> oio::List for TimeoutWrapper<R> {
336    async fn next(&mut self) -> Result<Option<oio::Entry>> {
337        let fut = self.inner.next();
338        Self::io_timeout(self.timeout, Operation::List.into_static(), fut).await
339    }
340}
341
342impl<R: oio::Delete> oio::Delete for TimeoutWrapper<R> {
343    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
344        self.inner.delete(path, args).await
345    }
346
347    async fn close(&mut self) -> Result<()> {
348        let fut = self.inner.close();
349        Self::io_timeout(self.timeout, Operation::Delete.into_static(), fut).await
350    }
351}
352
#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::future::pending;
    use std::sync::Arc;

    use futures::StreamExt;
    use tokio::time::sleep;
    use tokio::time::timeout;

    use crate::layers::TimeoutLayer;
    use crate::layers::TypeEraseLayer;
    use crate::raw::*;
    use crate::*;

    /// Backend whose readers and listers never make progress and whose `delete` never hands
    /// back a deleter, so the only way out of those calls is a timeout.
    #[derive(Debug, Clone, Default)]
    struct MockService;

    impl Access for MockService {
        type Reader = MockReader;
        type Writer = ();
        type Lister = MockLister;
        type Deleter = ();

        fn info(&self) -> Arc<AccessorInfo> {
            let info = AccessorInfo::default();
            let capability = Capability {
                read: true,
                delete: true,
                ..Default::default()
            };
            info.set_native_capability(capability);
            info.into()
        }

        /// Succeeds immediately with a reader whose every read stays pending.
        async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
            Ok((RpRead::new(), MockReader))
        }

        /// Sleeps for `u64::MAX` seconds, i.e. effectively never returns.
        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            sleep(Duration::from_secs(u64::MAX)).await;
            Ok((RpDelete::default(), ()))
        }

        /// Succeeds immediately with a lister whose every `next` stays pending.
        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            Ok((RpList::default(), MockLister))
        }
    }

    /// Reader that never yields data.
    #[derive(Debug, Clone, Default)]
    struct MockReader;

    impl oio::Read for MockReader {
        fn read(&mut self) -> impl Future<Output = Result<Buffer>> {
            pending()
        }
    }

    /// Lister that never yields an entry.
    #[derive(Debug, Clone, Default)]
    struct MockLister;

    impl oio::List for MockLister {
        fn next(&mut self) -> impl Future<Output = Result<Option<oio::Entry>>> {
            pending()
        }
    }

    /// Builds a type-erased operator over `MockService` with `timeout_layer` applied.
    fn mock_operator(timeout_layer: TimeoutLayer) -> Operator {
        let accessor = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor;
        Operator::from_inner(accessor).layer(timeout_layer)
    }

    /// Checks that `err` looks like the error produced by the timeout layer.
    fn assert_timeout_error(err: &Error) {
        assert_eq!(err.kind(), ErrorKind::Unexpected);
        assert!(err.to_string().contains("timeout"));
    }

    #[tokio::test]
    async fn test_operation_timeout() {
        let op = mock_operator(TimeoutLayer::new().with_timeout(Duration::from_secs(1)));

        let check = async {
            let Err(err) = op.delete("test").await else {
                panic!("delete on a hanging backend must time out");
            };
            assert_timeout_error(&err);
        };

        timeout(Duration::from_secs(2), check)
            .await
            .expect("this test should not exceed 2 seconds")
    }

    #[tokio::test]
    async fn test_io_timeout() {
        let op = mock_operator(TimeoutLayer::new().with_io_timeout(Duration::from_secs(1)));
        let reader = op.reader("test").await.unwrap();

        let Err(err) = reader.read(0..4).await else {
            panic!("reading from a stalled reader must time out");
        };
        assert_timeout_error(&err);
    }

    #[tokio::test]
    async fn test_list_timeout() {
        let op = mock_operator(
            TimeoutLayer::new()
                .with_timeout(Duration::from_secs(1))
                .with_io_timeout(Duration::from_secs(1)),
        );
        let mut lister = op.lister("test").await.unwrap();

        let Err(err) = lister.next().await.unwrap() else {
            panic!("listing from a stalled lister must time out");
        };
        assert_timeout_error(&err);
    }

    #[tokio::test]
    async fn test_list_timeout_raw() {
        use oio::List;

        let timeout_layer = TimeoutLayer::new()
            .with_timeout(Duration::from_secs(1))
            .with_io_timeout(Duration::from_secs(1));
        let layered = timeout_layer.layer(MockService);

        let (_, mut lister) = Access::list(&layered, "test", OpList::default())
            .await
            .unwrap();

        let Err(err) = lister.next().await else {
            panic!("the raw lister must time out");
        };
        assert_timeout_error(&err);
    }
}