opendal/layers/
retry.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use backon::BlockingRetryable;
24use backon::ExponentialBuilder;
25use backon::Retryable;
26use log::warn;
27
28use crate::raw::*;
29use crate::*;
30
31/// Add retry for temporary failed operations.
32///
33/// # Notes
34///
35/// This layer will retry failed operations when [`Error::is_temporary`]
36/// returns true. If operation still failed, this layer will set error to
37/// `Persistent` which means error has been retried.
38///
39/// # Panics
40///
41/// While retrying `Reader` or `Writer` operations, please make sure either:
42///
43/// - All futures generated by `Reader::read` or `Writer::close` are resolved to `Ready`.
44/// - Or, won't call any `Reader` or `Writer` methods after retry returns final error.
45///
46/// Otherwise, `RetryLayer` could panic while hitting in bad states.
47///
48/// For example, while composing `RetryLayer` with `TimeoutLayer`. The order of layer is sensitive.
49///
50/// ```no_run
51/// # use std::time::Duration;
52///
53/// # use opendal::layers::RetryLayer;
54/// # use opendal::layers::TimeoutLayer;
55/// # use opendal::services;
56/// # use opendal::Operator;
57/// # use opendal::Result;
58///
59/// # fn main() -> Result<()> {
60/// let op = Operator::new(services::Memory::default())?
61///     // This is fine, since timeout happen during retry.
62///     .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
63///     .layer(RetryLayer::new())
64///     // This is wrong. Since timeout layer will drop future, leaving retry layer in a bad state.
65///     .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
66///     .finish();
67/// Ok(())
68/// # }
69/// ```
70///
71/// # Examples
72///
73/// ```no_run
74/// # use opendal::layers::RetryLayer;
75/// # use opendal::services;
76/// # use opendal::Operator;
77/// # use opendal::Result;
78/// # use opendal::Scheme;
79///
80/// # fn main() -> Result<()> {
81/// let _ = Operator::new(services::Memory::default())?
82///     .layer(RetryLayer::new())
83///     .finish();
84/// Ok(())
85/// # }
86/// ```
87///
88/// ## Customize retry interceptor
89///
90/// RetryLayer accepts [`RetryInterceptor`] to allow users to customize
91/// their own retry interceptor logic.
92///
93/// ```no_run
94/// # use std::time::Duration;
95///
96/// # use opendal::layers::RetryInterceptor;
97/// # use opendal::layers::RetryLayer;
98/// # use opendal::services;
99/// # use opendal::Error;
100/// # use opendal::Operator;
101/// # use opendal::Result;
102/// # use opendal::Scheme;
103///
104/// struct MyRetryInterceptor;
105///
106/// impl RetryInterceptor for MyRetryInterceptor {
107///     fn intercept(&self, err: &Error, dur: Duration) {
108///         // do something
109///     }
110/// }
111///
112/// # fn main() -> Result<()> {
113/// let _ = Operator::new(services::Memory::default())?
114///     .layer(RetryLayer::new().with_notify(MyRetryInterceptor))
115///     .finish();
116/// Ok(())
117/// # }
118/// ```
pub struct RetryLayer<I: RetryInterceptor = DefaultRetryInterceptor> {
    // Backoff policy shared by all retried operations.
    builder: ExponentialBuilder,
    // Interceptor invoked just before every retry sleep; kept in an `Arc`
    // so cloned layers and all created accessors share one instance.
    notify: Arc<I>,
}
123
// Manual `Clone` impl: deriving would add an unnecessary `I: Clone` bound,
// while we only need to copy the builder and bump the `Arc` refcount.
impl<I: RetryInterceptor> Clone for RetryLayer<I> {
    fn clone(&self) -> Self {
        Self {
            builder: self.builder,
            notify: self.notify.clone(),
        }
    }
}
132
impl Default for RetryLayer {
    // Default configuration: backon's default exponential backoff plus the
    // warning-logging `DefaultRetryInterceptor`.
    fn default() -> Self {
        Self {
            builder: ExponentialBuilder::default(),
            notify: Arc::new(DefaultRetryInterceptor),
        }
    }
}
141
impl RetryLayer {
    /// Create a new retry layer.
    ///
    /// This is equivalent to `RetryLayer::default()`.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use anyhow::Result;
    /// use opendal::layers::RetryLayer;
    /// use opendal::services;
    /// use opendal::Operator;
    /// use opendal::Scheme;
    ///
    /// let _ = Operator::new(services::Memory::default())
    ///     .expect("must init")
    ///     .layer(RetryLayer::new());
    /// ```
    pub fn new() -> RetryLayer {
        Self::default()
    }
}
161
impl<I: RetryInterceptor> RetryLayer<I> {
    /// Set the retry interceptor as new notify.
    ///
    /// ```no_run
    /// use opendal::layers::RetryLayer;
    /// use opendal::services;
    /// use opendal::Operator;
    ///
    /// fn notify(_err: &opendal::Error, _dur: std::time::Duration) {}
    ///
    /// let _ = Operator::new(services::Memory::default())
    ///     .expect("must init")
    ///     .layer(RetryLayer::new().with_notify(notify))
    ///     .finish();
    /// ```
    pub fn with_notify<NI: RetryInterceptor>(self, notify: NI) -> RetryLayer<NI> {
        RetryLayer {
            builder: self.builder,
            notify: Arc::new(notify),
        }
    }

