1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use backon::BlockingRetryable;
24use backon::ExponentialBuilder;
25use backon::Retryable;
26use log::warn;
27
28use crate::raw::*;
29use crate::*;
30
31pub struct RetryLayer<I: RetryInterceptor = DefaultRetryInterceptor> {
120 builder: ExponentialBuilder,
121 notify: Arc<I>,
122}
123
124impl<I: RetryInterceptor> Clone for RetryLayer<I> {
125 fn clone(&self) -> Self {
126 Self {
127 builder: self.builder,
128 notify: self.notify.clone(),
129 }
130 }
131}
132
133impl Default for RetryLayer {
134 fn default() -> Self {
135 Self {
136 builder: ExponentialBuilder::default(),
137 notify: Arc::new(DefaultRetryInterceptor),
138 }
139 }
140}
141
142impl RetryLayer {
143 pub fn new() -> RetryLayer {
158 Self::default()
159 }
160}
161
162impl<I: RetryInterceptor> RetryLayer<I> {
163 pub fn with_notify<NI: RetryInterceptor>(self, notify: NI) -> RetryLayer<NI> {
178 RetryLayer {
179 builder: self.builder,
180 notify: Arc::new(notify),
181 }
182 }
183
184 pub fn with_jitter(mut self) -> Self {
189 self.builder = self.builder.with_jitter();
190 self
191 }
192
193 pub fn with_factor(mut self, factor: f32) -> Self {
199 self.builder = self.builder.with_factor(factor);
200 self
201 }
202
203 pub fn with_min_delay(mut self, min_delay: Duration) -> Self {
205 self.builder = self.builder.with_min_delay(min_delay);
206 self
207 }
208
209 pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
213 self.builder = self.builder.with_max_delay(max_delay);
214 self
215 }
216
217 pub fn with_max_times(mut self, max_times: usize) -> Self {
221 self.builder = self.builder.with_max_times(max_times);
222 self
223 }
224}
225
226impl<A: Access, I: RetryInterceptor> Layer<A> for RetryLayer<I> {
227 type LayeredAccess = RetryAccessor<A, I>;
228
229 fn layer(&self, inner: A) -> Self::LayeredAccess {
230 RetryAccessor {
231 inner: Arc::new(inner),
232 builder: self.builder,
233 notify: self.notify.clone(),
234 }
235 }
236}
237
238pub trait RetryInterceptor: Send + Sync + 'static {
240 fn intercept(&self, err: &Error, dur: Duration);
256}
257
258impl<F> RetryInterceptor for F
259where
260 F: Fn(&Error, Duration) + Send + Sync + 'static,
261{
262 fn intercept(&self, err: &Error, dur: Duration) {
263 self(err, dur);
264 }
265}
266
/// The interceptor used by [`RetryLayer`] unless another one is supplied; it logs every retry
/// as a warning.
pub struct DefaultRetryInterceptor;
269
270impl RetryInterceptor for DefaultRetryInterceptor {
271 fn intercept(&self, err: &Error, dur: Duration) {
272 warn!(
273 target: "opendal::layers::retry",
274 "will retry after {}s because: {}",
275 dur.as_secs_f64(), err)
276 }
277}
278
279pub struct RetryAccessor<A: Access, I: RetryInterceptor> {
280 inner: Arc<A>,
281 builder: ExponentialBuilder,
282 notify: Arc<I>,
283}
284
285impl<A: Access, I: RetryInterceptor> Debug for RetryAccessor<A, I> {
286 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
287 f.debug_struct("RetryAccessor")
288 .field("inner", &self.inner)
289 .finish_non_exhaustive()
290 }
291}
292
293impl<A: Access, I: RetryInterceptor> LayeredAccess for RetryAccessor<A, I> {
294 type Inner = A;
295 type Reader = RetryWrapper<RetryReader<A, A::Reader>, I>;
296 type Writer = RetryWrapper<A::Writer, I>;
297 type Lister = RetryWrapper<A::Lister, I>;
298 type Deleter = RetryWrapper<A::Deleter, I>;
299
300 fn inner(&self) -> &Self::Inner {
301 &self.inner
302 }
303
304 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
305 { || self.inner.create_dir(path, args.clone()) }
306 .retry(self.builder)
307 .when(|e| e.is_temporary())
308 .notify(|err, dur: Duration| self.notify.intercept(err, dur))
309 .await
310 .map_err(|e| e.set_persistent())
311 }
312
313 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
314 let (rp, reader) = { || self.inner.read(path, args.clone()) }
315 .retry(self.builder)
316 .when(|e| e.is_temporary())
317 .notify(|err, dur| self.notify.intercept(err, dur))
318 .await
319 .map_err(|e| e.set_persistent())?;
320
321 let retry_reader = RetryReader::new(self.inner.clone(), path.to_string(), args, reader);
322 let retry_wrapper = RetryWrapper::new(retry_reader, self.notify.clone(), self.builder);
323
324 Ok((rp, retry_wrapper))
325 }
326
327 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
328 { || self.inner.write(path, args.clone()) }
329 .retry(self.builder)
330 .when(|e| e.is_temporary())
331 .notify(|err, dur| self.notify.intercept(err, dur))
332 .await
333 .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
334 .map_err(|e| e.set_persistent())
335 }
336
337 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
338 { || self.inner.stat(path, args.clone()) }
339 .retry(self.builder)
340 .when(|e| e.is_temporary())
341 .notify(|err, dur| self.notify.intercept(err, dur))
342 .await
343 .map_err(|e| e.set_persistent())
344 }
345
346 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
347 { || self.inner.delete() }
348 .retry(self.builder)
349 .when(|e| e.is_temporary())
350 .notify(|err, dur| self.notify.intercept(err, dur))
351 .await
352 .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
353 .map_err(|e| e.set_persistent())
354 }
355
356 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
357 { || self.inner.copy(from, to, args.clone()) }
358 .retry(self.builder)
359 .when(|e| e.is_temporary())
360 .notify(|err, dur| self.notify.intercept(err, dur))
361 .await
362 .map_err(|e| e.set_persistent())
363 }
364
365 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
366 { || self.inner.rename(from, to, args.clone()) }
367 .retry(self.builder)
368 .when(|e| e.is_temporary())
369 .notify(|err, dur| self.notify.intercept(err, dur))
370 .await
371 .map_err(|e| e.set_persistent())
372 }
373
374 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
375 { || self.inner.list(path, args.clone()) }
376 .retry(self.builder)
377 .when(|e| e.is_temporary())
378 .notify(|err, dur| self.notify.intercept(err, dur))
379 .await
380 .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
381 .map_err(|e| e.set_persistent())
382 }
383}
384
385pub struct RetryReader<A, R> {
386 inner: Arc<A>,
387 reader: Option<R>,
388
389 path: String,
390 args: OpRead,
391}
392
393impl<A, R> RetryReader<A, R> {
394 fn new(inner: Arc<A>, path: String, args: OpRead, r: R) -> Self {
395 Self {
396 inner,
397 reader: Some(r),
398
399 path,
400 args,
401 }
402 }
403}
404
405impl<A: Access> oio::Read for RetryReader<A, A::Reader> {
406 async fn read(&mut self) -> Result<Buffer> {
407 loop {
408 match self.reader.take() {
409 None => {
410 let (_, r) = self.inner.read(&self.path, self.args.clone()).await?;
411 self.reader = Some(r);
412 continue;
413 }
414 Some(mut reader) => {
415 let buf = reader.read().await?;
416 self.reader = Some(reader);
417 self.args.range_mut().advance(buf.len() as u64);
418 return Ok(buf);
419 }
420 }
421 }
422 }
423}
424
425pub struct RetryWrapper<R, I> {
426 inner: Option<R>,
427 notify: Arc<I>,
428
429 builder: ExponentialBuilder,
430}
431
432impl<R, I> RetryWrapper<R, I> {
433 fn new(inner: R, notify: Arc<I>, backoff: ExponentialBuilder) -> Self {
434 Self {
435 inner: Some(inner),
436 notify,
437 builder: backoff,
438 }
439 }
440
441 fn take_inner(&mut self) -> Result<R> {
442 self.inner.take().ok_or_else(|| {
443 Error::new(
444 ErrorKind::Unexpected,
445 "retry layer is in bad state, please make sure future not dropped before ready",
446 )
447 })
448 }
449}
450
451impl<R: oio::Read, I: RetryInterceptor> oio::Read for RetryWrapper<R, I> {
452 async fn read(&mut self) -> Result<Buffer> {
453 use backon::RetryableWithContext;
454
455 let inner = self.take_inner()?;
456
457 let (inner, res) = {
458 |mut r: R| async move {
459 let res = r.read().await;
460
461 (r, res)
462 }
463 }
464 .retry(self.builder)
465 .when(|e| e.is_temporary())
466 .context(inner)
467 .notify(|err, dur| self.notify.intercept(err, dur))
468 .await;
469
470 self.inner = Some(inner);
471 res.map_err(|err| err.set_persistent())
472 }
473}
474
475impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
476 async fn write(&mut self, bs: Buffer) -> Result<()> {
477 use backon::RetryableWithContext;
478
479 let inner = self.take_inner()?;
480
481 let ((inner, _), res) = {
482 |(mut r, bs): (R, Buffer)| async move {
483 let res = r.write(bs.clone()).await;
484
485 ((r, bs), res)
486 }
487 }
488 .retry(self.builder)
489 .when(|e| e.is_temporary())
490 .context((inner, bs))
491 .notify(|err, dur| self.notify.intercept(err, dur))
492 .await;
493
494 self.inner = Some(inner);
495 res.map_err(|err| err.set_persistent())
496 }
497
498 async fn abort(&mut self) -> Result<()> {
499 use backon::RetryableWithContext;
500
501 let inner = self.take_inner()?;
502
503 let (inner, res) = {
504 |mut r: R| async move {
505 let res = r.abort().await;
506
507 (r, res)
508 }
509 }
510 .retry(self.builder)
511 .when(|e| e.is_temporary())
512 .context(inner)
513 .notify(|err, dur| self.notify.intercept(err, dur))
514 .await;
515
516 self.inner = Some(inner);
517 res.map_err(|err| err.set_persistent())
518 }
519
520 async fn close(&mut self) -> Result<Metadata> {
521 use backon::RetryableWithContext;
522
523 let inner = self.take_inner()?;
524
525 let (inner, res) = {
526 |mut r: R| async move {
527 let res = r.close().await;
528
529 (r, res)
530 }
531 }
532 .retry(self.builder)
533 .when(|e| e.is_temporary())
534 .context(inner)
535 .notify(|err, dur| self.notify.intercept(err, dur))
536 .await;
537
538 self.inner = Some(inner);
539 res.map_err(|err| err.set_persistent())
540 }
541}
542
543impl<P: oio::List, I: RetryInterceptor> oio::List for RetryWrapper<P, I> {
544 async fn next(&mut self) -> Result<Option<oio::Entry>> {
545 use backon::RetryableWithContext;
546
547 let inner = self.take_inner()?;
548
549 let (inner, res) = {
550 |mut p: P| async move {
551 let res = p.next().await;
552
553 (p, res)
554 }
555 }
556 .retry(self.builder)
557 .when(|e| e.is_temporary())
558 .context(inner)
559 .notify(|err, dur| self.notify.intercept(err, dur))
560 .await;
561
562 self.inner = Some(inner);
563 res.map_err(|err| err.set_persistent())
564 }
565}
566
567impl<P: oio::Delete, I: RetryInterceptor> oio::Delete for RetryWrapper<P, I> {
568 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
569 { || self.inner.as_mut().unwrap().delete(path, args.clone()) }
570 .retry(self.builder)
571 .when(|e| e.is_temporary())
572 .notify(|err, dur| {
573 self.notify.intercept(err, dur);
574 })
575 .call()
576 .map_err(|e| e.set_persistent())
577 }
578
579 async fn flush(&mut self) -> Result<usize> {
580 use backon::RetryableWithContext;
581
582 let inner = self.take_inner()?;
583
584 let (inner, res) = {
585 |mut p: P| async move {
586 let res = p.flush().await;
587
588 (p, res)
589 }
590 }
591 .retry(self.builder)
592 .when(|e| e.is_temporary())
593 .context(inner)
594 .notify(|err, dur| self.notify.intercept(err, dur))
595 .await;
596
597 self.inner = Some(inner);
598 res.map_err(|err| err.set_persistent())
599 }
600}
601
#[cfg(test)]
mod tests {
    use std::mem;
    use std::sync::Arc;
    use std::sync::Mutex;

