opendal/layers/retry.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
use std::time::Duration;

use backon::BlockingRetryable;
use backon::ExponentialBuilder;
use backon::Retryable;
use log::warn;

use crate::raw::*;
use crate::*;

/// Add retry for temporarily failed operations.
///
/// # Notes
///
/// This layer will retry failed operations when [`Error::is_temporary`]
/// returns true. If the operation still fails after retrying, this layer
/// will mark the error as `Persistent`, meaning the error has already been
/// retried.
///
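/// For example, an error that reaches the caller through this layer is no
/// longer temporary (a minimal sketch against the in-memory service; the
/// path is arbitrary):
///
/// ```no_run
/// # use opendal::layers::RetryLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # async fn example() -> opendal::Result<()> {
/// let op = Operator::new(services::Memory::default())?
///     .layer(RetryLayer::new())
///     .finish();
/// if let Err(err) = op.read("not_exist").await {
///     // Errors surfaced by this layer are marked persistent.
///     assert!(!err.is_temporary());
/// }
/// # Ok(())
/// # }
/// ```
///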
/// # Panics
///
/// While retrying `Reader` or `Writer` operations, please make sure that either:
///
/// - All futures generated by `Reader::read` or `Writer::close` are resolved to `Ready`.
/// - Or, no `Reader` or `Writer` methods are called after retry returns its final error.
///
/// Otherwise, `RetryLayer` can panic when it hits such a bad state.
///
/// For example, when composing `RetryLayer` with `TimeoutLayer`, the order of the layers matters.
///
/// ```no_run
/// # use std::time::Duration;
///
/// # use opendal::layers::RetryLayer;
/// # use opendal::layers::TimeoutLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
///
/// # fn main() -> Result<()> {
/// let op = Operator::new(services::Memory::default())?
///     // This is fine, since the timeout happens during the retry.
///     .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
///     .layer(RetryLayer::new())
///     // This is wrong: the timeout layer will drop the in-flight future,
///     // leaving the retry layer in a bad state.
///     .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
///     .finish();
/// Ok(())
/// # }
/// ```
///
/// # Examples
///
/// ```no_run
/// # use opendal::layers::RetryLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
/// # use opendal::Scheme;
///
/// # fn main() -> Result<()> {
/// let _ = Operator::new(services::Memory::default())?
///     .layer(RetryLayer::new())
///     .finish();
/// Ok(())
/// # }
/// ```
///
/// ## Customize retry interceptor
///
/// RetryLayer accepts [`RetryInterceptor`] to allow users to customize
/// their own retry interceptor logic.
///
/// ```no_run
/// # use std::time::Duration;
///
/// # use opendal::layers::RetryInterceptor;
/// # use opendal::layers::RetryLayer;
/// # use opendal::services;
/// # use opendal::Error;
/// # use opendal::Operator;
/// # use opendal::Result;
/// # use opendal::Scheme;
///
/// struct MyRetryInterceptor;
///
/// impl RetryInterceptor for MyRetryInterceptor {
///     fn intercept(&self, err: &Error, dur: Duration) {
///         // do something
///     }
/// }
///
/// # fn main() -> Result<()> {
/// let _ = Operator::new(services::Memory::default())?
///     .layer(RetryLayer::new().with_notify(MyRetryInterceptor))
///     .finish();
/// Ok(())
/// # }
/// ```
pub struct RetryLayer<I: RetryInterceptor = DefaultRetryInterceptor> {
    builder: ExponentialBuilder,
    notify: Arc<I>,
}

impl<I: RetryInterceptor> Clone for RetryLayer<I> {
    fn clone(&self) -> Self {
        Self {
            builder: self.builder,
            notify: self.notify.clone(),
        }
    }
}

impl Default for RetryLayer {
    fn default() -> Self {
        Self {
            builder: ExponentialBuilder::default(),
            notify: Arc::new(DefaultRetryInterceptor),
        }
    }
}

impl RetryLayer {
    /// Create a new retry layer.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use anyhow::Result;
    /// use opendal::layers::RetryLayer;
    /// use opendal::services;
    /// use opendal::Operator;
    /// use opendal::Scheme;
    ///
    /// let _ = Operator::new(services::Memory::default())
    ///     .expect("must init")
    ///     .layer(RetryLayer::new());
    /// ```
    pub fn new() -> RetryLayer {
        Self::default()
    }
}

impl<I: RetryInterceptor> RetryLayer<I> {
    /// Set the retry interceptor as the new notify.
    ///
    /// ```no_run
    /// use opendal::layers::RetryLayer;
    /// use opendal::services;
    /// use opendal::Operator;
    ///
    /// fn notify(_err: &opendal::Error, _dur: std::time::Duration) {}
    ///
    /// let _ = Operator::new(services::Memory::default())
    ///     .expect("must init")
    ///     .layer(RetryLayer::new().with_notify(notify))
    ///     .finish();
    /// ```
    pub fn with_notify<NI: RetryInterceptor>(self, notify: NI) -> RetryLayer<NI> {
        RetryLayer {
            builder: self.builder,
            notify: Arc::new(notify),
        }
    }

    /// Set the jitter of the current backoff.
    ///
    /// If jitter is enabled, `ExponentialBackoff` will add a random jitter in
    /// `[0, min_delay)` to the current delay.
    pub fn with_jitter(mut self) -> Self {
        self.builder = self.builder.with_jitter();
        self
    }

    /// Set the factor of the current backoff.
    ///
    /// # Panics
    ///
    /// This function will panic if the input factor is smaller than `1.0`.
    pub fn with_factor(mut self, factor: f32) -> Self {
        self.builder = self.builder.with_factor(factor);
        self
    }

    /// Set the min_delay of the current backoff.
    pub fn with_min_delay(mut self, min_delay: Duration) -> Self {
        self.builder = self.builder.with_min_delay(min_delay);
        self
    }

    /// Set the max_delay of the current backoff.
    ///
    /// The delay will not increase once the current delay is larger than max_delay.
    pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
        self.builder = self.builder.with_max_delay(max_delay);
        self
    }

    /// Set the max_times of the current backoff.
    ///
    /// Backoff will return `None` once max_times is reached.
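    ///
    /// A fully configured layer might look like the following (a sketch that
    /// only uses the builder methods defined on `RetryLayer`; the concrete
    /// values are arbitrary):
    ///
    /// ```no_run
    /// use std::time::Duration;
    ///
    /// use opendal::layers::RetryLayer;
    ///
    /// let layer = RetryLayer::new()
    ///     .with_max_times(3)
    ///     .with_factor(2.0)
    ///     .with_min_delay(Duration::from_millis(100))
    ///     .with_max_delay(Duration::from_secs(10))
    ///     .with_jitter();
    /// ```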
    pub fn with_max_times(mut self, max_times: usize) -> Self {
        self.builder = self.builder.with_max_times(max_times);
        self
    }
}

impl<A: Access, I: RetryInterceptor> Layer<A> for RetryLayer<I> {
    type LayeredAccess = RetryAccessor<A, I>;

    fn layer(&self, inner: A) -> Self::LayeredAccess {
        RetryAccessor {
            inner: Arc::new(inner),
            builder: self.builder,
            notify: self.notify.clone(),
        }
    }
}

/// RetryInterceptor is used to intercept a retry as it happens.
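///
/// Because `RetryInterceptor` is blanket-implemented for any
/// `Fn(&Error, Duration)` closure (see the impl below), a plain closure can
/// also serve as an interceptor; a minimal sketch:
///
/// ```no_run
/// # use opendal::layers::RetryLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
/// # fn main() -> Result<()> {
/// let _ = Operator::new(services::Memory::default())?
///     .layer(RetryLayer::new().with_notify(|err: &opendal::Error, dur: std::time::Duration| {
///         eprintln!("retrying after {:?} because: {}", dur, err);
///     }))
///     .finish();
/// Ok(())
/// # }
/// ```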
pub trait RetryInterceptor: Send + Sync + 'static {
    /// Every time RetryLayer retries, this function will be called.
    ///
    /// # Timing
    ///
    /// Just before the retry sleep.
    ///
    /// # Inputs
    ///
    /// - err: The error that caused the current retry.
    /// - dur: The duration that will be slept before the next retry.
    ///
    /// # Notes
    ///
    /// The interceptor must be quick and non-blocking. No heavy IO is
    /// allowed; otherwise the retry will be blocked.
    fn intercept(&self, err: &Error, dur: Duration);
}

impl<F> RetryInterceptor for F
where
    F: Fn(&Error, Duration) + Send + Sync + 'static,
{
    fn intercept(&self, err: &Error, dur: Duration) {
        self(err, dur);
    }
}

/// The DefaultRetryInterceptor will log the retry error at warning level.
pub struct DefaultRetryInterceptor;

impl RetryInterceptor for DefaultRetryInterceptor {
    fn intercept(&self, err: &Error, dur: Duration) {
        warn!(
            target: "opendal::layers::retry",
            "will retry after {}s because: {}",
            dur.as_secs_f64(), err)
    }
}

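/// `RetryAccessor` applies the retry policy to each operation of the inner
/// accessor, and wraps the returned readers, writers, listers, and deleters
/// in [`RetryWrapper`] so that their per-call IO is retried as well.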
pub struct RetryAccessor<A: Access, I: RetryInterceptor> {
    inner: Arc<A>,
    builder: ExponentialBuilder,
    notify: Arc<I>,
}

impl<A: Access, I: RetryInterceptor> Debug for RetryAccessor<A, I> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RetryAccessor")
            .field("inner", &self.inner)
            .finish_non_exhaustive()
    }
}

impl<A: Access, I: RetryInterceptor> LayeredAccess for RetryAccessor<A, I> {
    type Inner = A;
    type Reader = RetryWrapper<RetryReader<A, A::Reader>, I>;
    type BlockingReader = RetryWrapper<RetryReader<A, A::BlockingReader>, I>;
    type Writer = RetryWrapper<A::Writer, I>;
    type BlockingWriter = RetryWrapper<A::BlockingWriter, I>;
    type Lister = RetryWrapper<A::Lister, I>;
    type BlockingLister = RetryWrapper<A::BlockingLister, I>;
    type Deleter = RetryWrapper<A::Deleter, I>;
    type BlockingDeleter = RetryWrapper<A::BlockingDeleter, I>;

    fn inner(&self) -> &Self::Inner {
        &self.inner
    }

    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
        { || self.inner.create_dir(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur: Duration| self.notify.intercept(err, dur))
            .await
            .map_err(|e| e.set_persistent())
    }

    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        let (rp, reader) = { || self.inner.read(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map_err(|e| e.set_persistent())?;

        let retry_reader = RetryReader::new(self.inner.clone(), path.to_string(), args, reader);
        let retry_wrapper = RetryWrapper::new(retry_reader, self.notify.clone(), self.builder);

        Ok((rp, retry_wrapper))
    }

    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        { || self.inner.write(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
            .map_err(|e| e.set_persistent())
    }

    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
        { || self.inner.stat(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map_err(|e| e.set_persistent())
    }

    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        { || self.inner.delete() }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
            .map_err(|e| e.set_persistent())
    }

    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
        { || self.inner.copy(from, to, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map_err(|e| e.set_persistent())
    }

    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
        { || self.inner.rename(from, to, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map_err(|e| e.set_persistent())
    }

    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
        { || self.inner.list(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .await
            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
            .map_err(|e| e.set_persistent())
    }

    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
        { || self.inner.blocking_create_dir(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .call()
            .map_err(|e| e.set_persistent())
    }

    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
        let (rp, reader) = { || self.inner.blocking_read(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .call()
            .map_err(|e| e.set_persistent())?;

        let retry_reader = RetryReader::new(self.inner.clone(), path.to_string(), args, reader);
        let retry_wrapper = RetryWrapper::new(retry_reader, self.notify.clone(), self.builder);

        Ok((rp, retry_wrapper))
    }

    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
        { || self.inner.blocking_write(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .call()
            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
            .map_err(|e| e.set_persistent())
    }

    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
        { || self.inner.blocking_stat(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .call()
            .map_err(|e| e.set_persistent())
    }

    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
        { || self.inner.blocking_delete() }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .call()
            .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
            .map_err(|e| e.set_persistent())
    }

    fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
        { || self.inner.blocking_copy(from, to, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .call()
            .map_err(|e| e.set_persistent())
    }

    fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
        { || self.inner.blocking_rename(from, to, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .call()
            .map_err(|e| e.set_persistent())
    }

    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
        { || self.inner.blocking_list(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| self.notify.intercept(err, dur))
            .call()
            .map(|(rp, p)| (rp, RetryWrapper::new(p, self.notify.clone(), self.builder)))
            .map_err(|e| e.set_persistent())
    }
}

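/// `RetryReader` rebuilds the inner reader when a retried read needs a fresh
/// one: the original `path` and `args` are kept, and the range in `args` is
/// advanced by every buffer returned, so a rebuilt reader resumes right where
/// the previous one stopped.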
pub struct RetryReader<A, R> {
    inner: Arc<A>,
    reader: Option<R>,

    path: String,
    args: OpRead,
}

impl<A, R> RetryReader<A, R> {
    fn new(inner: Arc<A>, path: String, args: OpRead, r: R) -> Self {
        Self {
            inner,
            reader: Some(r),

            path,
            args,
        }
    }
}

impl<A: Access> oio::Read for RetryReader<A, A::Reader> {
    async fn read(&mut self) -> Result<Buffer> {
        loop {
            match self.reader.take() {
                None => {
                    let (_, r) = self.inner.read(&self.path, self.args.clone()).await?;
                    self.reader = Some(r);
                    continue;
                }
                Some(mut reader) => {
                    let buf = reader.read().await?;
                    self.reader = Some(reader);
                    self.args.range_mut().advance(buf.len() as u64);
                    return Ok(buf);
                }
            }
        }
    }
}

impl<A: Access> oio::BlockingRead for RetryReader<A, A::BlockingReader> {
    fn read(&mut self) -> Result<Buffer> {
        loop {
            match self.reader.take() {
                None => {
                    let (_, r) = self.inner.blocking_read(&self.path, self.args.clone())?;
                    self.reader = Some(r);
                    continue;
                }
                Some(mut reader) => {
                    let buf = reader.read()?;
                    self.reader = Some(reader);
                    self.args.range_mut().advance(buf.len() as u64);
                    return Ok(buf);
                }
            }
        }
    }
}

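/// `RetryWrapper` retries each call on the wrapped component. For async
/// operations, ownership of the inner component is moved into the retried
/// future via backon's `*WithContext` helpers; if that future is dropped
/// before it resolves, `inner` stays `None`, and subsequent calls either
/// return the "bad state" error from `take_inner` or panic (see the
/// layer-level `Panics` docs).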
pub struct RetryWrapper<R, I> {
    inner: Option<R>,
    notify: Arc<I>,

    builder: ExponentialBuilder,
}

impl<R, I> RetryWrapper<R, I> {
    fn new(inner: R, notify: Arc<I>, backoff: ExponentialBuilder) -> Self {
        Self {
            inner: Some(inner),
            notify,
            builder: backoff,
        }
    }

    fn take_inner(&mut self) -> Result<R> {
        self.inner.take().ok_or_else(|| {
            Error::new(
                ErrorKind::Unexpected,
                "retry layer is in a bad state; please make sure the future is not dropped before it's ready",
            )
        })
    }
}

impl<R: oio::Read, I: RetryInterceptor> oio::Read for RetryWrapper<R, I> {
    async fn read(&mut self) -> Result<Buffer> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut r: R| async move {
                let res = r.read().await;

                (r, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}

impl<R: oio::BlockingRead, I: RetryInterceptor> oio::BlockingRead for RetryWrapper<R, I> {
    fn read(&mut self) -> Result<Buffer> {
        use backon::BlockingRetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut r: R| {
                let res = r.read();

                (r, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .call();

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}

impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
    async fn write(&mut self, bs: Buffer) -> Result<()> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let ((inner, _), res) = {
            |(mut r, bs): (R, Buffer)| async move {
                let res = r.write(bs.clone()).await;

                ((r, bs), res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context((inner, bs))
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }

    async fn abort(&mut self) -> Result<()> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut r: R| async move {
                let res = r.abort().await;

                (r, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }

    async fn close(&mut self) -> Result<Metadata> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut r: R| async move {
                let res = r.close().await;

                (r, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}

impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for RetryWrapper<R, I> {
    fn write(&mut self, bs: Buffer) -> Result<()> {
        { || self.inner.as_mut().unwrap().write(bs.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| {
                self.notify.intercept(err, dur);
            })
            .call()
            .map_err(|e| e.set_persistent())
    }

    fn close(&mut self) -> Result<Metadata> {
        { || self.inner.as_mut().unwrap().close() }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| {
                self.notify.intercept(err, dur);
            })
            .call()
            .map_err(|e| e.set_persistent())
    }
}

impl<P: oio::List, I: RetryInterceptor> oio::List for RetryWrapper<P, I> {
    async fn next(&mut self) -> Result<Option<oio::Entry>> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut p: P| async move {
                let res = p.next().await;

                (p, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}

impl<P: oio::BlockingList, I: RetryInterceptor> oio::BlockingList for RetryWrapper<P, I> {
    fn next(&mut self) -> Result<Option<oio::Entry>> {
        { || self.inner.as_mut().unwrap().next() }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| {
                self.notify.intercept(err, dur);
            })
            .call()
            .map_err(|e| e.set_persistent())
    }
}

impl<P: oio::Delete, I: RetryInterceptor> oio::Delete for RetryWrapper<P, I> {
    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
        { || self.inner.as_mut().unwrap().delete(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| {
                self.notify.intercept(err, dur);
            })
            .call()
            .map_err(|e| e.set_persistent())
    }

    async fn flush(&mut self) -> Result<usize> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut p: P| async move {
                let res = p.flush().await;

                (p, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}

impl<P: oio::BlockingDelete, I: RetryInterceptor> oio::BlockingDelete for RetryWrapper<P, I> {
    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
        { || self.inner.as_mut().unwrap().delete(path, args.clone()) }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| {
                self.notify.intercept(err, dur);
            })
            .call()
            .map_err(|e| e.set_persistent())
    }

    fn flush(&mut self) -> Result<usize> {
        { || self.inner.as_mut().unwrap().flush() }
            .retry(self.builder)
            .when(|e| e.is_temporary())
            .notify(|err, dur| {
                self.notify.intercept(err, dur);
            })
            .call()
            .map_err(|e| e.set_persistent())
    }
}

#[cfg(test)]
mod tests {
    use std::mem;
    use std::sync::Arc;
    use std::sync::Mutex;

    use bytes::Bytes;
    use futures::{stream, TryStreamExt};
    use tracing_subscriber::filter::LevelFilter;

    use super::*;
    use crate::layers::LoggingLayer;

    #[derive(Default, Clone)]
    struct MockBuilder {
        attempt: Arc<Mutex<usize>>,
    }

    impl Builder for MockBuilder {
        const SCHEME: Scheme = Scheme::Custom("mock");
        type Config = ();

        fn build(self) -> Result<impl Access> {
            Ok(MockService {
                attempt: self.attempt.clone(),
            })
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockService {
        attempt: Arc<Mutex<usize>>,
    }

    impl Access for MockService {
        type Reader = MockReader;
        type Writer = MockWriter;
        type Lister = MockLister;
        type Deleter = MockDeleter;
        type BlockingReader = ();
        type BlockingWriter = ();
        type BlockingLister = ();
        type BlockingDeleter = ();

        fn info(&self) -> Arc<AccessorInfo> {
            let am = AccessorInfo::default();
            am.set_scheme(Scheme::Custom("mock"))
                .set_native_capability(Capability {
                    read: true,
                    write: true,
                    write_can_multi: true,
                    delete: true,
                    delete_max_size: Some(10),
                    stat: true,
                    list: true,
                    list_with_recursive: true,
                    ..Default::default()
                });

            am.into()
        }

        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
            Ok(RpStat::new(
                Metadata::new(EntryMode::FILE).with_content_length(13),
            ))
        }

        async fn read(&self, _: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
            Ok((
                RpRead::new(),
                MockReader {
                    buf: Bytes::from("Hello, World!").into(),
                    range: args.range(),
                    attempt: self.attempt.clone(),
                },
            ))
        }

        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            Ok((
                RpDelete::default(),
                MockDeleter {
                    size: 0,
                    attempt: self.attempt.clone(),
                },
            ))
        }

        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
            Ok((RpWrite::new(), MockWriter {}))
        }

        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            let lister = MockLister::default();
            Ok((RpList::default(), lister))
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockReader {
        buf: Buffer,
        range: BytesRange,
        attempt: Arc<Mutex<usize>>,
    }

    impl oio::Read for MockReader {
        async fn read(&mut self) -> Result<Buffer> {
            let mut attempt = self.attempt.lock().unwrap();
            *attempt += 1;

            match *attempt {
                1 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                2 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                // Should read out all data.
                3 => Ok(self.buf.slice(self.range.to_range_as_usize())),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                // Should be empty.
                5 => Ok(self.buf.slice(self.range.to_range_as_usize())),
                _ => unreachable!(),
            }
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockWriter {}

    impl oio::Write for MockWriter {
        async fn write(&mut self, _: Buffer) -> Result<()> {
            Ok(())
        }

        async fn close(&mut self) -> Result<Metadata> {
            Err(Error::new(ErrorKind::Unexpected, "always close failed").set_temporary())
        }

        async fn abort(&mut self) -> Result<()> {
            Ok(())
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockLister {
        attempt: usize,
    }

    impl oio::List for MockLister {
        async fn next(&mut self) -> Result<Option<oio::Entry>> {
            self.attempt += 1;
            match self.attempt {
                1 => Err(Error::new(
                    ErrorKind::RateLimited,
                    "retryable rate limited error from lister",
                )
                .set_temporary()),
                2 => Ok(Some(oio::Entry::new(
                    "hello",
                    Metadata::new(EntryMode::FILE),
                ))),
                3 => Ok(Some(oio::Entry::new(
                    "world",
                    Metadata::new(EntryMode::FILE),
                ))),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable internal server error")
                        .set_temporary(),
                ),
                5 => Ok(Some(oio::Entry::new(
                    "2023/",
                    Metadata::new(EntryMode::DIR),
                ))),
                6 => Ok(Some(oio::Entry::new(
                    "0208/",
                    Metadata::new(EntryMode::DIR),
                ))),
                7 => Ok(None),
                _ => {
                    unreachable!()
                }
            }
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockDeleter {
        size: usize,
        attempt: Arc<Mutex<usize>>,
    }

    impl oio::Delete for MockDeleter {
        fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
            self.size += 1;
            Ok(())
        }

        async fn flush(&mut self) -> Result<usize> {
            let mut attempt = self.attempt.lock().unwrap();
            *attempt += 1;

            match *attempt {
                1 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                2 => {
                    self.size -= 1;
                    Ok(1)
                }
                3 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                5 => {
                    let s = mem::take(&mut self.size);
                    Ok(s)
                }
                _ => unreachable!(),
            }
        }
    }

    #[tokio::test]
    async fn test_retry_read() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(LevelFilter::TRACE)
            .with_test_writer()
            .try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(LoggingLayer::default())
            .layer(RetryLayer::new())
            .finish();

        let r = op.reader("retryable_error").await.unwrap();
        let mut content = Vec::new();
        let size = r
            .read_into(&mut content, ..)
            .await
            .expect("read must succeed");
        assert_eq!(size, 13);
        assert_eq!(content, "Hello, World!".as_bytes());
        // The errors are retryable, so the reader should be requested 5 times in total.
        assert_eq!(*builder.attempt.lock().unwrap(), 5);
    }

    /// This test reproduces the panic issue when composing the retry layer with the timeout layer.
    #[tokio::test]
    async fn test_retry_write_fail_on_close() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(LevelFilter::TRACE)
            .with_test_writer()
            .try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_millis(1))
                    .with_max_delay(Duration::from_millis(1))
                    .with_jitter(),
            )
            // Uncomment this to reproduce the timeout layer panic.
            // .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1)))
            .layer(LoggingLayer::default())
            .finish();

        let mut w = op.writer("test_write").await.unwrap();
        w.write("aaa").await.unwrap();
        w.write("bbb").await.unwrap();
        match w.close().await {
            Ok(_) => (),
            Err(_) => {
                w.abort().await.unwrap();
            }
        };
    }

    #[tokio::test]
    async fn test_retry_list() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(RetryLayer::new())
            .finish();

        let expected = vec!["hello", "world", "2023/", "0208/"];

        let mut lister = op
            .lister("retryable_error/")
            .await
            .expect("service must support list");
        let mut actual = Vec::new();
        while let Some(obj) = lister.try_next().await.expect("must succeed") {
            actual.push(obj.name().to_owned());
        }

        assert_eq!(actual, expected);
    }

    #[tokio::test]
    async fn test_retry_batch() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let builder = MockBuilder::default();
        // Set a lower delay to make the test run faster.
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_secs_f32(0.1))
                    .with_max_times(5),
            )
            .finish();

        let paths = vec!["hello", "world", "test", "batch"];
        op.delete_stream(stream::iter(paths)).await.unwrap();
        assert_eq!(*builder.attempt.lock().unwrap(), 5);
    }
}