1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use backon::ExponentialBuilder;
23use backon::Retryable;
24use log::warn;
25
26use crate::raw::*;
27use crate::*;
28
/// Layer that retries operations which fail with a temporary error.
///
/// Every call is retried according to an exponential backoff policy
/// (`backon::ExponentialBuilder`) for as long as the returned error reports
/// `is_temporary()`. Whatever error is finally returned to the caller is
/// marked persistent via `set_persistent()`.
///
/// Readers, writers, listers and deleters produced by the layered accessor are
/// wrapped too, so their individual calls are retried the same way.
///
/// `I` is the hook notified before each retry; it defaults to
/// [`DefaultRetryInterceptor`], which logs a warning.
pub struct RetryLayer<I: RetryInterceptor = DefaultRetryInterceptor> {
    // Backoff policy copied into every accessor/wrapper this layer creates.
    builder: ExponentialBuilder,
    // Retry hook, shared (via `Arc`) with every accessor/wrapper.
    notify: Arc<I>,
}
119
120impl<I: RetryInterceptor> Clone for RetryLayer<I> {
121 fn clone(&self) -> Self {
122 Self {
123 builder: self.builder,
124 notify: self.notify.clone(),
125 }
126 }
127}
128
129impl Default for RetryLayer {
130 fn default() -> Self {
131 Self {
132 builder: ExponentialBuilder::default(),
133 notify: Arc::new(DefaultRetryInterceptor),
134 }
135 }
136}
137
138impl RetryLayer {
139 pub fn new() -> RetryLayer {
153 Self::default()
154 }
155}
156
157impl<I: RetryInterceptor> RetryLayer<I> {
158 pub fn with_notify<NI: RetryInterceptor>(self, notify: NI) -> RetryLayer<NI> {
173 RetryLayer {
174 builder: self.builder,
175 notify: Arc::new(notify),
176 }
177 }
178
179 pub fn with_jitter(mut self) -> Self {
184 self.builder = self.builder.with_jitter();
185 self
186 }
187
188 pub fn with_factor(mut self, factor: f32) -> Self {
194 self.builder = self.builder.with_factor(factor);
195 self
196 }
197
198 pub fn with_min_delay(mut self, min_delay: Duration) -> Self {
200 self.builder = self.builder.with_min_delay(min_delay);
201 self
202 }
203
204 pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
208 self.builder = self.builder.with_max_delay(max_delay);
209 self
210 }
211
212 pub fn with_max_times(mut self, max_times: usize) -> Self {
216 self.builder = self.builder.with_max_times(max_times);
217 self
218 }
219}
220
221impl<A: Access, I: RetryInterceptor> Layer<A> for RetryLayer<I> {
222 type LayeredAccess = RetryAccessor<A, I>;
223
224 fn layer(&self, inner: A) -> Self::LayeredAccess {
225 RetryAccessor {
226 inner: Arc::new(inner),
227 builder: self.builder,
228 notify: self.notify.clone(),
229 }
230 }
231}
232
/// Hook that is notified every time an operation is about to be retried.
///
/// Any closure `Fn(&Error, Duration) + Send + Sync + 'static` implements this
/// trait through a blanket impl, so a plain closure can be passed to
/// `RetryLayer::with_notify`.
pub trait RetryInterceptor: Send + Sync + 'static {
    /// Called with the error that triggered the retry and the backoff delay
    /// that will elapse before the next attempt.
    fn intercept(&self, err: &Error, dur: Duration);
}
252
253impl<F> RetryInterceptor for F
254where
255 F: Fn(&Error, Duration) + Send + Sync + 'static,
256{
257 fn intercept(&self, err: &Error, dur: Duration) {
258 self(err, dur);
259 }
260}
261
/// Default [`RetryInterceptor`]: logs each retry at `warn` level.
///
/// It is a stateless unit type, so it is cheap to copy and has an obvious
/// `Default`; `Debug` is derived because public types should be debuggable.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultRetryInterceptor;
264
265impl RetryInterceptor for DefaultRetryInterceptor {
266 fn intercept(&self, err: &Error, dur: Duration) {
267 warn!(
268 target: "opendal::layers::retry",
269 "will retry after {}s because: {}",
270 dur.as_secs_f64(), err)
271 }
272}
273
/// Accessor produced by [`RetryLayer`]: forwards every operation to `inner`,
/// retrying temporary failures.
pub struct RetryAccessor<A: Access, I: RetryInterceptor> {
    // Held in an `Arc` so `RetryReader` can keep a handle and reopen reads.
    inner: Arc<A>,
    // Backoff policy used for every retried call.
    builder: ExponentialBuilder,
    // Hook invoked before each retry; shared with all wrappers.
    notify: Arc<I>,
}
279
280impl<A: Access, I: RetryInterceptor> Debug for RetryAccessor<A, I> {
281 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
282 f.debug_struct("RetryAccessor")
283 .field("inner", &self.inner)
284 .finish_non_exhaustive()
285 }
286}
287
288impl<A: Access, I: RetryInterceptor> LayeredAccess for RetryAccessor<A, I> {
289 type Inner = A;
290 type Reader = RetryWrapper<RetryReader<A, A::Reader>, I>;
291 type Writer = RetryWrapper<A::Writer, I>;
292 type Lister = RetryWrapper<A::Lister, I>;
293 type Deleter = RetryWrapper<A::Deleter, I>;
294
295 fn inner(&self) -> &Self::Inner {
296 &self.inner
297 }
298
299 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
300 { || self.inner.create_dir(path, args.clone()) }
301 .retry(self.builder)
302 .when(|e| e.is_temporary())
303 .notify(|err, dur: Duration| self.notify.intercept(err, dur))
304 .await
305 .map_err(|e| e.set_persistent())
306 }
307
308 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
309 let (rp, reader) = { || self.inner.read(path, args.clone()) }
310 .retry(self.builder)
311 .when(|e| e.is_temporary())
312 .notify(|err, dur| self.notify.intercept(err, dur))
313 .await
314 .map_err(|e| e.set_persistent())?;
315
316 let retry_reader = RetryReader::new(self.inner.clone(), path.to_string(), args, reader);
317 let retry_wrapper = RetryWrapper::new(retry_reader, self.notify.clone(), self.builder);
318
319 Ok((rp, retry_wrapper))
320 }
321
322 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
323 { || self.inner.write(path, args.clone()) }
324 .retry(self.builder)
325 .when(|e| e.is_temporary())
326 .notify(|err, dur| self.notify.intercept(err, dur))
327 .await
328 .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
329 .map_err(|e| e.set_persistent())
330 }
331
332 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
333 { || self.inner.stat(path, args.clone()) }
334 .retry(self.builder)
335 .when(|e| e.is_temporary())
336 .notify(|err, dur| self.notify.intercept(err, dur))
337 .await
338 .map_err(|e| e.set_persistent())
339 }
340
341 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
342 { || self.inner.delete() }
343 .retry(self.builder)
344 .when(|e| e.is_temporary())
345 .notify(|err, dur| self.notify.intercept(err, dur))
346 .await
347 .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
348 .map_err(|e| e.set_persistent())
349 }
350
351 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
352 { || self.inner.copy(from, to, args.clone()) }
353 .retry(self.builder)
354 .when(|e| e.is_temporary())
355 .notify(|err, dur| self.notify.intercept(err, dur))
356 .await
357 .map_err(|e| e.set_persistent())
358 }
359
360 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
361 { || self.inner.rename(from, to, args.clone()) }
362 .retry(self.builder)
363 .when(|e| e.is_temporary())
364 .notify(|err, dur| self.notify.intercept(err, dur))
365 .await
366 .map_err(|e| e.set_persistent())
367 }
368
369 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
370 { || self.inner.list(path, args.clone()) }
371 .retry(self.builder)
372 .when(|e| e.is_temporary())
373 .notify(|err, dur| self.notify.intercept(err, dur))
374 .await
375 .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
376 .map_err(|e| e.set_persistent())
377 }
378}
379
/// Reader that can transparently reopen the underlying read after a failure.
///
/// It remembers the path and read options, advancing the range as data is
/// returned, so a reopened reader resumes where the previous one stopped.
pub struct RetryReader<A, R> {
    // Accessor used to reopen the read.
    inner: Arc<A>,
    // Current reader; `None` after a failed read, meaning "reopen on next call".
    reader: Option<R>,

