opendal/layers/
retry.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use backon::BlockingRetryable;
24use backon::ExponentialBuilder;
25use backon::Retryable;
26use log::warn;
27
28use crate::raw::*;
29use crate::*;
30
31/// Add retry for temporary failed operations.
32///
33/// # Notes
34///
35/// This layer will retry failed operations when [`Error::is_temporary`]
36/// returns true. If operation still failed, this layer will set error to
37/// `Persistent` which means error has been retried.
38///
39/// # Panics
40///
41/// While retrying `Reader` or `Writer` operations, please make sure either:
42///
43/// - All futures generated by `Reader::read` or `Writer::close` are resolved to `Ready`.
44/// - Or, won't call any `Reader` or `Writer` methods after retry returns final error.
45///
46/// Otherwise, `RetryLayer` could panic while hitting in bad states.
47///
48/// For example, while composing `RetryLayer` with `TimeoutLayer`. The order of layer is sensitive.
49///
50/// ```no_run
51/// # use std::time::Duration;
52///
53/// # use opendal::layers::RetryLayer;
54/// # use opendal::layers::TimeoutLayer;
55/// # use opendal::services;
56/// # use opendal::Operator;
57/// # use opendal::Result;
58///
59/// # fn main() -> Result<()> {
60/// let op = Operator::new(services::Memory::default())?
61///     // This is fine, since timeout happen during retry.
62///     .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
63///     .layer(RetryLayer::new())
64///     // This is wrong. Since timeout layer will drop future, leaving retry layer in a bad state.
65///     .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
66///     .finish();
67/// Ok(())
68/// # }
69/// ```
70///
71/// # Examples
72///
73/// ```no_run
74/// # use opendal::layers::RetryLayer;
75/// # use opendal::services;
76/// # use opendal::Operator;
77/// # use opendal::Result;
78/// # use opendal::Scheme;
79///
80/// # fn main() -> Result<()> {
81/// let _ = Operator::new(services::Memory::default())?
82///     .layer(RetryLayer::new())
83///     .finish();
84/// Ok(())
85/// # }
86/// ```
87///
88/// ## Customize retry interceptor
89///
90/// RetryLayer accepts [`RetryInterceptor`] to allow users to customize
91/// their own retry interceptor logic.
92///
93/// ```no_run
94/// # use std::time::Duration;
95///
96/// # use opendal::layers::RetryInterceptor;
97/// # use opendal::layers::RetryLayer;
98/// # use opendal::services;
99/// # use opendal::Error;
100/// # use opendal::Operator;
101/// # use opendal::Result;
102/// # use opendal::Scheme;
103///
104/// struct MyRetryInterceptor;
105///
106/// impl RetryInterceptor for MyRetryInterceptor {
107///     fn intercept(&self, err: &Error, dur: Duration) {
108///         // do something
109///     }
110/// }
111///
112/// # fn main() -> Result<()> {
113/// let _ = Operator::new(services::Memory::default())?
114///     .layer(RetryLayer::new().with_notify(MyRetryInterceptor))
115///     .finish();
116/// Ok(())
117/// # }
118/// ```
pub struct RetryLayer<I: RetryInterceptor = DefaultRetryInterceptor> {
    /// Backoff policy used to compute the delay before each retry.
    builder: ExponentialBuilder,
    /// Interceptor invoked just before every retry sleep; shared (via `Arc`)
    /// with every accessor and wrapper this layer produces.
    notify: Arc<I>,
}
123
124impl<I: RetryInterceptor> Clone for RetryLayer<I> {
125    fn clone(&self) -> Self {
126        Self {
127            builder: self.builder,
128            notify: self.notify.clone(),
129        }
130    }
131}
132
133impl Default for RetryLayer {
134    fn default() -> Self {
135        Self {
136            builder: ExponentialBuilder::default(),
137            notify: Arc::new(DefaultRetryInterceptor),
138        }
139    }
140}
141
142impl RetryLayer {
143    /// Create a new retry layer.
144    /// # Examples
145    ///
146    /// ```no_run
147    /// use anyhow::Result;
148    /// use opendal::layers::RetryLayer;
149    /// use opendal::services;
150    /// use opendal::Operator;
151    /// use opendal::Scheme;
152    ///
153    /// let _ = Operator::new(services::Memory::default())
154    ///     .expect("must init")
155    ///     .layer(RetryLayer::new());
156    /// ```
157    pub fn new() -> RetryLayer {
158        Self::default()
159    }
160}
161
162impl<I: RetryInterceptor> RetryLayer<I> {
163    /// Set the retry interceptor as new notify.
164    ///
165    /// ```no_run
166    /// use opendal::layers::RetryLayer;
167    /// use opendal::services;
168    /// use opendal::Operator;
169    ///
170    /// fn notify(_err: &opendal::Error, _dur: std::time::Duration) {}
171    ///
172    /// let _ = Operator::new(services::Memory::default())
173    ///     .expect("must init")
174    ///     .layer(RetryLayer::new().with_notify(notify))
175    ///     .finish();
176    /// ```
177    pub fn with_notify<NI: RetryInterceptor>(self, notify: NI) -> RetryLayer<NI> {
178        RetryLayer {
179            builder: self.builder,
180            notify: Arc::new(notify),
181        }
182    }
183
184    /// Set jitter of current backoff.
185    ///
186    /// If jitter is enabled, ExponentialBackoff will add a random jitter in `[0, min_delay)
187    /// to current delay.
188    pub fn with_jitter(mut self) -> Self {
189        self.builder = self.builder.with_jitter();
190        self
191    }
192
193    /// Set factor of current backoff.
194    ///
195    /// # Panics
196    ///
197    /// This function will panic if input factor smaller than `1.0`.
198    pub fn with_factor(mut self, factor: f32) -> Self {
199        self.builder = self.builder.with_factor(factor);
200        self
201    }
202
203    /// Set min_delay of current backoff.
204    pub fn with_min_delay(mut self, min_delay: Duration) -> Self {
205        self.builder = self.builder.with_min_delay(min_delay);
206        self
207    }
208
209    /// Set max_delay of current backoff.
210    ///
211    /// Delay will not increase if current delay is larger than max_delay.
212    pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
213        self.builder = self.builder.with_max_delay(max_delay);
214        self
215    }
216
217    /// Set max_times of current backoff.
218    ///
219    /// Backoff will return `None` if max times is reaching.
220    pub fn with_max_times(mut self, max_times: usize) -> Self {
221        self.builder = self.builder.with_max_times(max_times);
222        self
223    }
224}
225
226impl<A: Access, I: RetryInterceptor> Layer<A> for RetryLayer<I> {
227    type LayeredAccess = RetryAccessor<A, I>;
228
229    fn layer(&self, inner: A) -> Self::LayeredAccess {
230        RetryAccessor {
231            inner: Arc::new(inner),
232            builder: self.builder,
233            notify: self.notify.clone(),
234        }
235    }
236}
237
/// RetryInterceptor is used to intercept while retry happened.
pub trait RetryInterceptor: Send + Sync + 'static {
    /// Every time RetryLayer is retrying, this function will be called.
    ///
    /// # Timing
    ///
    /// Called just before the retry sleep.
    ///
    /// # Inputs
    ///
    /// - err: The error that caused the current retry.
    /// - dur: The duration that will sleep before next retry.
    ///
    /// # Notes
    ///
    /// The intercept must be quick and non-blocking. No heavy IO is
    /// allowed. Otherwise, the retry will be blocked.
    fn intercept(&self, err: &Error, dur: Duration);
}
257
258impl<F> RetryInterceptor for F
259where
260    F: Fn(&Error, Duration) + Send + Sync + 'static,
261{
262    fn intercept(&self, err: &Error, dur: Duration) {
263        self(err, dur);
264    }
265}
266
267/// The DefaultRetryInterceptor will log the retry error in warning level.
268pub struct DefaultRetryInterceptor;
269
270impl RetryInterceptor for DefaultRetryInterceptor {
271    fn intercept(&self, err: &Error, dur: Duration) {
272        warn!(
273            target: "opendal::layers::retry",
274            "will retry after {}s because: {}",
275            dur.as_secs_f64(), err)
276    }
277}
278
/// Accessor produced by [`RetryLayer`]: forwards every operation to `inner`,
/// retrying temporary failures according to `builder`'s backoff policy and
/// calling `notify` before each retry sleep.
pub struct RetryAccessor<A: Access, I: RetryInterceptor> {
    inner: Arc<A>,
    builder: ExponentialBuilder,
    notify: Arc<I>,
}
284
285impl<A: Access, I: RetryInterceptor> Debug for RetryAccessor<A, I> {
286    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
287        f.debug_struct("RetryAccessor")
288            .field("inner", &self.inner)
289            .finish_non_exhaustive()
290    }
291}
292
impl<A: Access, I: RetryInterceptor> LayeredAccess for RetryAccessor<A, I> {
    type Inner = A;
    // Readers get two levels of protection: `RetryReader` re-creates the
    // underlying reader on failure, and `RetryWrapper` retries each read call.
    type Reader = RetryWrapper<RetryReader<A, A::Reader>, I>;
    type Writer = RetryWrapper<A::Writer, I>;
    type Lister = RetryWrapper<A::Lister, I>;
    type Deleter = RetryWrapper<A::Deleter, I>;