    /// Set jitter of current backoff.
    ///
    /// If jitter is enabled, ExponentialBackoff will add a random jitter in `[0, min_delay)`
    /// to current delay.
    pub fn with_jitter(mut self) -> Self {
        self.builder = self.builder.with_jitter();
        self
    }

    /// Set factor of current backoff.
    ///
    /// # Panics
    ///
    /// This function will panic if input factor smaller than `1.0`.
    pub fn with_factor(mut self, factor: f32) -> Self {
        self.builder = self.builder.with_factor(factor);
        self
    }

    /// Set min_delay of current backoff.
    pub fn with_min_delay(mut self, min_delay: Duration) -> Self {
        self.builder = self.builder.with_min_delay(min_delay);
        self
    }

    /// Set max_delay of current backoff.
    ///
    /// Delay will not increase if current delay is larger than max_delay.
    pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
        self.builder = self.builder.with_max_delay(max_delay);
        self
    }

    /// Set max_times of current backoff.
    ///
    /// The backoff stops producing delays (and retrying stops) once
    /// max_times is reached.
    pub fn with_max_times(mut self, max_times: usize) -> Self {
        self.builder = self.builder.with_max_times(max_times);
        self
    }
}
225
impl<A: Access, I: RetryInterceptor> Layer<A> for RetryLayer<I> {
    type LayeredAccess = RetryAccessor<A, I>;

