1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use backon::BlockingRetryable;
24use backon::ExponentialBuilder;
25use backon::Retryable;
26use log::warn;
27
28use crate::raw::*;
29use crate::*;
30
/// Layer that retries operations which failed with a temporary error.
///
/// The generic parameter `I` is the [`RetryInterceptor`] hook invoked before
/// each retry sleep; it defaults to [`DefaultRetryInterceptor`], which logs a
/// warning.
pub struct RetryLayer<I: RetryInterceptor = DefaultRetryInterceptor> {
    // Exponential backoff policy applied to every retried operation.
    builder: ExponentialBuilder,
    // Shared retry hook; `Arc` so clones of the layer share one instance.
    notify: Arc<I>,
}
123
124impl<I: RetryInterceptor> Clone for RetryLayer<I> {
125 fn clone(&self) -> Self {
126 Self {
127 builder: self.builder,
128 notify: self.notify.clone(),
129 }
130 }
131}
132
133impl Default for RetryLayer {
134 fn default() -> Self {
135 Self {
136 builder: ExponentialBuilder::default(),
137 notify: Arc::new(DefaultRetryInterceptor),
138 }
139 }
140}
141
142impl RetryLayer {
143 pub fn new() -> RetryLayer {
158 Self::default()
159 }
160}
161
162impl<I: RetryInterceptor> RetryLayer<I> {
163 pub fn with_notify<NI: RetryInterceptor>(self, notify: NI) -> RetryLayer<NI> {
178 RetryLayer {
179 builder: self.builder,
180 notify: Arc::new(notify),
181 }
182 }
183
184 pub fn with_jitter(mut self) -> Self {
189 self.builder = self.builder.with_jitter();
190 self
191 }
192
193 pub fn with_factor(mut self, factor: f32) -> Self {
199 self.builder = self.builder.with_factor(factor);
200 self
201 }
202
203 pub fn with_min_delay(mut self, min_delay: Duration) -> Self {
205 self.builder = self.builder.with_min_delay(min_delay);
206 self
207 }
208
209 pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
213 self.builder = self.builder.with_max_delay(max_delay);
214 self
215 }
216
217 pub fn with_max_times(mut self, max_times: usize) -> Self {
221 self.builder = self.builder.with_max_times(max_times);
222 self
223 }
224}
225
226impl<A: Access, I: RetryInterceptor> Layer<A> for RetryLayer<I> {
227 type LayeredAccess = RetryAccessor<A, I>;
228
229 fn layer(&self, inner: A) -> Self::LayeredAccess {
230 RetryAccessor {
231 inner: Arc::new(inner),
232 builder: self.builder,
233 notify: self.notify.clone(),
234 }
235 }
236}
237
/// Hook that observes retry events.
///
/// A blanket impl below makes any `Fn(&Error, Duration) + Send + Sync +
/// 'static` closure usable as an interceptor.
pub trait RetryInterceptor: Send + Sync + 'static {
    /// Called before each retry sleep, with the temporary error that
    /// triggered the retry and the duration that will be slept.
    fn intercept(&self, err: &Error, dur: Duration);
}
257
/// Blanket impl so plain closures can serve as retry interceptors.
impl<F> RetryInterceptor for F
where
    F: Fn(&Error, Duration) + Send + Sync + 'static,
{
    fn intercept(&self, err: &Error, dur: Duration) {
        // Forward straight to the wrapped closure.
        self(err, dur);
    }
}
266
/// Default interceptor: logs each retry as a warning via the `log` crate.
pub struct DefaultRetryInterceptor;
269
270impl RetryInterceptor for DefaultRetryInterceptor {
271 fn intercept(&self, err: &Error, dur: Duration) {
272 warn!(
273 target: "opendal::layers::retry",
274 "will retry after {}s because: {}",
275 dur.as_secs_f64(), err)
276 }
277}
278
/// Accessor produced by [`RetryLayer`]: wraps every operation of `inner`
/// with a retry loop.
pub struct RetryAccessor<A: Access, I: RetryInterceptor> {
    // Shared so readers can reopen the underlying reader on retry.
    inner: Arc<A>,
    // Backoff policy copied from the layer.
    builder: ExponentialBuilder,
    // Hook invoked before each retry sleep.
    notify: Arc<I>,
}
284
285impl<A: Access, I: RetryInterceptor> Debug for RetryAccessor<A, I> {
286 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
287 f.debug_struct("RetryAccessor")
288 .field("inner", &self.inner)
289 .finish_non_exhaustive()
290 }
291}
292
293impl<A: Access, I: RetryInterceptor> LayeredAccess for RetryAccessor<A, I> {
294 type Inner = A;
295 type Reader = RetryWrapper<RetryReader<A, A::Reader>, I>;
296 type Writer = RetryWrapper<A::Writer, I>;
297 type Lister = RetryWrapper<A::Lister, I>;
298 type Deleter = RetryWrapper<A::Deleter, I>;
299
300 fn inner(&self) -> &Self::Inner {
301 &self.inner
302 }
303
304 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
305 { || self.inner.create_dir(path, args.clone()) }
306 .retry(self.builder)
307 .when(|e| e.is_temporary())
308 .notify(|err, dur: Duration| self.notify.intercept(err, dur))
309 .await
310 .map_err(|e| e.set_persistent())
311 }
312
313 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
314 let (rp, reader) = { || self.inner.read(path, args.clone()) }
315 .retry(self.builder)
316 .when(|e| e.is_temporary())
317 .notify(|err, dur| self.notify.intercept(err, dur))
318 .await
319 .map_err(|e| e.set_persistent())?;
320
321 let retry_reader = RetryReader::new(self.inner.clone(), path.to_string(), args, reader);
322 let retry_wrapper = RetryWrapper::new(retry_reader, self.notify.clone(), self.builder);
323
324 Ok((rp, retry_wrapper))
325 }
326
327 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
328 { || self.inner.write(path, args.clone()) }
329 .retry(self.builder)
330 .when(|e| e.is_temporary())
331 .notify(|err, dur| self.notify.intercept(err, dur))
332 .await
333 .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
334 .map_err(|e| e.set_persistent())
335 }
336
337 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
338 { || self.inner.stat(path, args.clone()) }
339 .retry(self.builder)
340 .when(|e| e.is_temporary())
341 .notify(|err, dur| self.notify.intercept(err, dur))
342 .await
343 .map_err(|e| e.set_persistent())
344 }
345
346 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
347 { || self.inner.delete() }
348 .retry(self.builder)
349 .when(|e| e.is_temporary())
350 .notify(|err, dur| self.notify.intercept(err, dur))
351 .await
352 .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
353 .map_err(|e| e.set_persistent())
354 }
355
356 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
357 { || self.inner.copy(from, to, args.clone()) }
358 .retry(self.builder)
359 .when(|e| e.is_temporary())
360 .notify(|err, dur| self.notify.intercept(err, dur))
361 .await
362 .map_err(|e| e.set_persistent())
363 }
364
365 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
366 { || self.inner.rename(from, to, args.clone()) }
367 .retry(self.builder)
368 .when(|e| e.is_temporary())
369 .notify(|err, dur| self.notify.intercept(err, dur))
370 .await
371 .map_err(|e| e.set_persistent())
372 }
373
374 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
375 { || self.inner.list(path, args.clone()) }
376 .retry(self.builder)
377 .when(|e| e.is_temporary())
378 .notify(|err, dur| self.notify.intercept(err, dur))
379 .await
380 .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
381 .map_err(|e| e.set_persistent())
382 }
383}
384
/// Reader that can reopen the underlying reader so a failed read resumes
/// from the last successfully consumed offset.
pub struct RetryReader<A, R> {
    // Accessor used to reopen the reader when `reader` is `None`.
    inner: Arc<A>,
    // Current reader; taken during a read, put back on success, and left
    // `None` when a read fails so the next call reopens it.
    reader: Option<R>,

