opendal/layers/
timeout.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::future::Future;
19use std::sync::Arc;
20use std::time::Duration;
21
22use crate::raw::*;
23use crate::*;
24
25/// Add timeout for every operation to avoid slow or unexpected hang operations.
26///
/// For example, a dead connection could hang a database's SQL query. TimeoutLayer
/// will break this connection and return an error so users can handle it by
/// retrying or reporting it to their own users.
30///
31/// # Notes
32///
33/// `TimeoutLayer` treats all operations in two kinds:
34///
35/// - Non IO Operation like `stat`, `delete` they operate on a single file. We control
36///   them by setting `timeout`.
37/// - IO Operation like `read`, `Reader::read` and `Writer::write`, they operate on data directly, we
38///   control them by setting `io_timeout`.
39///
40/// # Default
41///
42/// - timeout: 60 seconds
43/// - io_timeout: 10 seconds
44///
45/// # Panics
46///
/// TimeoutLayer will drop the future if the timeout is reached. This might cause the internal state
/// of the future to be broken. If the underlying future moves ownership into the future, it will be
/// dropped and will never return back.
50///
/// For example, while using `TimeoutLayer` with `RetryLayer` at the same time, please make sure
/// the timeout layer is added before the retry layer.
53///
54/// ```no_run
55/// # use std::time::Duration;
56///
57/// # use opendal::layers::RetryLayer;
58/// # use opendal::layers::TimeoutLayer;
59/// # use opendal::services;
60/// # use opendal::Operator;
61/// # use opendal::Result;
62///
63/// # fn main() -> Result<()> {
64/// let op = Operator::new(services::Memory::default())?
///     // This is fine, since the timeout happens during retry.
66///     .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
67///     .layer(RetryLayer::new())
68///     // This is wrong. Since timeout layer will drop future, leaving retry layer in a bad state.
69///     .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
70///     .finish();
71/// Ok(())
72/// # }
73/// ```
74///
75/// # Examples
76///
77/// The following examples will create a timeout layer with 10 seconds timeout for all non-io
78/// operations, 3 seconds timeout for all io operations.
79///
80/// ```no_run
81/// # use std::time::Duration;
82///
83/// # use opendal::layers::TimeoutLayer;
84/// # use opendal::services;
85/// # use opendal::Operator;
86/// # use opendal::Result;
87///
88/// # fn main() -> Result<()> {
89/// let _ = Operator::new(services::Memory::default())?
90///     .layer(
91///         TimeoutLayer::default()
92///             .with_timeout(Duration::from_secs(10))
93///             .with_io_timeout(Duration::from_secs(3)),
94///     )
95///     .finish();
96/// Ok(())
97/// # }
98/// ```
99///
100/// # Implementation Notes
101///
/// TimeoutLayer uses [`tokio::time::timeout`] to implement timeouts for operations. IO
/// operations inside `reader` and `writer` use `Pin<Box<tokio::time::Sleep>>` to track the
/// timeout.
///
/// This might introduce a bit of overhead for IO operations, but it's the only way to implement
/// timeouts correctly. We used to implement the timeout layer in a zero-cost way that only stored
/// a [`std::time::Instant`] and checked the timeout by comparing the instant with the current time.
/// However, it doesn't work for all cases.
110///
/// For example, a user's TCP connection could be in the [Busy ESTAB](https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die) state. In this state, no IO event will be emitted. The runtime
/// will never poll our future again. From the application side, this future hangs forever
/// until the TCP connection is closed after reaching the Linux [net.ipv4.tcp_retries2](https://man7.org/linux/man-pages/man7/tcp.7.html) limit.
#[derive(Debug, Clone)]
pub struct TimeoutLayer {
    /// Time limit applied to non-io operations such as `stat` and `delete`.
    timeout: Duration,
    /// Time limit applied to io operations such as `read`, `Reader::read` and `Writer::write`.
    io_timeout: Duration,
}
119
120impl Default for TimeoutLayer {
121    fn default() -> Self {
122        Self {
123            timeout: Duration::from_secs(60),
124            io_timeout: Duration::from_secs(10),
125        }
126    }
127}
128
129impl TimeoutLayer {
130    /// Create a new `TimeoutLayer` with default settings.
131    pub fn new() -> Self {
132        Self::default()
133    }
134
135    /// Set timeout for TimeoutLayer with given value.
136    ///
137    /// This timeout is for all non-io operations like `stat`, `delete`.
138    pub fn with_timeout(mut self, timeout: Duration) -> Self {
139        self.timeout = timeout;
140        self
141    }
142
143    /// Set io timeout for TimeoutLayer with given value.
144    ///
145    /// This timeout is for all io operations like `read`, `Reader::read` and `Writer::write`.
146    pub fn with_io_timeout(mut self, timeout: Duration) -> Self {
147        self.io_timeout = timeout;
148        self
149    }
150}
151
152impl<A: Access> Layer<A> for TimeoutLayer {
153    type LayeredAccess = TimeoutAccessor<A>;
154
155    fn layer(&self, inner: A) -> Self::LayeredAccess {
156        let info = inner.info();
157        info.update_executor(|exec| {
158            Executor::with(TimeoutExecutor::new(exec.into_inner(), self.io_timeout))
159        });
160
161        TimeoutAccessor {
162            inner,
163
164            timeout: self.timeout,
165            io_timeout: self.io_timeout,
166        }
167    }
168}
169
170#[derive(Debug, Clone)]
171pub struct TimeoutAccessor<A: Access> {
172    inner: A,
173
174    timeout: Duration,
175    io_timeout: Duration,
176}
177
178impl<A: Access> TimeoutAccessor<A> {
179    async fn timeout<F: Future<Output = Result<T>>, T>(&self, op: Operation, fut: F) -> Result<T> {
180        tokio::time::timeout(self.timeout, fut).await.map_err(|_| {
181            Error::new(ErrorKind::Unexpected, "operation timeout reached")
182                .with_operation(op)
183                .with_context("timeout", self.timeout.as_secs_f64().to_string())
184                .set_temporary()
185        })?
186    }
187
188    async fn io_timeout<F: Future<Output = Result<T>>, T>(
189        &self,
190        op: Operation,
191        fut: F,
192    ) -> Result<T> {
193        tokio::time::timeout(self.io_timeout, fut)
194            .await
195            .map_err(|_| {
196                Error::new(ErrorKind::Unexpected, "io timeout reached")
197                    .with_operation(op)
198                    .with_context("timeout", self.io_timeout.as_secs_f64().to_string())
199                    .set_temporary()
200            })?
201    }
202}
203
204impl<A: Access> LayeredAccess for TimeoutAccessor<A> {
205    type Inner = A;
206    type Reader = TimeoutWrapper<A::Reader>;
207    type Writer = TimeoutWrapper<A::Writer>;
208    type Lister = TimeoutWrapper<A::Lister>;
209    type Deleter = TimeoutWrapper<A::Deleter>;
210
211    fn inner(&self) -> &Self::Inner {
212        &self.inner
213    }
214
215    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
216        self.timeout(Operation::CreateDir, self.inner.create_dir(path, args))
217            .await
218    }
219
220    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
221        self.io_timeout(Operation::Read, self.inner.read(path, args))
222            .await
223            .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
224    }
225
226    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
227        self.io_timeout(Operation::Write, self.inner.write(path, args))
228            .await
229            .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
230    }
231
232    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
233        self.timeout(Operation::Copy, self.inner.copy(from, to, args))
234            .await
235    }
236
237    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
238        self.timeout(Operation::Rename, self.inner.rename(from, to, args))
239            .await
240    }
241
242    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
243        self.timeout(Operation::Stat, self.inner.stat(path, args))
244            .await
245    }
246
247    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
248        self.timeout(Operation::Delete, self.inner.delete())
249            .await
250            .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
251    }
252
253    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
254        self.io_timeout(Operation::List, self.inner.list(path, args))
255            .await
256            .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
257    }
258
259    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
260        self.timeout(Operation::Presign, self.inner.presign(path, args))
261            .await
262    }
263}
264
265pub struct TimeoutExecutor {
266    exec: Arc<dyn Execute>,
267    timeout: Duration,
268}
269
270impl TimeoutExecutor {
271    pub fn new(exec: Arc<dyn Execute>, timeout: Duration) -> Self {
272        Self { exec, timeout }
273    }
274}
275
276impl Execute for TimeoutExecutor {
277    fn execute(&self, f: BoxedStaticFuture<()>) {
278        self.exec.execute(f)
279    }
280
281    fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
282        Some(Box::pin(tokio::time::sleep(self.timeout)))
283    }
284}
285
/// Pairs a value (reader, writer, lister or deleter) with the time limit that
/// applies to each of its asynchronous calls.
pub struct TimeoutWrapper<R> {
    /// The wrapped value whose calls are being time-limited.
    inner: R,
    /// Maximum duration allowed for a single call on `inner`.
    timeout: Duration,
}
291
292impl<R> TimeoutWrapper<R> {
293    fn new(inner: R, timeout: Duration) -> Self {
294        Self { inner, timeout }
295    }
296
297    #[inline]
298    async fn io_timeout<F: Future<Output = Result<T>>, T>(
299        timeout: Duration,
300        op: &'static str,
301        fut: F,
302    ) -> Result<T> {
303        tokio::time::timeout(timeout, fut).await.map_err(|_| {
304            Error::new(ErrorKind::Unexpected, "io operation timeout reached")
305                .with_operation(op)
306                .with_context("timeout", timeout.as_secs_f64().to_string())
307                .set_temporary()
308        })?
309    }
310}
311
312impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
313    async fn read(&mut self) -> Result<Buffer> {
314        let fut = self.inner.read();
315        Self::io_timeout(self.timeout, Operation::Read.into_static(), fut).await
316    }
317}
318
319impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
320    async fn write(&mut self, bs: Buffer) -> Result<()> {
321        let fut = self.inner.write(bs);
322        Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
323    }
324
325    async fn close(&mut self) -> Result<Metadata> {
326        let fut = self.inner.close();
327        Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
328    }
329
330    async fn abort(&mut self) -> Result<()> {
331        let fut = self.inner.abort();
332        Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
333    }
334}
335
336impl<R: oio::List> oio::List for TimeoutWrapper<R> {
337    async fn next(&mut self) -> Result<Option<oio::Entry>> {
338        let fut = self.inner.next();
339        Self::io_timeout(self.timeout, Operation::List.into_static(), fut).await
340    }
341}
342
343impl<R: oio::Delete> oio::Delete for TimeoutWrapper<R> {
344    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
345        self.inner.delete(path, args)
346    }
347
348    async fn flush(&mut self) -> Result<usize> {
349        let fut = self.inner.flush();
350        Self::io_timeout(self.timeout, Operation::Delete.into_static(), fut).await
351    }
352}
353
#[cfg(test)]
mod tests {
    use std::future::pending;
    use std::future::Future;
    use std::sync::Arc;
    use std::time::Duration;