    fn layer(&self, inner: A) -> Self::LayeredAccess {
        // Wrap the inner accessor; the backoff builder is copied and the
        // interceptor is shared via `Arc`.
        RetryAccessor {
            inner: Arc::new(inner),
            builder: self.builder,
            notify: self.notify.clone(),
        }
    }
}
237
/// RetryInterceptor is used to intercept while retry happened.
pub trait RetryInterceptor: Send + Sync + 'static {
    /// Every time RetryLayer is retrying, this function will be called.
    ///
    /// # Timing
    ///
    /// This hook runs just before the retry sleep.
    ///
    /// # Inputs
    ///
    /// - err: The error that caused the current retry.
    /// - dur: The duration that will sleep before next retry.
    ///
    /// # Notes
    ///
    /// The intercept must be quick and non-blocking. No heavy IO is
    /// allowed. Otherwise, the retry will be blocked.
    fn intercept(&self, err: &Error, dur: Duration);
}
257
// Blanket impl so any plain closure or fn of shape `Fn(&Error, Duration)`
// can be passed to `RetryLayer::with_notify` directly.
impl<F> RetryInterceptor for F
where
    F: Fn(&Error, Duration) + Send + Sync + 'static,
{
    fn intercept(&self, err: &Error, dur: Duration) {
        self(err, dur);
    }
}
266
/// The DefaultRetryInterceptor will log the retry error in warning level,
/// under the `opendal::layers::retry` log target.
pub struct DefaultRetryInterceptor;
269
impl RetryInterceptor for DefaultRetryInterceptor {
    fn intercept(&self, err: &Error, dur: Duration) {
        // Log the upcoming sleep duration (seconds) and the error that
        // triggered this retry.
        warn!(
            target: "opendal::layers::retry",
            "will retry after {}s because: {}",
            dur.as_secs_f64(), err)
    }
}
278
/// Accessor that wraps every operation of `inner` with retry logic.
pub struct RetryAccessor<A: Access, I: RetryInterceptor> {
    inner: Arc<A>,
    // Backoff policy applied to each retried operation.
    builder: ExponentialBuilder,
    // Interceptor notified before every retry sleep.
    notify: Arc<I>,
}
284
impl<A: Access, I: RetryInterceptor> Debug for RetryAccessor<A, I> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Only the inner accessor is interesting for debugging; the backoff
        // and notify fields are intentionally omitted.
        f.debug_struct("RetryAccessor")
            .field("inner", &self.inner)
            .finish_non_exhaustive()
    }
}
292
293impl<A: Access, I: RetryInterceptor> LayeredAccess for RetryAccessor<A, I> {
294    type Inner = A;
295    type Reader = RetryWrapper<RetryReader<A, A::Reader>, I>;
296    type Writer = RetryWrapper<A::Writer, I>;
297    type Lister = RetryWrapper<A::Lister, I>;
298    type Deleter = RetryWrapper<A::Deleter, I>;
299
300    fn inner(&self) -> &Self::Inner {
301        &self.inner
302    }
303
304    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
305        { || self.inner.create_dir(path, args.clone()) }
306            .retry(self.builder)
307            .when(|e| e.is_temporary())
308            .notify(|err, dur: Duration| self.notify.intercept(err, dur))
309            .await
310            .map_err(|e| e.set_persistent())
311    }
312
313    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
314        let (rp, reader) = { || self.inner.read(path, args.clone()) }
315            .retry(self.builder)
316            .when(|e| e.is_temporary())
317            .notify(|err, dur| self.notify.intercept(err, dur))
318            .await
319            .map_err(|e| e.set_persistent())?;
320
321        let retry_reader = RetryReader::new(self.inner.clone(), path.to_string(), args, reader);
322        let retry_wrapper = RetryWrapper::new(retry_reader, self.notify.clone(), self.builder);
323
324        Ok((rp, retry_wrapper))
325    }
326
327    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
328        { || self.inner.write(path, args.clone()) }
329            .retry(self.builder)
330            .when(|e| e.is_temporary())
331            .notify(|err, dur| self.notify.intercept(err, dur))
332            .await
333            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
334            .map_err(|e| e.set_persistent())
335    }
336
337    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
338        { || self.inner.stat(path, args.clone()) }
339            .retry(self.builder)
340            .when(|e| e.is_temporary())
341            .notify(|err, dur| self.notify.intercept(err, dur))
342            .await
343            .map_err(|e| e.set_persistent())
344    }
345
346    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
347        { || self.inner.delete() }
348            .retry(self.builder)
349            .when(|e| e.is_temporary())
350            .notify(|err, dur| self.notify.intercept(err, dur))
351            .await
352            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
353            .map_err(|e| e.set_persistent())
354    }
355
356    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
357        { || self.inner.copy(from, to, args.clone()) }
358            .retry(self.builder)
359            .when(|e| e.is_temporary())
360            .notify(|err, dur| self.notify.intercept(err, dur))
361            .await
362            .map_err(|e| e.set_persistent())
363    }
364
365    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
366        { || self.inner.rename(from, to, args.clone()) }
367            .retry(self.builder)
368            .when(|e| e.is_temporary())
369            .notify(|err, dur| self.notify.intercept(err, dur))
370            .await
371            .map_err(|e| e.set_persistent())
372    }
373
374    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
375        { || self.inner.list(path, args.clone()) }
376            .retry(self.builder)
377            .when(|e| e.is_temporary())
378            .notify(|err, dur| self.notify.intercept(err, dur))
379            .await
380            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
381            .map_err(|e| e.set_persistent())
382    }
383}
384
/// Reader that can lazily re-create the underlying reader after a failure.
pub struct RetryReader<A, R> {
    inner: Arc<A>,
    // `None` after a failed read; the reader is re-created from
    // `path`/`args` on the next call.
    reader: Option<R>,