    fn inner(&self) -> &Self::Inner {
        &self.inner
    }

    // Each method below retries while the error `is_temporary()`, notifies the
    // interceptor before every retry sleep, and marks the final error as
    // persistent so callers know it has already been retried.
    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
        { || self.inner.create_dir(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur: Duration| self.notify.intercept(err, dur))
            .await
            .map_err(|e| e.set_persistent())
    }

    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        // First retry opening the reader itself ...
        let (rp, reader) = { || self.inner.read(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map_err(|e| e.set_persistent())?;

        // ... then wrap it so that subsequent `read` calls can both retry and
        // re-create the reader at the current offset.
        let retry_reader = RetryReader::new(self.inner.clone(), path.to_string(), args, reader);
        let retry_wrapper = RetryWrapper::new(retry_reader, self.notify.clone(), self.builder);

        Ok((rp, retry_wrapper))
    }

    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        { || self.inner.write(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
            .map_err(|e| e.set_persistent())
    }

    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
        { || self.inner.stat(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map_err(|e| e.set_persistent())
    }

    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        { || self.inner.delete() }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
            .map_err(|e| e.set_persistent())
    }

    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
        { || self.inner.copy(from, to, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map_err(|e| e.set_persistent())
    }

    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
        { || self.inner.rename(from, to, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map_err(|e| e.set_persistent())
    }

    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
        { || self.inner.list(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
            .map_err(|e| e.set_persistent())
    }
}
384
/// Reader that remembers how it was opened (`inner`, `path`, `args`) so it
/// can re-create the underlying reader after a failed read.
pub struct RetryReader<A, R> {
    inner: Arc<A>,
    /// `None` only after a read failed (the reader was taken and not put
    /// back); the next `read` call re-creates it.
    reader: Option<R>,

    path: String,
    /// Read args; the range is advanced as bytes are returned so a
    /// re-created reader resumes at the right offset.
    args: OpRead,
}
392
393impl<A, R> RetryReader<A, R> {
394    fn new(inner: Arc<A>, path: String, args: OpRead, r: R) -> Self {
395        Self {
396            inner,
397            reader: Some(r),
398
399            path,
400            args,
401        }
402    }
403}
404
405impl<A: Access> oio::Read for RetryReader<A, A::Reader> {
406    async fn read(&mut self) -> Result<Buffer> {
407        loop {
408            match self.reader.take() {
409                None => {
410                    let (_, r) = self.inner.read(&self.path, self.args.clone()).await?;
411                    self.reader = Some(r);
412                    continue;
413                }
414                Some(mut reader) => {
415                    let buf = reader.read().await?;
416                    self.reader = Some(reader);
417                    self.args.range_mut().advance(buf.len() as u64);
418                    return Ok(buf);
419                }
420            }
421        }
422    }
423}
424
/// Wraps a reader/writer/lister/deleter and retries its temporary failures.
pub struct RetryWrapper<R, I> {
    /// Taken (`None`) while a retried future is in flight and restored when
    /// it resolves; stays `None` if that future is dropped before completion.
    inner: Option<R>,
    notify: Arc<I>,

    builder: ExponentialBuilder,
}
431
432impl<R, I> RetryWrapper<R, I> {
433    fn new(inner: R, notify: Arc<I>, backoff: ExponentialBuilder) -> Self {
434        Self {
435            inner: Some(inner),
436            notify,
437            builder: backoff,
438        }
439    }
440
441    fn take_inner(&mut self) -> Result<R> {
442        self.inner.take().ok_or_else(|| {
443            Error::new(
444                ErrorKind::Unexpected,
445                "retry layer is in bad state, please make sure future not dropped before ready",
446            )
447        })
448    }
449}
450
impl<R: oio::Read, I: RetryInterceptor> oio::Read for RetryWrapper<R, I> {
    async fn read(&mut self) -> Result<Buffer> {
        use backon::RetryableWithContext;

        // Ownership of the reader travels through the retried future as
        // backon "context" and is handed back when the future resolves. If
        // the future is dropped mid-flight, `inner` stays `None` and later
        // calls fail via `take_inner`.
        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut r: R| async move {
                let res = r.read().await;

                (r, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}
474
impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
    async fn write(&mut self, bs: Buffer) -> Result<()> {
        use backon::RetryableWithContext;

        // The writer is moved through the retried future as backon "context";
        // see `take_inner` for the dropped-future failure mode.
        let inner = self.take_inner()?;

        // `bs` is part of the context too, cloned per attempt so a failed
        // write can be replayed with the same payload.
        let ((inner, _), res) = {
            |(mut r, bs): (R, Buffer)| async move {
                let res = r.write(bs.clone()).await;

                ((r, bs), res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context((inner, bs))
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }

    async fn abort(&mut self) -> Result<()> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut r: R| async move {
                let res = r.abort().await;

                (r, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }

    async fn close(&mut self) -> Result<Metadata> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut r: R| async move {
                let res = r.close().await;

                (r, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}
542
impl<P: oio::List, I: RetryInterceptor> oio::List for RetryWrapper<P, I> {
    async fn next(&mut self) -> Result<Option<oio::Entry>> {
        use backon::RetryableWithContext;

        // The lister travels through the retried future as backon "context"
        // and is restored afterwards; see `take_inner` for the dropped-future
        // failure mode.
        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut p: P| async move {
                let res = p.next().await;

                (p, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}
566
567impl<P: oio::Delete, I: RetryInterceptor> oio::Delete for RetryWrapper<P, I> {
568    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
569        { || self.inner.as_mut().unwrap().delete(path, args.clone()) }
570            .retry(self.builder)
571            .when(|e| e.is_temporary())
572            .notify(|err, dur| {
573                self.notify.intercept(err, dur);
574            })
575            .call()
576            .map_err(|e| e.set_persistent())
577    }
578
579    async fn flush(&mut self) -> Result<usize> {
580        use backon::RetryableWithContext;
581
582        let inner = self.take_inner()?;
583
584        let (inner, res) = {
585            |mut p: P| async move {
586                let res = p.flush().await;
587
588                (p, res)
589            }
590        }
591        .retry(self.builder)
592        .when(|e| e.is_temporary())
593        .context(inner)
594        .notify(|err, dur| self.notify.intercept(err, dur))
595        .await;
596
597        self.inner = Some(inner);
598        res.map_err(|err| err.set_persistent())
599    }
600}
601
#[cfg(test)]
mod tests {
    use std::mem;
    use std::sync::Arc;
    use std::sync::Mutex;

    use bytes::Bytes;
    use futures::stream;
    use futures::TryStreamExt;
    use tracing_subscriber::filter::LevelFilter;

    use super::*;
    use crate::layers::LoggingLayer;

    /// Builder whose service shares an attempt counter with the test body so
    /// tests can assert how many calls actually reached the backend.
    #[derive(Default, Clone)]
    struct MockBuilder {
        attempt: Arc<Mutex<usize>>,
    }

    impl Builder for MockBuilder {
        const SCHEME: Scheme = Scheme::Custom("mock");
        type Config = ();

        fn build(self) -> Result<impl Access> {
            Ok(MockService {
                attempt: self.attempt.clone(),
            })
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockService {
        attempt: Arc<Mutex<usize>>,
    }

    impl Access for MockService {
        type Reader = MockReader;
        type Writer = MockWriter;
        type Lister = MockLister;
        type Deleter = MockDeleter;

        fn info(&self) -> Arc<AccessorInfo> {
            let am = AccessorInfo::default();
            am.set_scheme(Scheme::Custom("mock"))
                .set_native_capability(Capability {
                    read: true,
                    write: true,
                    write_can_multi: true,
                    delete: true,
                    delete_max_size: Some(10),
                    stat: true,
                    list: true,
                    list_with_recursive: true,
                    ..Default::default()
                });

            am.into()
        }

        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
            // Content length matches "Hello, World!" served by MockReader.
            Ok(RpStat::new(
                Metadata::new(EntryMode::FILE).with_content_length(13),
            ))
        }

        async fn read(&self, _: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
            Ok((
                RpRead::new(),
                MockReader {
                    buf: Bytes::from("Hello, World!").into(),
                    range: args.range(),
                    attempt: self.attempt.clone(),
                },
            ))
        }

        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            Ok((
                RpDelete::default(),
                MockDeleter {
                    size: 0,
                    attempt: self.attempt.clone(),
                },
            ))
        }

        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
            Ok((RpWrite::new(), MockWriter {}))
        }

        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            let lister = MockLister::default();
            Ok((RpList::default(), lister))
        }
    }

    /// Reader scripted by attempt number: fails with temporary errors on
    /// attempts 1, 2 and 4; serves data on 3 and 5.
    #[derive(Debug, Clone, Default)]
    struct MockReader {
        buf: Buffer,
        range: BytesRange,
        attempt: Arc<Mutex<usize>>,
    }

    impl oio::Read for MockReader {
        async fn read(&mut self) -> Result<Buffer> {
            let mut attempt = self.attempt.lock().unwrap();
            *attempt += 1;

            match *attempt {
                1 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                2 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                // Should read out all data.
                3 => Ok(self.buf.slice(self.range.to_range_as_usize())),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                // Should be empty.
                5 => Ok(self.buf.slice(self.range.to_range_as_usize())),
                _ => unreachable!(),
            }
        }
    }

    /// Writer whose `close` always fails with a temporary error; used to
    /// exercise the retry-then-abort path.
    #[derive(Debug, Clone, Default)]
    struct MockWriter {}

    impl oio::Write for MockWriter {
        async fn write(&mut self, _: Buffer) -> Result<()> {
            Ok(())
        }

        async fn close(&mut self) -> Result<Metadata> {
            Err(Error::new(ErrorKind::Unexpected, "always close failed").set_temporary())
        }

        async fn abort(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Lister scripted by attempt number: temporary errors on attempts 1 and
    /// 4, entries in between, end-of-list on attempt 7.
    #[derive(Debug, Clone, Default)]
    struct MockLister {
        attempt: usize,
    }

    impl oio::List for MockLister {
        async fn next(&mut self) -> Result<Option<oio::Entry>> {
            self.attempt += 1;
            match self.attempt {
                1 => Err(Error::new(
                    ErrorKind::RateLimited,
                    "retryable rate limited error from lister",
                )
                .set_temporary()),
                2 => Ok(Some(oio::Entry::new(
                    "hello",
                    Metadata::new(EntryMode::FILE),
                ))),
                3 => Ok(Some(oio::Entry::new(
                    "world",
                    Metadata::new(EntryMode::FILE),
                ))),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable internal server error")
                        .set_temporary(),
                ),
                5 => Ok(Some(oio::Entry::new(
                    "2023/",
                    Metadata::new(EntryMode::DIR),
                ))),
                6 => Ok(Some(oio::Entry::new(
                    "0208/",
                    Metadata::new(EntryMode::DIR),
                ))),
                7 => Ok(None),
                _ => {
                    unreachable!()
                }
            }
        }
    }

    /// Deleter whose `flush` is scripted by attempt number: temporary errors
    /// on attempts 1, 3 and 4; partial progress on 2; full drain on 5.
    #[derive(Debug, Clone, Default)]
    struct MockDeleter {
        size: usize,
        attempt: Arc<Mutex<usize>>,
    }

    impl oio::Delete for MockDeleter {
        fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
            self.size += 1;
            Ok(())
        }

        async fn flush(&mut self) -> Result<usize> {
            let mut attempt = self.attempt.lock().unwrap();
            *attempt += 1;

            match *attempt {
                1 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                2 => {
                    self.size -= 1;
                    Ok(1)
                }
                3 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                5 => {
                    let s = mem::take(&mut self.size);
                    Ok(s)
                }
                _ => unreachable!(),
            }
        }
    }

    #[tokio::test]
    async fn test_retry_read() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(LevelFilter::TRACE)
            .with_test_writer()
            .try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(LoggingLayer::default())
            .layer(RetryLayer::new())
            .finish();

        let r = op.reader("retryable_error").await.unwrap();
        let mut content = Vec::new();
        let size = r
            .read_into(&mut content, ..)
            .await
            .expect("read must succeed");
        assert_eq!(size, 13);
        assert_eq!(content, "Hello, World!".as_bytes());
        // Per MockReader's script: two retried failures, the data read, one
        // more retried failure, then the empty (EOF) read — 5 attempts total.
        assert_eq!(*builder.attempt.lock().unwrap(), 5);
    }

    /// This test is used to reproduce the panic issue while composing retry layer with timeout layer.
    #[tokio::test]
    async fn test_retry_write_fail_on_close() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(LevelFilter::TRACE)
            .with_test_writer()
            .try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_millis(1))
                    .with_max_delay(Duration::from_millis(1))
                    .with_jitter(),
            )
            // Uncomment this to reproduce timeout layer panic.
            // .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
            .layer(LoggingLayer::default())
            .finish();

        let mut w = op.writer("test_write").await.unwrap();
        w.write("aaa").await.unwrap();
        w.write("bbb").await.unwrap();
        // MockWriter::close always fails, so abort must still work after the
        // retries are exhausted.
        match w.close().await {
            Ok(_) => (),
            Err(_) => {
                w.abort().await.unwrap();
            }
        };
    }

    #[tokio::test]
    async fn test_retry_list() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(RetryLayer::new())
            .finish();

        let expected = vec!["hello", "world", "2023/", "0208/"];

        let mut lister = op
            .lister("retryable_error/")
            .await
            .expect("service must support list");
        let mut actual = Vec::new();
        while let Some(obj) = lister.try_next().await.expect("must success") {
            actual.push(obj.name().to_owned());
        }

        assert_eq!(actual, expected);
    }

    #[tokio::test]
    async fn test_retry_batch() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let builder = MockBuilder::default();
        // set to a lower delay to make it run faster
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_secs_f32(0.1))
                    .with_max_times(5),
            )
            .finish();

        let paths = vec!["hello", "world", "test", "batch"];
        op.delete_stream(stream::iter(paths)).await.unwrap();
        // MockDeleter's flush script runs exactly 5 attempts before draining.
        assert_eq!(*builder.attempt.lock().unwrap(), 5);
    }
}
935}