opendal_core/layers/
retry.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use backon::ExponentialBuilder;
23use backon::Retryable;
24use log::warn;
25
26use crate::raw::*;
27use crate::*;
28
29/// Add retry for temporary failed operations.
30///
31/// # Notes
32///
33/// This layer will retry failed operations when [`Error::is_temporary`]
34/// returns true. If operation still failed, this layer will set error to
35/// `Persistent` which means error has been retried.
36///
37/// # Panics
38///
39/// While retrying `Reader` or `Writer` operations, please make sure either:
40///
41/// - All futures generated by `Reader::read` or `Writer::close` are resolved to `Ready`.
42/// - Or, won't call any `Reader` or `Writer` methods after retry returns final error.
43///
44/// Otherwise, `RetryLayer` could panic while hitting in bad states.
45///
46/// For example, while composing `RetryLayer` with `TimeoutLayer`. The order of layer is sensitive.
47///
48/// ```no_run
49/// # use std::time::Duration;
50///
51/// # use opendal_core::layers::RetryLayer;
52/// # use opendal_core::layers::TimeoutLayer;
53/// # use opendal_core::services;
54/// # use opendal_core::Operator;
55/// # use opendal_core::Result;
56///
57/// # fn main() -> Result<()> {
58/// let op = Operator::new(services::Memory::default())?
59///     // This is fine, since timeout happen during retry.
60///     .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
61///     .layer(RetryLayer::new())
62///     // This is wrong. Since timeout layer will drop future, leaving retry layer in a bad state.
63///     .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
64///     .finish();
65/// Ok(())
66/// # }
67/// ```
68///
69/// # Examples
70///
71/// ```no_run
72/// # use opendal_core::layers::RetryLayer;
73/// # use opendal_core::services;
74/// # use opendal_core::Operator;
75/// # use opendal_core::Result;
76///
77/// # fn main() -> Result<()> {
78/// let _ = Operator::new(services::Memory::default())?
79///     .layer(RetryLayer::new())
80///     .finish();
81/// Ok(())
82/// # }
83/// ```
84///
85/// ## Customize retry interceptor
86///
87/// RetryLayer accepts [`RetryInterceptor`] to allow users to customize
88/// their own retry interceptor logic.
89///
90/// ```no_run
91/// # use std::time::Duration;
92///
93/// # use opendal_core::layers::RetryInterceptor;
94/// # use opendal_core::layers::RetryLayer;
95/// # use opendal_core::services;
96/// # use opendal_core::Error;
97/// # use opendal_core::Operator;
98/// # use opendal_core::Result;
99///
100/// struct MyRetryInterceptor;
101///
102/// impl RetryInterceptor for MyRetryInterceptor {
103///     fn intercept(&self, err: &Error, dur: Duration) {
104///         // do something
105///     }
106/// }
107///
108/// # fn main() -> Result<()> {
109/// let _ = Operator::new(services::Memory::default())?
110///     .layer(RetryLayer::new().with_notify(MyRetryInterceptor))
111///     .finish();
112/// Ok(())
113/// # }
114/// ```
115pub struct RetryLayer<I: RetryInterceptor = DefaultRetryInterceptor> {
116    builder: ExponentialBuilder,
117    notify: Arc<I>,
118}
119
120impl<I: RetryInterceptor> Clone for RetryLayer<I> {
121    fn clone(&self) -> Self {
122        Self {
123            builder: self.builder,
124            notify: self.notify.clone(),
125        }
126    }
127}
128
129impl Default for RetryLayer {
130    fn default() -> Self {
131        Self {
132            builder: ExponentialBuilder::default(),
133            notify: Arc::new(DefaultRetryInterceptor),
134        }
135    }
136}
137
138impl RetryLayer {
139    /// Create a new retry layer.
140    /// # Examples
141    ///
142    /// ```no_run
143    /// use anyhow::Result;
144    /// use opendal_core::layers::RetryLayer;
145    /// use opendal_core::services;
146    /// use opendal_core::Operator;
147    ///
148    /// let _ = Operator::new(services::Memory::default())
149    ///     .expect("must init")
150    ///     .layer(RetryLayer::new());
151    /// ```
152    pub fn new() -> RetryLayer {
153        Self::default()
154    }
155}
156
157impl<I: RetryInterceptor> RetryLayer<I> {
158    /// Set the retry interceptor as new notify.
159    ///
160    /// ```no_run
161    /// use opendal_core::layers::RetryLayer;
162    /// use opendal_core::services;
163    /// use opendal_core::Operator;
164    ///
165    /// fn notify(_err: &opendal_core::Error, _dur: std::time::Duration) {}
166    ///
167    /// let _ = Operator::new(services::Memory::default())
168    ///     .expect("must init")
169    ///     .layer(RetryLayer::new().with_notify(notify))
170    ///     .finish();
171    /// ```
172    pub fn with_notify<NI: RetryInterceptor>(self, notify: NI) -> RetryLayer<NI> {
173        RetryLayer {
174            builder: self.builder,
175            notify: Arc::new(notify),
176        }
177    }
178
179    /// Set jitter of current backoff.
180    ///
181    /// If jitter is enabled, ExponentialBackoff will add a random jitter in `[0, min_delay)
182    /// to current delay.
183    pub fn with_jitter(mut self) -> Self {
184        self.builder = self.builder.with_jitter();
185        self
186    }
187
188    /// Set factor of current backoff.
189    ///
190    /// # Panics
191    ///
192    /// This function will panic if input factor smaller than `1.0`.
193    pub fn with_factor(mut self, factor: f32) -> Self {
194        self.builder = self.builder.with_factor(factor);
195        self
196    }
197
198    /// Set min_delay of current backoff.
199    pub fn with_min_delay(mut self, min_delay: Duration) -> Self {
200        self.builder = self.builder.with_min_delay(min_delay);
201        self
202    }
203
204    /// Set max_delay of current backoff.
205    ///
206    /// Delay will not increase if current delay is larger than max_delay.
207    pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
208        self.builder = self.builder.with_max_delay(max_delay);
209        self
210    }
211
212    /// Set max_times of current backoff.
213    ///
214    /// Backoff will return `None` if max times is reaching.
215    pub fn with_max_times(mut self, max_times: usize) -> Self {
216        self.builder = self.builder.with_max_times(max_times);
217        self
218    }
219}
220
221impl<A: Access, I: RetryInterceptor> Layer<A> for RetryLayer<I> {
222    type LayeredAccess = RetryAccessor<A, I>;
223
224    fn layer(&self, inner: A) -> Self::LayeredAccess {
225        RetryAccessor {
226            inner: Arc::new(inner),
227            builder: self.builder,
228            notify: self.notify.clone(),
229        }
230    }
231}
232
233/// RetryInterceptor is used to intercept while retry happened.
234pub trait RetryInterceptor: Send + Sync + 'static {
235    /// Everytime RetryLayer is retrying, this function will be called.
236    ///
237    /// # Timing
238    ///
239    /// just before the retry sleep.
240    ///
241    /// # Inputs
242    ///
243    /// - err: The error that caused the current retry.
244    /// - dur: The duration that will sleep before next retry.
245    ///
246    /// # Notes
247    ///
248    /// The intercept must be quick and non-blocking. No heavy IO is
249    /// allowed. Otherwise, the retry will be blocked.
250    fn intercept(&self, err: &Error, dur: Duration);
251}
252
253impl<F> RetryInterceptor for F
254where
255    F: Fn(&Error, Duration) + Send + Sync + 'static,
256{
257    fn intercept(&self, err: &Error, dur: Duration) {
258        self(err, dur);
259    }
260}
261
/// The `DefaultRetryInterceptor` logs every retry error at warning level.
///
/// It is a stateless unit type, so it is cheap to copy, debug-print and
/// construct via `Default`.
#[derive(Debug, Default, Clone, Copy)]
pub struct DefaultRetryInterceptor;
264
265impl RetryInterceptor for DefaultRetryInterceptor {
266    fn intercept(&self, err: &Error, dur: Duration) {
267        warn!(
268            target: "opendal::layers::retry",
269            "will retry after {}s because: {}",
270            dur.as_secs_f64(), err)
271    }
272}
273
274pub struct RetryAccessor<A: Access, I: RetryInterceptor> {
275    inner: Arc<A>,
276    builder: ExponentialBuilder,
277    notify: Arc<I>,
278}
279
280impl<A: Access, I: RetryInterceptor> Debug for RetryAccessor<A, I> {
281    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
282        f.debug_struct("RetryAccessor")
283            .field("inner", &self.inner)
284            .finish_non_exhaustive()
285    }
286}
287
288impl<A: Access, I: RetryInterceptor> LayeredAccess for RetryAccessor<A, I> {
289    type Inner = A;
290    type Reader = RetryWrapper<RetryReader<A, A::Reader>, I>;
291    type Writer = RetryWrapper<A::Writer, I>;
292    type Lister = RetryWrapper<A::Lister, I>;
293    type Deleter = RetryWrapper<A::Deleter, I>;
294
295    fn inner(&self) -> &Self::Inner {
296        &self.inner
297    }
298
299    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
300        { || self.inner.create_dir(path, args.clone()) }
301            .retry(self.builder)
302            .when(|e| e.is_temporary())
303            .notify(|err, dur: Duration| self.notify.intercept(err, dur))
304            .await
305            .map_err(|e| e.set_persistent())
306    }
307
308    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
309        let (rp, reader) = { || self.inner.read(path, args.clone()) }
310            .retry(self.builder)
311            .when(|e| e.is_temporary())
312            .notify(|err, dur| self.notify.intercept(err, dur))
313            .await
314            .map_err(|e| e.set_persistent())?;
315
316        let retry_reader = RetryReader::new(self.inner.clone(), path.to_string(), args, reader);
317        let retry_wrapper = RetryWrapper::new(retry_reader, self.notify.clone(), self.builder);
318
319        Ok((rp, retry_wrapper))
320    }
321
322    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
323        { || self.inner.write(path, args.clone()) }
324            .retry(self.builder)
325            .when(|e| e.is_temporary())
326            .notify(|err, dur| self.notify.intercept(err, dur))
327            .await
328            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
329            .map_err(|e| e.set_persistent())
330    }
331
332    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
333        { || self.inner.stat(path, args.clone()) }
334            .retry(self.builder)
335            .when(|e| e.is_temporary())
336            .notify(|err, dur| self.notify.intercept(err, dur))
337            .await
338            .map_err(|e| e.set_persistent())
339    }
340
341    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
342        { || self.inner.delete() }
343            .retry(self.builder)
344            .when(|e| e.is_temporary())
345            .notify(|err, dur| self.notify.intercept(err, dur))
346            .await
347            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
348            .map_err(|e| e.set_persistent())
349    }
350
351    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
352        { || self.inner.copy(from, to, args.clone()) }
353            .retry(self.builder)
354            .when(|e| e.is_temporary())
355            .notify(|err, dur| self.notify.intercept(err, dur))
356            .await
357            .map_err(|e| e.set_persistent())
358    }
359
360    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
361        { || self.inner.rename(from, to, args.clone()) }
362            .retry(self.builder)
363            .when(|e| e.is_temporary())
364            .notify(|err, dur| self.notify.intercept(err, dur))
365            .await
366            .map_err(|e| e.set_persistent())
367    }
368
369    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
370        { || self.inner.list(path, args.clone()) }
371            .retry(self.builder)
372            .when(|e| e.is_temporary())
373            .notify(|err, dur| self.notify.intercept(err, dur))
374            .await
375            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
376            .map_err(|e| e.set_persistent())
377    }
378}
379
380pub struct RetryReader<A, R> {
381    inner: Arc<A>,
382    reader: Option<R>,
383
384    path: String,
385    args: OpRead,
386}
387
388impl<A, R> RetryReader<A, R> {
389    fn new(inner: Arc<A>, path: String, args: OpRead, r: R) -> Self {
390        Self {
391            inner,
392            reader: Some(r),
393
394            path,
395            args,
396        }
397    }
398}
399
400impl<A: Access> oio::Read for RetryReader<A, A::Reader> {
401    async fn read(&mut self) -> Result<Buffer> {
402        loop {
403            match self.reader.take() {
404                None => {
405                    let (_, r) = self.inner.read(&self.path, self.args.clone()).await?;
406                    self.reader = Some(r);
407                    continue;
408                }
409                Some(mut reader) => {
410                    let buf = reader.read().await?;
411                    self.reader = Some(reader);
412                    self.args.range_mut().advance(buf.len() as u64);
413                    return Ok(buf);
414                }
415            }
416        }
417    }
418}
419
420pub struct RetryWrapper<R, I> {
421    inner: Option<R>,
422    notify: Arc<I>,
423
424    builder: ExponentialBuilder,
425}
426
427impl<R, I> RetryWrapper<R, I> {
428    fn new(inner: R, notify: Arc<I>, backoff: ExponentialBuilder) -> Self {
429        Self {
430            inner: Some(inner),
431            notify,
432            builder: backoff,
433        }
434    }
435
436    fn take_inner(&mut self) -> Result<R> {
437        self.inner.take().ok_or_else(|| {
438            Error::new(
439                ErrorKind::Unexpected,
440                "retry layer is in bad state, please make sure future not dropped before ready",
441            )
442        })
443    }
444}
445
446impl<R: oio::Read, I: RetryInterceptor> oio::Read for RetryWrapper<R, I> {
447    async fn read(&mut self) -> Result<Buffer> {
448        use backon::RetryableWithContext;
449
450        let inner = self.take_inner()?;
451
452        let (inner, res) = {
453            |mut r: R| async move {
454                let res = r.read().await;
455
456                (r, res)
457            }
458        }
459        .retry(self.builder)
460        .when(|e| e.is_temporary())
461        .context(inner)
462        .notify(|err, dur| self.notify.intercept(err, dur))
463        .await;
464
465        self.inner = Some(inner);
466        res.map_err(|err| err.set_persistent())
467    }
468}
469
470impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
471    async fn write(&mut self, bs: Buffer) -> Result<()> {
472        use backon::RetryableWithContext;
473
474        let inner = self.take_inner()?;
475
476        let ((inner, _), res) = {
477            |(mut r, bs): (R, Buffer)| async move {
478                let res = r.write(bs.clone()).await;
479
480                ((r, bs), res)
481            }
482        }
483        .retry(self.builder)
484        .when(|e| e.is_temporary())
485        .context((inner, bs))
486        .notify(|err, dur| self.notify.intercept(err, dur))
487        .await;
488
489        self.inner = Some(inner);
490        res.map_err(|err| err.set_persistent())
491    }
492
493    async fn abort(&mut self) -> Result<()> {
494        use backon::RetryableWithContext;
495
496        let inner = self.take_inner()?;
497
498        let (inner, res) = {
499            |mut r: R| async move {
500                let res = r.abort().await;
501
502                (r, res)
503            }
504        }
505        .retry(self.builder)
506        .when(|e| e.is_temporary())
507        .context(inner)
508        .notify(|err, dur| self.notify.intercept(err, dur))
509        .await;
510
511        self.inner = Some(inner);
512        res.map_err(|err| err.set_persistent())
513    }
514
515    async fn close(&mut self) -> Result<Metadata> {
516        use backon::RetryableWithContext;
517
518        let inner = self.take_inner()?;
519
520        let (inner, res) = {
521            |mut r: R| async move {
522                let res = r.close().await;
523
524                (r, res)
525            }
526        }
527        .retry(self.builder)
528        .when(|e| e.is_temporary())
529        .context(inner)
530        .notify(|err, dur| self.notify.intercept(err, dur))
531        .await;
532
533        self.inner = Some(inner);
534        res.map_err(|err| err.set_persistent())
535    }
536}
537
538impl<P: oio::List, I: RetryInterceptor> oio::List for RetryWrapper<P, I> {
539    async fn next(&mut self) -> Result<Option<oio::Entry>> {
540        use backon::RetryableWithContext;
541
542        let inner = self.take_inner()?;
543
544        let (inner, res) = {
545            |mut p: P| async move {
546                let res = p.next().await;
547
548                (p, res)
549            }
550        }
551        .retry(self.builder)
552        .when(|e| e.is_temporary())
553        .context(inner)
554        .notify(|err, dur| self.notify.intercept(err, dur))
555        .await;
556
557        self.inner = Some(inner);
558        res.map_err(|err| err.set_persistent())
559    }
560}
561
562impl<P: oio::Delete, I: RetryInterceptor> oio::Delete for RetryWrapper<P, I> {
563    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
564        use backon::RetryableWithContext;
565
566        let inner = self.take_inner()?;
567        let path = path.to_string();
568        let args_cloned = args.clone();
569
570        let (inner, res) = {
571            |mut p: P| {
572                let path = path.clone();
573                let args = args_cloned.clone();
574                async move {
575                    let res = p.delete(&path, args).await;
576                    (p, res)
577                }
578            }
579        }
580        .retry(self.builder)
581        .when(|e| e.is_temporary())
582        .context(inner)
583        .notify(|err, dur| {
584            self.notify.intercept(err, dur);
585        })
586        .await;
587
588        self.inner = Some(inner);
589        res.map_err(|e| e.set_persistent())
590    }
591
592    async fn close(&mut self) -> Result<()> {
593        use backon::RetryableWithContext;
594
595        let inner = self.take_inner()?;
596
597        let (inner, res) = {
598            |mut p: P| async move {
599                let res = p.close().await;
600
601                (p, res)
602            }
603        }
604        .retry(self.builder)
605        .when(|e| e.is_temporary())
606        .context(inner)
607        .notify(|err, dur| self.notify.intercept(err, dur))
608        .await;
609
610        self.inner = Some(inner);
611        res.map_err(|err| err.set_persistent())
612    }
613}
614
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::Mutex;

    use bytes::Bytes;
    use futures::TryStreamExt;
    use futures::stream;
    use tracing_subscriber::filter::LevelFilter;

    use super::*;
    use crate::layers::LoggingLayer;

    /// Builder for `MockService`. The `attempt` counter is shared with the
    /// service (and through it with readers and deleters) so tests can check
    /// how many calls were made.
    #[derive(Default, Clone)]
    struct MockBuilder {
        attempt: Arc<Mutex<usize>>,
    }

    impl Builder for MockBuilder {
        type Config = ();

        fn build(self) -> Result<impl Access> {
            Ok(MockService {
                attempt: self.attempt.clone(),
            })
        }
    }

    /// Service whose readers, writers, listers and deleters inject temporary
    /// errors on fixed call counts.
    #[derive(Debug, Clone, Default)]
    struct MockService {
        attempt: Arc<Mutex<usize>>,
    }

    impl Access for MockService {
        type Reader = MockReader;
        type Writer = MockWriter;
        type Lister = MockLister;
        type Deleter = MockDeleter;

        fn info(&self) -> Arc<AccessorInfo> {
            let am = AccessorInfo::default();
            am.set_scheme("mock").set_native_capability(Capability {
                read: true,
                write: true,
                write_can_multi: true,
                delete: true,
                delete_max_size: Some(10),
                stat: true,
                list: true,
                list_with_recursive: true,
                ..Default::default()
            });

            am.into()
        }

        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
            Ok(RpStat::new(
                Metadata::new(EntryMode::FILE).with_content_length(13),
            ))
        }

        // Every (re)opened reader serves the requested range of the same
        // 13-byte payload and shares the service's attempt counter.
        async fn read(&self, _: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
            Ok((
                RpRead::new(),
                MockReader {
                    buf: Bytes::from("Hello, World!").into(),
                    range: args.range(),
                    attempt: self.attempt.clone(),
                },
            ))
        }

        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            Ok((
                RpDelete::default(),
                MockDeleter {
                    size: 0,
                    attempt: self.attempt.clone(),
                },
            ))
        }

        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
            Ok((RpWrite::new(), MockWriter {}))
        }

        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            let lister = MockLister::default();
            Ok((RpList::default(), lister))
        }
    }

    /// Reader whose `read` calls are counted across all reopened readers via
    /// the shared `attempt` counter: calls 1, 2 and 4 fail with a temporary
    /// error, call 3 returns the requested range and call 5 returns what is
    /// left of it.
    #[derive(Debug, Clone, Default)]
    struct MockReader {
        buf: Buffer,
        range: BytesRange,
        attempt: Arc<Mutex<usize>>,
    }

    impl oio::Read for MockReader {
        async fn read(&mut self) -> Result<Buffer> {
            let mut attempt = self.attempt.lock().unwrap();
            *attempt += 1;

            match *attempt {
                1 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                2 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                // Should read out all data.
                3 => Ok(self.buf.slice(self.range.to_range_as_usize())),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                // Should be empty: the reopened reader's range starts after the
                // data already returned by call 3.
                5 => Ok(self.buf.slice(self.range.to_range_as_usize())),
                _ => unreachable!(),
            }
        }
    }

    /// Writer that accepts every write but always fails `close` with a
    /// temporary error.
    #[derive(Debug, Clone, Default)]
    struct MockWriter {}

    impl oio::Write for MockWriter {
        async fn write(&mut self, _: Buffer) -> Result<()> {
            Ok(())
        }

        async fn close(&mut self) -> Result<Metadata> {
            Err(Error::new(ErrorKind::Unexpected, "always close failed").set_temporary())
        }

        async fn abort(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Lister that yields four entries, with temporary errors injected before
    /// the first entry and before the third entry.
    #[derive(Debug, Clone, Default)]
    struct MockLister {
        attempt: usize,
    }

    impl oio::List for MockLister {
        async fn next(&mut self) -> Result<Option<oio::Entry>> {
            self.attempt += 1;
            match self.attempt {
                1 => Err(Error::new(
                    ErrorKind::RateLimited,
                    "retryable rate limited error from lister",
                )
                .set_temporary()),
                2 => Ok(Some(oio::Entry::new(
                    "hello",
                    Metadata::new(EntryMode::FILE),
                ))),
                3 => Ok(Some(oio::Entry::new(
                    "world",
                    Metadata::new(EntryMode::FILE),
                ))),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable internal server error")
                        .set_temporary(),
                ),
                5 => Ok(Some(oio::Entry::new(
                    "2023/",
                    Metadata::new(EntryMode::DIR),
                ))),
                6 => Ok(Some(oio::Entry::new(
                    "0208/",
                    Metadata::new(EntryMode::DIR),
                ))),
                7 => Ok(None),
                _ => {
                    unreachable!()
                }
            }
        }
    }

    /// Deleter that counts queued paths in `size`. Its `close` fails on call 1;
    /// calls 2-4 drop one queued path each and fail; call 5 drops one more and
    /// succeeds only if nothing is left queued.
    #[derive(Debug, Clone, Default)]
    struct MockDeleter {
        size: usize,
        attempt: Arc<Mutex<usize>>,
    }

    impl oio::Delete for MockDeleter {
        async fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
            self.size += 1;
            Ok(())
        }

        async fn close(&mut self) -> Result<()> {
            let mut attempt = self.attempt.lock().unwrap();
            *attempt += 1;

            match *attempt {
                1 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                2..=4 => {
                    self.size = self.size.saturating_sub(1);
                    Err(
                        Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                            .set_temporary(),
                    )
                }
                5 => {
                    self.size = self.size.saturating_sub(1);
                    if self.size == 0 {
                        Ok(())
                    } else {
                        Err(
                            Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                                .set_temporary(),
                        )
                    }
                }
                _ => unreachable!(),
            }
        }
    }

    #[tokio::test]
    async fn test_retry_read() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(LevelFilter::TRACE)
            .with_test_writer()
            .try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(LoggingLayer::default())
            .layer(RetryLayer::new())
            .finish();

        let r = op.reader("retryable_error").await.unwrap();
        let mut content = Vec::new();
        let size = r
            .read_into(&mut content, ..)
            .await
            .expect("read must succeed");
        assert_eq!(size, 13);
        assert_eq!(content, "Hello, World!".as_bytes());
        // `MockReader::read` is called 5 times in total: two temporary errors,
        // the full payload, one more temporary error, then the empty final read.
        assert_eq!(*builder.attempt.lock().unwrap(), 5);
    }

    /// This test is used to reproduce the panic issue while composing retry layer with timeout layer.
    ///
    /// `close` always fails, so the retry eventually gives up; the writer must
    /// still be usable afterwards so that `abort` succeeds.
    #[tokio::test]
    async fn test_retry_write_fail_on_close() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(LevelFilter::TRACE)
            .with_test_writer()
            .try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_millis(1))
                    .with_max_delay(Duration::from_millis(1))
                    .with_jitter(),
            )
            // Uncomment this to reproduce the timeout/retry layering issue.
            // .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
            .layer(LoggingLayer::default())
            .finish();

        let mut w = op.writer("test_write").await.unwrap();
        w.write("aaa").await.unwrap();
        w.write("bbb").await.unwrap();
        match w.close().await {
            Ok(_) => (),
            Err(_) => {
                w.abort().await.unwrap();
            }
        };
    }

    #[tokio::test]
    async fn test_retry_list() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(RetryLayer::new())
            .finish();

        // The temporary errors injected by `MockLister` must be retried
        // transparently, leaving exactly these entries in order.
        let expected = vec!["hello", "world", "2023/", "0208/"];

        let mut lister = op
            .lister("retryable_error/")
            .await
            .expect("service must support list");
        let mut actual = Vec::new();
        while let Some(obj) = lister.try_next().await.expect("must success") {
            actual.push(obj.name().to_owned());
        }

        assert_eq!(actual, expected);
    }

    #[tokio::test]
    async fn test_retry_batch() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let builder = MockBuilder::default();
        // set to a lower delay to make it run faster
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_secs_f32(0.1))
                    .with_max_times(5),
            )
            .finish();

        let paths = vec!["hello", "world", "test", "batch"];
        op.delete_stream(stream::iter(paths)).await.unwrap();
        // `MockDeleter::close` only succeeds on its 5th call, once all four
        // queued paths have been drained.
        assert_eq!(*builder.attempt.lock().unwrap(), 5);
    }
}