    // Path and options (with an advancing range) used when reopening.
    path: String,
    args: OpRead,
}
387
388impl<A, R> RetryReader<A, R> {
389 fn new(inner: Arc<A>, path: String, args: OpRead, r: R) -> Self {
390 Self {
391 inner,
392 reader: Some(r),
393
394 path,
395 args,
396 }
397 }
398}
399
400impl<A: Access> oio::Read for RetryReader<A, A::Reader> {
401 async fn read(&mut self) -> Result<Buffer> {
402 loop {
403 match self.reader.take() {
404 None => {
405 let (_, r) = self.inner.read(&self.path, self.args.clone()).await?;
406 self.reader = Some(r);
407 continue;
408 }
409 Some(mut reader) => {
410 let buf = reader.read().await?;
411 self.reader = Some(reader);
412 self.args.range_mut().advance(buf.len() as u64);
413 return Ok(buf);
414 }
415 }
416 }
417 }
418}
419
/// Wraps a reader/writer/lister/deleter so every call on it is retried.
///
/// The inner value is moved into the retry loop as context for each call and
/// put back afterwards, which is why it is stored as an `Option`.
pub struct RetryWrapper<R, I> {
    // `None` only while a call is in flight (or if such a call was dropped early).
    inner: Option<R>,
    // Hook invoked before each retry.
    notify: Arc<I>,