    path: String,
    // Read args; the range is advanced as bytes are returned so a reopened
    // reader continues where the previous one stopped.
    args: OpRead,
}
392
393impl<A, R> RetryReader<A, R> {
394 fn new(inner: Arc<A>, path: String, args: OpRead, r: R) -> Self {
395 Self {
396 inner,
397 reader: Some(r),
398
399 path,
400 args,
401 }
402 }
403}
404
405impl<A: Access> oio::Read for RetryReader<A, A::Reader> {
406 async fn read(&mut self) -> Result<Buffer> {
407 loop {
408 match self.reader.take() {
409 None => {
410 let (_, r) = self.inner.read(&self.path, self.args.clone()).await?;
411 self.reader = Some(r);
412 continue;
413 }
414 Some(mut reader) => {
415 let buf = reader.read().await?;
416 self.reader = Some(reader);
417 self.args.range_mut().advance(buf.len() as u64);
418 return Ok(buf);
419 }
420 }
421 }
422 }
423}
424
/// Wraps an I/O object (reader/writer/lister/deleter) with a retry loop.
pub struct RetryWrapper<R, I> {
    // `None` only while a retry future is in flight; a dropped future can
    // leave it empty permanently (see `take_inner`).
    inner: Option<R>,
    // Hook invoked before each retry sleep.
    notify: Arc<I>,