    path: String,
    // `args`' range is advanced as data is consumed, so a re-created reader
    // resumes from the correct offset.
    args: OpRead,
}
392
impl<A, R> RetryReader<A, R> {
    // Wrap an already-opened reader `r`, remembering how to re-open it.
    fn new(inner: Arc<A>, path: String, args: OpRead, r: R) -> Self {
        Self {
            inner,
            reader: Some(r),

            path,
            args,
        }
    }
}
404
impl<A: Access> oio::Read for RetryReader<A, A::Reader> {
    async fn read(&mut self) -> Result<Buffer> {
        loop {
            match self.reader.take() {
                None => {
                    // A previous read failed and dropped the reader:
                    // re-open it with the advanced range so we resume where
                    // we left off. Errors here propagate to the outer
                    // `RetryWrapper`, which performs the actual retries.
                    let (_, r) = self.inner.read(&self.path, self.args.clone()).await?;
                    self.reader = Some(r);
                    continue;
                }
                Some(mut reader) => {
                    // On failure the `?` below drops `reader` (it is not put
                    // back), so the next call re-creates it from path/args.
                    let buf = reader.read().await?;
                    self.reader = Some(reader);
                    // Advance the range so a future re-created reader skips
                    // the data we already returned.
                    self.args.range_mut().advance(buf.len() as u64);
                    return Ok(buf);
                }
            }
        }
    }
}
424
/// Wrapper that retries each operation on `inner` with the configured backoff.
pub struct RetryWrapper<R, I> {
    // Taken out while an operation is in flight; `None` outside a call means
    // a previous operation's future was dropped before completion.
    inner: Option<R>,
    notify: Arc<I>,