    // Backoff policy for every retried call.
    builder: ExponentialBuilder,
}
426
427impl<R, I> RetryWrapper<R, I> {
428 fn new(inner: R, notify: Arc<I>, backoff: ExponentialBuilder) -> Self {
429 Self {
430 inner: Some(inner),
431 notify,
432 builder: backoff,
433 }
434 }
435
436 fn take_inner(&mut self) -> Result<R> {
437 self.inner.take().ok_or_else(|| {
438 Error::new(
439 ErrorKind::Unexpected,
440 "retry layer is in bad state, please make sure future not dropped before ready",
441 )
442 })
443 }
444}
445
446impl<R: oio::Read, I: RetryInterceptor> oio::Read for RetryWrapper<R, I> {
447 async fn read(&mut self) -> Result<Buffer> {
448 use backon::RetryableWithContext;
449
450 let inner = self.take_inner()?;
451
452 let (inner, res) = {
453 |mut r: R| async move {
454 let res = r.read().await;
455
456 (r, res)
457 }
458 }
459 .retry(self.builder)
460 .when(|e| e.is_temporary())
461 .context(inner)
462 .notify(|err, dur| self.notify.intercept(err, dur))
463 .await;
464
465 self.inner = Some(inner);
466 res.map_err(|err| err.set_persistent())
467 }
468}
469
470impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
471 async fn write(&mut self, bs: Buffer) -> Result<()> {
472 use backon::RetryableWithContext;
473
474 let inner = self.take_inner()?;
475
476 let ((inner, _), res) = {
477 |(mut r, bs): (R, Buffer)| async move {
478 let res = r.write(bs.clone()).await;
479
480 ((r, bs), res)
481 }
482 }
483 .retry(self.builder)
484 .when(|e| e.is_temporary())
485 .context((inner, bs))
486 .notify(|err, dur| self.notify.intercept(err, dur))
487 .await;
488
489 self.inner = Some(inner);
490 res.map_err(|err| err.set_persistent())
491 }
492
493 async fn abort(&mut self) -> Result<()> {
494 use backon::RetryableWithContext;
495
496 let inner = self.take_inner()?;
497
498 let (inner, res) = {
499 |mut r: R| async move {
500 let res = r.abort().await;
501
502 (r, res)
503 }
504 }
505 .retry(self.builder)
506 .when(|e| e.is_temporary())
507 .context(inner)
508 .notify(|err, dur| self.notify.intercept(err, dur))
509 .await;
510
511 self.inner = Some(inner);
512 res.map_err(|err| err.set_persistent())
513 }
514
515 async fn close(&mut self) -> Result<Metadata> {
516 use backon::RetryableWithContext;
517
518 let inner = self.take_inner()?;
519
520 let (inner, res) = {
521 |mut r: R| async move {
522 let res = r.close().await;
523
524 (r, res)
525 }
526 }
527 .retry(self.builder)
528 .when(|e| e.is_temporary())
529 .context(inner)
530 .notify(|err, dur| self.notify.intercept(err, dur))
531 .await;
532
533 self.inner = Some(inner);
534 res.map_err(|err| err.set_persistent())
535 }
536}
537
538impl<P: oio::List, I: RetryInterceptor> oio::List for RetryWrapper<P, I> {
539 async fn next(&mut self) -> Result<Option<oio::Entry>> {
540 use backon::RetryableWithContext;
541
542 let inner = self.take_inner()?;
543
544 let (inner, res) = {
545 |mut p: P| async move {
546 let res = p.next().await;
547
548 (p, res)
549 }
550 }
551 .retry(self.builder)
552 .when(|e| e.is_temporary())
553 .context(inner)
554 .notify(|err, dur| self.notify.intercept(err, dur))
555 .await;
556
557 self.inner = Some(inner);
558 res.map_err(|err| err.set_persistent())
559 }
560}
561
562impl<P: oio::Delete, I: RetryInterceptor> oio::Delete for RetryWrapper<P, I> {
563 async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
564 use backon::RetryableWithContext;
565
566 let inner = self.take_inner()?;
567 let path = path.to_string();
568 let args_cloned = args.clone();
569
570 let (inner, res) = {
571 |mut p: P| {
572 let path = path.clone();
573 let args = args_cloned.clone();
574 async move {
575 let res = p.delete(&path, args).await;
576 (p, res)
577 }
578 }
579 }
580 .retry(self.builder)
581 .when(|e| e.is_temporary())
582 .context(inner)
583 .notify(|err, dur| {
584 self.notify.intercept(err, dur);
585 })
586 .await;
587
588 self.inner = Some(inner);
589 res.map_err(|e| e.set_persistent())
590 }
591
592 async fn close(&mut self) -> Result<()> {
593 use backon::RetryableWithContext;
594
595 let inner = self.take_inner()?;
596
597 let (inner, res) = {
598 |mut p: P| async move {
599 let res = p.close().await;
600
601 (p, res)
602 }
603 }
604 .retry(self.builder)
605 .when(|e| e.is_temporary())
606 .context(inner)
607 .notify(|err, dur| self.notify.intercept(err, dur))
608 .await;
609
610 self.inner = Some(inner);
611 res.map_err(|err| err.set_persistent())
612 }
613}
614
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::Mutex;

    use bytes::Bytes;
    use futures::TryStreamExt;
    use futures::stream;
    use tracing_subscriber::filter::LevelFilter;

    use super::*;
    use crate::layers::LoggingLayer;

    /// Builder for [`MockService`]. The shared `attempt` counter lets tests
    /// observe how many times the mock reader / deleter were invoked.
    #[derive(Default, Clone)]
    struct MockBuilder {
        attempt: Arc<Mutex<usize>>,
    }

    impl Builder for MockBuilder {
        type Config = ();

        fn build(self) -> Result<impl Access> {
            Ok(MockService {
                attempt: self.attempt.clone(),
            })
        }
    }

    /// Mock backend whose reader, writer, lister and deleter inject temporary
    /// errors so that the retry paths are exercised.
    #[derive(Debug, Clone, Default)]
    struct MockService {
        attempt: Arc<Mutex<usize>>,
    }

    impl Access for MockService {
        type Reader = MockReader;
        type Writer = MockWriter;
        type Lister = MockLister;
        type Deleter = MockDeleter;

        fn info(&self) -> Arc<AccessorInfo> {
            let am = AccessorInfo::default();
            am.set_scheme("mock").set_native_capability(Capability {
                read: true,
                write: true,
                write_can_multi: true,
                delete: true,
                delete_max_size: Some(10),
                stat: true,
                list: true,
                list_with_recursive: true,
                ..Default::default()
            });

            am.into()
        }

        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
            Ok(RpStat::new(
                Metadata::new(EntryMode::FILE).with_content_length(13),
            ))
        }

        async fn read(&self, _: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
            Ok((
                RpRead::new(),
                MockReader {
                    buf: Bytes::from("Hello, World!").into(),
                    range: args.range(),
                    attempt: self.attempt.clone(),
                },
            ))
        }

        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            Ok((
                RpDelete::default(),
                MockDeleter {
                    size: 0,
                    attempt: self.attempt.clone(),
                },
            ))
        }

        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
            Ok((RpWrite::new(), MockWriter {}))
        }

        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            let lister = MockLister::default();
            Ok((RpList::default(), lister))
        }
    }

    /// Reader that fails on attempts 1, 2 and 4 (counted across reopens via
    /// the shared counter) and otherwise returns the requested range.
    #[derive(Debug, Clone, Default)]
    struct MockReader {
        buf: Buffer,
        range: BytesRange,
        attempt: Arc<Mutex<usize>>,
    }

    impl oio::Read for MockReader {
        async fn read(&mut self) -> Result<Buffer> {
            let mut attempt = self.attempt.lock().unwrap();
            *attempt += 1;

            match *attempt {
                1 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                2 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                3 => Ok(self.buf.slice(self.range.to_range_as_usize())),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                5 => Ok(self.buf.slice(self.range.to_range_as_usize())),
                _ => unreachable!(),
            }
        }
    }

    /// Writer whose `close` always fails with a temporary error.
    #[derive(Debug, Clone, Default)]
    struct MockWriter {}

    impl oio::Write for MockWriter {
        async fn write(&mut self, _: Buffer) -> Result<()> {
            Ok(())
        }

        async fn close(&mut self) -> Result<Metadata> {
            Err(Error::new(ErrorKind::Unexpected, "always close failed").set_temporary())
        }

        async fn abort(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Lister that fails on calls 1 and 4 and otherwise yields four entries.
    #[derive(Debug, Clone, Default)]
    struct MockLister {
        attempt: usize,
    }

    impl oio::List for MockLister {
        async fn next(&mut self) -> Result<Option<oio::Entry>> {
            self.attempt += 1;
            match self.attempt {
                1 => Err(Error::new(
                    ErrorKind::RateLimited,
                    "retryable rate limited error from lister",
                )
                .set_temporary()),
                2 => Ok(Some(oio::Entry::new(
                    "hello",
                    Metadata::new(EntryMode::FILE),
                ))),
                3 => Ok(Some(oio::Entry::new(
                    "world",
                    Metadata::new(EntryMode::FILE),
                ))),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable internal server error")
                        .set_temporary(),
                ),
                5 => Ok(Some(oio::Entry::new(
                    "2023/",
                    Metadata::new(EntryMode::DIR),
                ))),
                6 => Ok(Some(oio::Entry::new(
                    "0208/",
                    Metadata::new(EntryMode::DIR),
                ))),
                7 => Ok(None),
                _ => {
                    unreachable!()
                }
            }
        }
    }

    /// Deleter whose `close` fails with temporary errors until attempt 5, and
    /// only succeeds then if every queued path has been drained.
    #[derive(Debug, Clone, Default)]
    struct MockDeleter {
        size: usize,
        attempt: Arc<Mutex<usize>>,
    }

    impl oio::Delete for MockDeleter {
        async fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
            self.size += 1;
            Ok(())
        }

        async fn close(&mut self) -> Result<()> {
            let mut attempt = self.attempt.lock().unwrap();
            *attempt += 1;

            match *attempt {
                1 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                2..=4 => {
                    self.size = self.size.saturating_sub(1);
                    Err(
                        Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                            .set_temporary(),
                    )
                }
                5 => {
                    self.size = self.size.saturating_sub(1);
                    if self.size == 0 {
                        Ok(())
                    } else {
                        Err(
                            Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                                .set_temporary(),
                        )
                    }
                }
                _ => unreachable!(),
            }
        }
    }

    #[tokio::test]
    async fn test_retry_read() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(LevelFilter::TRACE)
            .with_test_writer()
            .try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(LoggingLayer::default())
            .layer(RetryLayer::new())
            .finish();

        let r = op.reader("retryable_error").await.unwrap();
        let mut content = Vec::new();
        let size = r
            .read_into(&mut content, ..)
            .await
            .expect("read must succeed");
        assert_eq!(size, 13);
        assert_eq!(content, "Hello, World!".as_bytes());
        assert_eq!(*builder.attempt.lock().unwrap(), 5);
    }

    #[tokio::test]
    async fn test_retry_write_fail_on_close() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(LevelFilter::TRACE)
            .with_test_writer()
            .try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_millis(1))
                    .with_max_delay(Duration::from_millis(1))
                    .with_jitter(),
            )
            .layer(LoggingLayer::default())
            .finish();

        let mut w = op.writer("test_write").await.unwrap();
        w.write("aaa").await.unwrap();
        w.write("bbb").await.unwrap();
        // `MockWriter::close` always fails with a temporary error, so once the
        // retries are exhausted the failure must reach the caller, and the
        // retry layer must have marked it persistent.
        let err = w
            .close()
            .await
            .expect_err("close must fail after retries are exhausted");
        assert!(
            !err.is_temporary(),
            "an error surfaced after exhausted retries must not be temporary"
        );
        w.abort().await.unwrap();
    }

    #[tokio::test]
    async fn test_retry_list() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(RetryLayer::new())
            .finish();

        let expected = vec!["hello", "world", "2023/", "0208/"];

        let mut lister = op
            .lister("retryable_error/")
            .await
            .expect("service must support list");
        let mut actual = Vec::new();
        while let Some(obj) = lister.try_next().await.expect("must success") {
            actual.push(obj.name().to_owned());
        }

        assert_eq!(actual, expected);
    }

    #[tokio::test]
    async fn test_retry_batch() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_secs_f32(0.1))
                    .with_max_times(5),
            )
            .finish();

        let paths = vec!["hello", "world", "test", "batch"];
        op.delete_stream(stream::iter(paths)).await.unwrap();
        assert_eq!(*builder.attempt.lock().unwrap(), 5);
    }
}