    // Backoff policy copied from the layer.
    builder: ExponentialBuilder,
}
431
432impl<R, I> RetryWrapper<R, I> {
433 fn new(inner: R, notify: Arc<I>, backoff: ExponentialBuilder) -> Self {
434 Self {
435 inner: Some(inner),
436 notify,
437 builder: backoff,
438 }
439 }
440
441 fn take_inner(&mut self) -> Result<R> {
442 self.inner.take().ok_or_else(|| {
443 Error::new(
444 ErrorKind::Unexpected,
445 "retry layer is in bad state, please make sure future not dropped before ready",
446 )
447 })
448 }
449}
450
impl<R: oio::Read, I: RetryInterceptor> oio::Read for RetryWrapper<R, I> {
    /// Read with retry. The reader is threaded through backon's context so
    /// the same instance survives each failed attempt instead of being
    /// consumed by the retry closure.
    async fn read(&mut self) -> Result<Buffer> {
        use backon::RetryableWithContext;

        // Errors if a previously dropped future never returned the reader.
        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut r: R| async move {
                let res = r.read().await;

                // Return the reader alongside the result so backon can hand
                // it to the next attempt (or back to us).
                (r, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        // Put the reader back before surfacing the (possibly failed) result.
        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}
474
impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
    /// Write with retry. Both the writer and the buffer travel through
    /// backon's context; the buffer is cloned per attempt so a failed write
    /// can be replayed.
    /// NOTE(review): assumes `Buffer::clone` is cheap — confirm against the
    /// `Buffer` implementation.
    async fn write(&mut self, bs: Buffer) -> Result<()> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let ((inner, _), res) = {
            |(mut r, bs): (R, Buffer)| async move {
                let res = r.write(bs.clone()).await;

                // Hand writer and buffer back for the next attempt.
                ((r, bs), res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context((inner, bs))
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        // Restore the writer before surfacing the result.
        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }

    /// Abort with retry, using the same context-threading pattern as `write`.
    async fn abort(&mut self) -> Result<()> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut r: R| async move {
                let res = r.abort().await;

                (r, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }

    /// Close with retry; returns the final metadata on success.
    async fn close(&mut self) -> Result<Metadata> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut r: R| async move {
                let res = r.close().await;

                (r, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}
542
impl<P: oio::List, I: RetryInterceptor> oio::List for RetryWrapper<P, I> {
    /// Fetch the next entry with retry; the lister is threaded through
    /// backon's context so it survives failed attempts.
    async fn next(&mut self) -> Result<Option<oio::Entry>> {
        use backon::RetryableWithContext;

        let inner = self.take_inner()?;

        let (inner, res) = {
            |mut p: P| async move {
                let res = p.next().await;

                (p, res)
            }
        }
        .retry(self.builder)
        .when(|e| e.is_temporary())
        .context(inner)
        .notify(|err, dur| self.notify.intercept(err, dur))
        .await;

        // Restore the lister before surfacing the result.
        self.inner = Some(inner);
        res.map_err(|err| err.set_persistent())
    }
}
566
567impl<P: oio::Delete, I: RetryInterceptor> oio::Delete for RetryWrapper<P, I> {
568 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
569 { || self.inner.as_mut().unwrap().delete(path, args.clone()) }
570 .retry(self.builder)
571 .when(|e| e.is_temporary())
572 .notify(|err, dur| {
573 self.notify.intercept(err, dur);
574 })
575 .call()
576 .map_err(|e| e.set_persistent())
577 }
578
579 async fn flush(&mut self) -> Result<usize> {
580 use backon::RetryableWithContext;
581
582 let inner = self.take_inner()?;
583
584 let (inner, res) = {
585 |mut p: P| async move {
586 let res = p.flush().await;
587
588 (p, res)
589 }
590 }
591 .retry(self.builder)
592 .when(|e| e.is_temporary())
593 .context(inner)
594 .notify(|err, dur| self.notify.intercept(err, dur))
595 .await;
596
597 self.inner = Some(inner);
598 res.map_err(|err| err.set_persistent())
599 }
600}
601
#[cfg(test)]
mod tests {
    use std::mem;
    use std::sync::Arc;
    use std::sync::Mutex;

    use bytes::Bytes;
    use futures::stream;
    use futures::TryStreamExt;
    use tracing_subscriber::filter::LevelFilter;

    use super::*;
    use crate::layers::LoggingLayer;

    // Builder whose service shares an attempt counter with the test body, so
    // tests can assert exactly how many raw calls were made.
    #[derive(Default, Clone)]
    struct MockBuilder {
        attempt: Arc<Mutex<usize>>,
    }

    impl Builder for MockBuilder {
        type Config = ();

        fn build(self) -> Result<impl Access> {
            Ok(MockService {
                attempt: self.attempt.clone(),
            })
        }
    }

    // Service whose reader and deleter fail with temporary errors on a
    // scripted schedule, giving the retry layer something to retry.
    #[derive(Debug, Clone, Default)]
    struct MockService {
        attempt: Arc<Mutex<usize>>,
    }

    impl Access for MockService {
        type Reader = MockReader;
        type Writer = MockWriter;
        type Lister = MockLister;
        type Deleter = MockDeleter;

        fn info(&self) -> Arc<AccessorInfo> {
            let am = AccessorInfo::default();
            am.set_scheme("mock").set_native_capability(Capability {
                read: true,
                write: true,
                write_can_multi: true,
                delete: true,
                delete_max_size: Some(10),
                stat: true,
                list: true,
                list_with_recursive: true,
                ..Default::default()
            });

            am.into()
        }

        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
            Ok(RpStat::new(
                Metadata::new(EntryMode::FILE).with_content_length(13),
            ))
        }

        async fn read(&self, _: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
            Ok((
                RpRead::new(),
                MockReader {
                    buf: Bytes::from("Hello, World!").into(),
                    range: args.range(),
                    attempt: self.attempt.clone(),
                },
            ))
        }

        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            Ok((
                RpDelete::default(),
                MockDeleter {
                    size: 0,
                    attempt: self.attempt.clone(),
                },
            ))
        }

        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
            Ok((RpWrite::new(), MockWriter {}))
        }

        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            let lister = MockLister::default();
            Ok((RpList::default(), lister))
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockReader {
        buf: Buffer,
        range: BytesRange,
        attempt: Arc<Mutex<usize>>,
    }

    impl oio::Read for MockReader {
        // Scripted schedule: fail, fail, succeed, fail, succeed.
        async fn read(&mut self) -> Result<Buffer> {
            let mut attempt = self.attempt.lock().unwrap();
            *attempt += 1;

            match *attempt {
                1 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                2 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                3 => Ok(self.buf.slice(self.range.to_range_as_usize())),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from reader")
                        .set_temporary(),
                ),
                5 => Ok(self.buf.slice(self.range.to_range_as_usize())),
                _ => unreachable!(),
            }
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockWriter {}

    impl oio::Write for MockWriter {
        async fn write(&mut self, _: Buffer) -> Result<()> {
            Ok(())
        }

        // Always fails so tests can exercise the abort-after-close path.
        async fn close(&mut self) -> Result<Metadata> {
            Err(Error::new(ErrorKind::Unexpected, "always close failed").set_temporary())
        }

        async fn abort(&mut self) -> Result<()> {
            Ok(())
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockLister {
        attempt: usize,
    }

    impl oio::List for MockLister {
        // Yields two entries, fails once, yields two more, then EOF.
        async fn next(&mut self) -> Result<Option<oio::Entry>> {
            self.attempt += 1;
            match self.attempt {
                1 => Err(Error::new(
                    ErrorKind::RateLimited,
                    "retryable rate limited error from lister",
                )
                .set_temporary()),
                2 => Ok(Some(oio::Entry::new(
                    "hello",
                    Metadata::new(EntryMode::FILE),
                ))),
                3 => Ok(Some(oio::Entry::new(
                    "world",
                    Metadata::new(EntryMode::FILE),
                ))),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable internal server error")
                        .set_temporary(),
                ),
                5 => Ok(Some(oio::Entry::new(
                    "2023/",
                    Metadata::new(EntryMode::DIR),
                ))),
                6 => Ok(Some(oio::Entry::new(
                    "0208/",
                    Metadata::new(EntryMode::DIR),
                ))),
                7 => Ok(None),
                _ => {
                    unreachable!()
                }
            }
        }
    }

    #[derive(Debug, Clone, Default)]
    struct MockDeleter {
        size: usize,
        attempt: Arc<Mutex<usize>>,
    }

    impl oio::Delete for MockDeleter {
        fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
            self.size += 1;
            Ok(())
        }

        // Flush schedule: fail, partial flush (1), fail, fail, flush rest.
        async fn flush(&mut self) -> Result<usize> {
            let mut attempt = self.attempt.lock().unwrap();
            *attempt += 1;

            match *attempt {
                1 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                2 => {
                    self.size -= 1;
                    Ok(1)
                }
                3 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                4 => Err(
                    Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
                        .set_temporary(),
                ),
                5 => {
                    let s = mem::take(&mut self.size);
                    Ok(s)
                }
                _ => unreachable!(),
            }
        }
    }

    // Full read succeeds despite scripted failures; 5 raw attempts total
    // (2 failures + success + 1 failure + empty-success EOF).
    #[tokio::test]
    async fn test_retry_read() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(LevelFilter::TRACE)
            .with_test_writer()
            .try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(LoggingLayer::default())
            .layer(RetryLayer::new())
            .finish();

        let r = op.reader("retryable_error").await.unwrap();
        let mut content = Vec::new();
        let size = r
            .read_into(&mut content, ..)
            .await
            .expect("read must succeed");
        assert_eq!(size, 13);
        assert_eq!(content, "Hello, World!".as_bytes());
        assert_eq!(*builder.attempt.lock().unwrap(), 5);
    }

    // Close always fails (temporary), so after retries are exhausted the
    // writer should still be abortable.
    #[tokio::test]
    async fn test_retry_write_fail_on_close() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(LevelFilter::TRACE)
            .with_test_writer()
            .try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_millis(1))
                    .with_max_delay(Duration::from_millis(1))
                    .with_jitter(),
            )
            .layer(LoggingLayer::default())
            .finish();

        let mut w = op.writer("test_write").await.unwrap();
        w.write("aaa").await.unwrap();
        w.write("bbb").await.unwrap();
        match w.close().await {
            Ok(_) => (),
            Err(_) => {
                w.abort().await.unwrap();
            }
        };
    }

    // Listing survives two temporary failures and still yields all entries
    // in order.
    #[tokio::test]
    async fn test_retry_list() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(RetryLayer::new())
            .finish();

        let expected = vec!["hello", "world", "2023/", "0208/"];

        let mut lister = op
            .lister("retryable_error/")
            .await
            .expect("service must support list");
        let mut actual = Vec::new();
        while let Some(obj) = lister.try_next().await.expect("must success") {
            actual.push(obj.name().to_owned());
        }

        assert_eq!(actual, expected);
    }

    // Batch delete flush retries through the scripted failures; 5 flush
    // attempts are needed to drain the queue.
    #[tokio::test]
    async fn test_retry_batch() {
        let _ = tracing_subscriber::fmt().with_test_writer().try_init();

        let builder = MockBuilder::default();
        let op = Operator::new(builder.clone())
            .unwrap()
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_secs_f32(0.1))
                    .with_max_times(5),
            )
            .finish();

        let paths = vec!["hello", "world", "test", "batch"];
        op.delete_stream(stream::iter(paths)).await.unwrap();
        assert_eq!(*builder.attempt.lock().unwrap(), 5);
    }
}