    use bytes::Bytes;
    use futures::{stream, TryStreamExt};
    use tracing_subscriber::filter::LevelFilter;

    use super::*;
    use crate::layers::LoggingLayer;

    /// Builder for [`MockService`]. The attempt counter is shared with the service it builds,
    /// so a test can keep a clone of the builder and inspect the counter afterwards.
    #[derive(Default, Clone)]
    struct MockBuilder {
        attempt: Arc<Mutex<usize>>,
    }

    impl Builder for MockBuilder {
        const SCHEME: Scheme = Scheme::Custom("mock");
        type Config = ();

        fn build(self) -> Result<impl Access> {
            let attempt = Arc::clone(&self.attempt);
            Ok(MockService { attempt })
        }
    }

    /// Accessor whose readers and deleters fail on scripted attempts.
    #[derive(Debug, Clone, Default)]
    struct MockService {
        attempt: Arc<Mutex<usize>>,
    }

    impl Access for MockService {
        type Reader = MockReader;
        type Writer = MockWriter;
        type Lister = MockLister;
        type Deleter = MockDeleter;

        fn info(&self) -> Arc<AccessorInfo> {
            let capability = Capability {
                read: true,
                write: true,
                write_can_multi: true,
                delete: true,
                delete_max_size: Some(10),
                stat: true,
                list: true,
                list_with_recursive: true,
                ..Default::default()
            };

            let info = AccessorInfo::default();
            info.set_scheme(Scheme::Custom("mock"))
                .set_native_capability(capability);

            Arc::new(info)
        }

        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
            let meta = Metadata::new(EntryMode::FILE).with_content_length(13);
            Ok(RpStat::new(meta))
        }