    use futures::StreamExt;
    use tokio::time::sleep;
    use tokio::time::timeout;

    use crate::layers::TimeoutLayer;
    use crate::layers::TypeEraseLayer;
    use crate::raw::*;
    use crate::*;

    /// Backend stub whose operations either hang or hand out objects that hang.
    #[derive(Debug, Clone, Default)]
    struct MockService;

    impl Access for MockService {
        type Reader = MockReader;
        type Writer = ();
        type Lister = MockLister;
        type Deleter = ();

        fn info(&self) -> Arc<AccessorInfo> {
            let info = AccessorInfo::default();
            info.set_native_capability(Capability {
                read: true,
                delete: true,
                ..Default::default()
            });

            Arc::new(info)
        }

        /// Returns immediately with a reader whose `read` never completes.
        async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
            Ok((RpRead::new(), MockReader))
        }

        /// Sleeps for `u64::MAX` seconds before answering, so in practice it
        /// never returns.
        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            sleep(Duration::from_secs(u64::MAX)).await;

            Ok((RpDelete::default(), ()))
        }

        /// Returns immediately with a lister whose `next` never completes.
        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            Ok((RpList::default(), MockLister))
        }
    }

    /// Reader whose `read` future is pending forever.
    #[derive(Debug, Clone, Default)]
    struct MockReader;

    impl oio::Read for MockReader {
        fn read(&mut self) -> impl Future<Output = Result<Buffer>> {
            pending::<Result<Buffer>>()
        }
    }

    /// Lister whose `next` future is pending forever.
    #[derive(Debug, Clone, Default)]
    struct MockLister;

    impl oio::List for MockLister {
        fn next(&mut self) -> impl Future<Output = Result<Option<oio::Entry>>> {
            pending::<Result<Option<oio::Entry>>>()
        }
    }

    /// Build an operator over a type-erased `MockService` with `layer` applied.
    fn mock_operator(layer: TimeoutLayer) -> Operator {
        let acc = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor;
        Operator::from_inner(acc).layer(layer)
    }

    /// Assert that `res` is an `Unexpected` error whose text mentions "timeout".
    fn assert_timed_out<T: std::fmt::Debug>(res: Result<T>) {
        assert!(res.is_err());
        let err = res.unwrap_err();
        assert_eq!(err.kind(), ErrorKind::Unexpected);
        assert!(err.to_string().contains("timeout"));
    }

    #[tokio::test]
    async fn test_operation_timeout() {
        let op = mock_operator(TimeoutLayer::new().with_timeout(Duration::from_secs(1)));

        let check = async {
            assert_timed_out(op.delete("test").await);
        };

        // The layer's 1 second limit must fire well before this outer guard.
        timeout(Duration::from_secs(2), check)
            .await
            .expect("this test should not exceed 2 seconds")
    }

    #[tokio::test]
    async fn test_io_timeout() {
        let op = mock_operator(TimeoutLayer::new().with_io_timeout(Duration::from_secs(1)));

        let reader = op.reader("test").await.unwrap();

        assert_timed_out(reader.read(0..4).await);
    }

    #[tokio::test]
    async fn test_list_timeout() {
        let op = mock_operator(
            TimeoutLayer::new()
                .with_timeout(Duration::from_secs(1))
                .with_io_timeout(Duration::from_secs(1)),
        );

        let mut lister = op.lister("test").await.unwrap();

        assert_timed_out(lister.next().await.unwrap());
    }

    #[tokio::test]
    async fn test_list_timeout_raw() {
        use oio::List;

        // Exercise the layered accessor directly, without going through `Operator`.
        let timeout_acc = TimeoutLayer::new()
            .with_timeout(Duration::from_secs(1))
            .with_io_timeout(Duration::from_secs(1))
            .layer(MockService);

        let (_, mut lister) = Access::list(&timeout_acc, "test", OpList::default())
            .await
            .unwrap();

        assert_timed_out(lister.next().await);
    }
}
497}