    builder: ExponentialBuilder,
}
431
432impl<R, I> RetryWrapper<R, I> {
433    fn new(inner: R, notify: Arc<I>, backoff: ExponentialBuilder) -> Self {
434        Self {
435            inner: Some(inner),
436            notify,
437            builder: backoff,
438        }
439    }
440
441    fn take_inner(&mut self) -> Result<R> {
442        self.inner.take().ok_or_else(|| {
443            Error::new(
444                ErrorKind::Unexpected,
445                "retry layer is in bad state, please make sure future not dropped before ready",
446            )
447        })
448    }
449}
450
impl<R: oio::Read, I: RetryInterceptor> oio::Read for RetryWrapper<R, I> {
    async fn read(&mut self) -> Result<Buffer> {
        use backon::RetryableWithContext;

        // Take the reader out so the retry closure can own it across await
        // points; it is put back below whether the call succeeded or not.
        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut r: R| async move {
                let res = r.read().await;

                (r, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        // Restore the reader before mapping the error so the wrapper stays
        // usable for subsequent calls.
        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}
474
impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
    async fn write(&mut self, bs: Buffer) -> Result<()> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        // The buffer travels through the retry context so every attempt can
        // send the same bytes; `bs.clone()` is needed because each attempt
        // consumes its own copy.
        let ((inner, _), res) = {
            |(mut r, bs): (R, Buffer)| async move {
                let res = r.write(bs.clone()).await;

                ((r, bs), res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context((inner, bs))
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        // Put the writer back before handling the result so the wrapper
        // stays usable for subsequent calls.
        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }

    async fn abort(&mut self) -> Result<()> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut r: R| async move {
                let res = r.abort().await;

                (r, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }

    async fn close(&mut self) -> Result<Metadata> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut r: R| async move {
                let res = r.close().await;

                (r, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}
542
impl<P: oio::List, I: RetryInterceptor> oio::List for RetryWrapper<P, I> {
    async fn next(&mut self) -> Result<Option<oio::Entry>> {
        use backon::RetryableWithContext;

        // Take the lister out so the retry closure can own it across await
        // points; it is put back below even on failure.
        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut p: P| async move {
                let res = p.next().await;

                (p, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}
566
567impl<P: oio::Delete, I: RetryInterceptor> oio::Delete for RetryWrapper<P, I> {
568    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
569        { || self.inner.as_mut().unwrap().delete(path, args.clone()) }
570            .retry(self.builder)
571            .when(|e| e.is_temporary())
572            .notify(|err, dur| {
573                self.notify.intercept(err, dur);
574            })
575            .call()
576            .map_err(|e| e.set_persistent())
577    }
578
579    async fn flush(&mut self) -> Result<usize> {
580        use backon::RetryableWithContext;
581
582        let inner = self.take_inner()?;
583
584        let (inner, res) = {
585            |mut p: P| async move {
586                let res = p.flush().await;
587
588                (p, res)
589            }
590        }
591        .retry(self.builder)
592        .when(|e| e.is_temporary())
593        .context(inner)
594        .notify(|err, dur| self.notify.intercept(err, dur))
595        .await;
596
597        self.inner = Some(inner);
598        res.map_err(|err| err.set_persistent())
599    }
600}
601
#[cfg(test)]
mod tests {
    use std::mem;
    use std::sync::Arc;
    use std::sync::Mutex;

    use bytes::Bytes;
    use futures::{stream, TryStreamExt};
    use tracing_subscriber::filter::LevelFilter;

    use super::*;
    use crate::layers::LoggingLayer;

    // Builder whose service shares an `attempt` counter with the test, so
    // tests can assert how many calls the retry layer actually made.
    #[derive(Default, Clone)]
    struct MockBuilder {
        attempt: Arc<Mutex<usize>>,
    }

    impl Builder for MockBuilder {
        const SCHEME: Scheme = Scheme::Custom("mock");
        type Config = ();

        fn build(self) -> Result<impl Access> {
            Ok(MockService {
                attempt: self.attempt.clone(),
            })
        }
    }

    // Service whose reader/deleter follow a scripted per-attempt sequence of
    // temporary failures and successes (see MockReader/MockDeleter below).
    #[derive(Debug, Clone, Default)]
    struct MockService {
        attempt: Arc<Mutex<usize>>,
    }

    impl Access for MockService {
        type Reader = MockReader;
        type Writer = MockWriter;
        type Lister = MockLister;
        type Deleter = MockDeleter;

        fn info(&self) -> Arc<AccessorInfo> {
            let am = AccessorInfo::default();
            am.set_scheme(Scheme::Custom("mock"))
                .set_native_capability(Capability {
                    read: true,
                    write: true,
                    write_can_multi: true,
                    delete: true,
                    delete_max_size: Some(10),
                    stat: true,
                    list: true,
                    list_with_recursive: true,
                    ..Default::default()
                });

            am.into()
        }

        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
            Ok(RpStat::new(
                Metadata::new(EntryMode::FILE).with_content_length(13),
            ))
        }

        async fn read(&self, _: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
            Ok((
                RpRead::new(),
                MockReader {
                    buf: Bytes::from("Hello, World!").into(),
                    range: args.range(),
                    attempt: self.attempt.clone(),
                },
            ))
        }

        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            Ok((
                RpDelete::default(),
                MockDeleter {
                    size: 0,
                    attempt: self.attempt.clone(),
                },
            ))
        }

        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
            Ok((RpWrite::new(), MockWriter {}))
        }

        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            let lister = MockLister::default();
            Ok((RpList::default(), lister))
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockReader {
        buf: Buffer,
        range: BytesRange,
        attempt: Arc<Mutex<usize>>,
    }

    // Scripted read sequence: fail, fail, full read, fail, empty (EOF) read.
    impl oio::Read for MockReader {
        async fn read(&mut self) -> Result<Buffer> {
            let mut attempt = self.attempt.lock().unwrap();
            *attempt += 1;

            match *attempt {
                1 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                2 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                // Should read out all data.
                3 => Ok(self.buf.slice(self.range.to_range_as_usize())),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                // Should be empty.
                5 => Ok(self.buf.slice(self.range.to_range_as_usize())),
                _ => unreachable!(),
            }
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockWriter {}

    // Writer that always fails on close with a temporary error, used to
    // exercise retry-then-abort behavior.
    impl oio::Write for MockWriter {
        async fn write(&mut self, _: Buffer) -> Result<()> {
            Ok(())
        }

        async fn close(&mut self) -> Result<Metadata> {
            Err(Error::new(ErrorKind::Unexpected, "always close failed").set_temporary())
        }

        async fn abort(&mut self) -> Result<()> {
            Ok(())
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockLister {
        attempt: usize,
    }

    // Scripted listing: temporary failures interleaved with entries, ending
    // with `None` once all entries are emitted.
    impl oio::List for MockLister {
        async fn next(&mut self) -> Result<Option<oio::Entry>> {
            self.attempt += 1;
            match self.attempt {
                1 => Err(Error::new(
                    ErrorKind::RateLimited,
                    "retryable rate limited error from lister",
                )
                .set_temporary()),
                2 => Ok(Some(oio::Entry::new(
                    "hello",
                    Metadata::new(EntryMode::FILE),
                ))),
                3 => Ok(Some(oio::Entry::new(
                    "world",
                    Metadata::new(EntryMode::FILE),
                ))),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable internal server error")
                        .set_temporary(),
                ),
                5 => Ok(Some(oio::Entry::new(
                    "2023/",
                    Metadata::new(EntryMode::DIR),
                ))),
                6 => Ok(Some(oio::Entry::new(
                    "0208/",
                    Metadata::new(EntryMode::DIR),
                ))),
                7 => Ok(None),
                _ => {
                    unreachable!()
                }
            }
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockDeleter {
        size: usize,
        attempt: Arc<Mutex<usize>>,
    }

    // Scripted flush sequence: fail, partial success, fail, fail, then flush
    // everything that remains.
    impl oio::Delete for MockDeleter {
        fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
            self.size += 1;
            Ok(())
        }

        async fn flush(&mut self) -> Result<usize> {
            let mut attempt = self.attempt.lock().unwrap();
            *attempt += 1;

            match *attempt {
                1 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                2 => {
                    self.size -= 1;
                    Ok(1)
                }
                3 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                5 => {
                    let s = mem::take(&mut self.size);
                    Ok(s)
                }
                _ => unreachable!(),
            }
        }
    }

    #[tokio::test]
    async fn test_retry_read() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(LevelFilter::TRACE)
            .with_test_writer()
            .try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(LoggingLayer::default())
            .layer(RetryLayer::new())
            .finish();

        let r = op.reader("retryable_error").await.unwrap();
        let mut content = Vec::new();
        let size = r
            .read_into(&mut content, ..)
            .await
            .expect("read must succeed");
        assert_eq!(size, 13);
        assert_eq!(content, "Hello, World!".as_bytes());
        // MockReader's script: 2 temporary failures, a full read, 1 more
        // temporary failure, then the empty EOF read => 5 attempts total.
        assert_eq!(*builder.attempt.lock().unwrap(), 5);
    }

    /// This test is used to reproduce the panic issue while composing retry layer with timeout layer.
    #[tokio::test]
    async fn test_retry_write_fail_on_close() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(LevelFilter::TRACE)
            .with_test_writer()
            .try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_millis(1))
                    .with_max_delay(Duration::from_millis(1))
                    .with_jitter(),
            )
            // Uncomment this to reproduce timeout layer panic.
            // .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
            .layer(LoggingLayer::default())
            .finish();

        let mut w = op.writer("test_write").await.unwrap();
        w.write("aaa").await.unwrap();
        w.write("bbb").await.unwrap();
        match w.close().await {
            Ok(_) => (),
            Err(_) => {
                // MockWriter::close always fails; abort must still work
                // after retries are exhausted.
                w.abort().await.unwrap();
            }
        };
    }

    #[tokio::test]
    async fn test_retry_list() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(RetryLayer::new())
            .finish();

        // Temporary errors in MockLister's script must be retried away,
        // leaving only the real entries.
        let expected = vec!["hello", "world", "2023/", "0208/"];

        let mut lister = op
            .lister("retryable_error/")
            .await
            .expect("service must support list");
        let mut actual = Vec::new();
        while let Some(obj) = lister.try_next().await.expect("must success") {
            actual.push(obj.name().to_owned());
        }

        assert_eq!(actual, expected);
    }

    #[tokio::test]
    async fn test_retry_batch() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let builder = MockBuilder::default();
        // set to a lower delay to make it run faster
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_secs_f32(0.1))
                    .with_max_times(5),
            )
            .finish();

        let paths = vec!["hello", "world", "test", "batch"];
        op.delete_stream(stream::iter(paths)).await.unwrap();
        // MockDeleter's flush script requires 5 attempts before all queued
        // deletes are flushed.
        assert_eq!(*builder.attempt.lock().unwrap(), 5);
    }
}