        async fn read(&self, _: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
            let reader = MockReader {
                buf: Bytes::from("Hello, World!").into(),
                range: args.range(),
                attempt: Arc::clone(&self.attempt),
            };

            Ok((RpRead::new(), reader))
        }

        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            let deleter = MockDeleter {
                size: 0,
                attempt: Arc::clone(&self.attempt),
            };

            Ok((RpDelete::default(), deleter))
        }

        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
            let writer = MockWriter {};
            Ok((RpWrite::new(), writer))
        }

        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            Ok((RpList::default(), MockLister::default()))
        }
    }

    /// Reader that fails with a temporary error on shared attempts 1, 2 and 4 and returns the
    /// requested slice of `buf` on attempts 3 and 5.
    #[derive(Debug, Clone, Default)]
    struct MockReader {
        buf: Buffer,
        range: BytesRange,
        attempt: Arc<Mutex<usize>>,
    }

    impl oio::Read for MockReader {
        async fn read(&mut self) -> Result<Buffer> {
            let mut attempt = self.attempt.lock().unwrap();
            *attempt += 1;

            match *attempt {
                1 | 2 | 4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                3 | 5 => Ok(self.buf.slice(self.range.to_range_as_usize())),
                _ => unreachable!(),
            }
        }
    }

    /// Writer that accepts every write and abort but always fails `close` with a temporary
    /// error.
    #[derive(Debug, Clone, Default)]
    struct MockWriter {}

    impl oio::Write for MockWriter {
        async fn write(&mut self, _: Buffer) -> Result<()> {
            Ok(())
        }

        async fn close(&mut self) -> Result<Metadata> {
            let err = Error::new(ErrorKind::Unexpected, "always close failed").set_temporary();
            Err(err)
        }

        async fn abort(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Lister that yields four entries, with temporary errors injected on calls 1 and 4.
    #[derive(Debug, Clone, Default)]
    struct MockLister {
        attempt: usize,
    }

    impl oio::List for MockLister {
        async fn next(&mut self) -> Result<Option<oio::Entry>> {
            /// Builds the successful result for one listed entry.
            fn entry(path: &str, mode: EntryMode) -> Result<Option<oio::Entry>> {
                Ok(Some(oio::Entry::new(path, Metadata::new(mode))))
            }

            self.attempt += 1;
            match self.attempt {
                1 => Err(Error::new(
                    ErrorKind::RateLimited,
                    "retryable rate limited error from lister",
                )
                .set_temporary()),
                2 => entry("hello", EntryMode::FILE),
                3 => entry("world", EntryMode::FILE),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable internal server error")
                        .set_temporary(),
                ),
                5 => entry("2023/", EntryMode::DIR),
                6 => entry("0208/", EntryMode::DIR),
                7 => Ok(None),
                _ => unreachable!(),
            }
        }
    }

    /// Deleter that counts queued paths in `size`; `flush` fails with a temporary error on
    /// shared attempts 1, 3 and 4, reports one deletion on attempt 2 and the rest on attempt 5.
    #[derive(Debug, Clone, Default)]
    struct MockDeleter {
        size: usize,
        attempt: Arc<Mutex<usize>>,
    }

    impl oio::Delete for MockDeleter {
        fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
            self.size += 1;
            Ok(())
        }

        async fn flush(&mut self) -> Result<usize> {
            let mut attempt = self.attempt.lock().unwrap();
            *attempt += 1;

            match *attempt {
                1 | 3 | 4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                2 => {
                    self.size -= 1;
                    Ok(1)
                }
                5 => Ok(mem::take(&mut self.size)),
                _ => unreachable!(),
            }
        }
    }

    /// Reading through the retry layer must deliver the full content after exactly five
    /// attempts on the mock reader.
    #[tokio::test]
    async fn test_retry_read() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(LevelFilter::TRACE)
            .with_test_writer()
            .try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(LoggingLayer::default())
            .layer(RetryLayer::new())
            .finish();

        let reader = op.reader("retryable_error").await.unwrap();
        let mut content = Vec::new();
        let size = reader
            .read_into(&mut content, ..)
            .await
            .expect("read must succeed");

        assert_eq!(size, 13);
        assert_eq!(content, "Hello, World!".as_bytes());

        let attempts = *builder.attempt.lock().unwrap();
        assert_eq!(attempts, 5);
    }

    /// A writer whose `close` keeps failing must still be abortable afterwards.
    #[tokio::test]
    async fn test_retry_write_fail_on_close() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(LevelFilter::TRACE)
            .with_test_writer()
            .try_init();

        let retry = RetryLayer::new()
            .with_min_delay(Duration::from_millis(1))
            .with_max_delay(Duration::from_millis(1))
            .with_jitter();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(retry)
            .layer(LoggingLayer::default())
            .finish();

        let mut w = op.writer("test_write").await.unwrap();
        w.write("aaa").await.unwrap();
        w.write("bbb").await.unwrap();

        if w.close().await.is_err() {
            w.abort().await.unwrap();
        }
    }

    /// Listing through the retry layer must yield all four entries despite the injected
    /// temporary errors.
    #[tokio::test]
    async fn test_retry_list() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(RetryLayer::new())
            .finish();

        let expected = vec!["hello", "world", "2023/", "0208/"];

        let lister = op
            .lister("retryable_error/")
            .await
            .expect("service must support list");
        let entries: Vec<_> = lister.try_collect().await.expect("must success");
        let actual: Vec<String> = entries.iter().map(|e| e.name().to_owned()).collect();

        assert_eq!(actual, expected);
    }

    /// Deleting a stream of paths must succeed after exactly five flush attempts on the mock
    /// deleter.
    #[tokio::test]
    async fn test_retry_batch() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let retry = RetryLayer::new()
            .with_min_delay(Duration::from_secs_f32(0.1))
            .with_max_times(5);

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(retry)
            .finish();

        let paths = vec!["hello", "world", "test", "batch"];
        op.delete_stream(stream::iter(paths)).await.unwrap();

        let attempts = *builder.attempt.lock().unwrap();
        assert_eq!(attempts, 5);
    }
}