1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use backon::BlockingRetryable;
24use backon::ExponentialBuilder;
25use backon::Retryable;
26use log::warn;
27
28use crate::raw::*;
29use crate::*;
30
31pub struct RetryLayer<I: RetryInterceptor = DefaultRetryInterceptor> {
120 builder: ExponentialBuilder,
121 notify: Arc<I>,
122}
123
124impl<I: RetryInterceptor> Clone for RetryLayer<I> {
125 fn clone(&self) -> Self {
126 Self {
127 builder: self.builder,
128 notify: self.notify.clone(),
129 }
130 }
131}
132
133impl Default for RetryLayer {
134 fn default() -> Self {
135 Self {
136 builder: ExponentialBuilder::default(),
137 notify: Arc::new(DefaultRetryInterceptor),
138 }
139 }
140}
141
142impl RetryLayer {
143 pub fn new() -> RetryLayer {
158 Self::default()
159 }
160}
161
162impl<I: RetryInterceptor> RetryLayer<I> {
163 pub fn with_notify<NI: RetryInterceptor>(self, notify: NI) -> RetryLayer<NI> {
178 RetryLayer {
179 builder: self.builder,
180 notify: Arc::new(notify),
181 }
182 }
183
184 pub fn with_jitter(mut self) -> Self {
189 self.builder = self.builder.with_jitter();
190 self
191 }
192
193 pub fn with_factor(mut self, factor: f32) -> Self {
199 self.builder = self.builder.with_factor(factor);
200 self
201 }
202
203 pub fn with_min_delay(mut self, min_delay: Duration) -> Self {
205 self.builder = self.builder.with_min_delay(min_delay);
206 self
207 }
208
209 pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
213 self.builder = self.builder.with_max_delay(max_delay);
214 self
215 }
216
217 pub fn with_max_times(mut self, max_times: usize) -> Self {
221 self.builder = self.builder.with_max_times(max_times);
222 self
223 }
224}
225
226impl<A: Access, I: RetryInterceptor> Layer<A> for RetryLayer<I> {
227 type LayeredAccess = RetryAccessor<A, I>;
228
229 fn layer(&self, inner: A) -> Self::LayeredAccess {
230 RetryAccessor {
231 inner: Arc::new(inner),
232 builder: self.builder,
233 notify: self.notify.clone(),
234 }
235 }
236}
237
238pub trait RetryInterceptor: Send + Sync + 'static {
240 fn intercept(&self, err: &Error, dur: Duration);
256}
257
258impl<F> RetryInterceptor for F
259where
260 F: Fn(&Error, Duration) + Send + Sync + 'static,
261{
262 fn intercept(&self, err: &Error, dur: Duration) {
263 self(err, dur);
264 }
265}
266
267pub struct DefaultRetryInterceptor;
269
270impl RetryInterceptor for DefaultRetryInterceptor {
271 fn intercept(&self, err: &Error, dur: Duration) {
272 warn!(
273 target: "opendal::layers::retry",
274 "will retry after {}s because: {}",
275 dur.as_secs_f64(), err)
276 }
277}
278
279pub struct RetryAccessor<A: Access, I: RetryInterceptor> {
280 inner: Arc<A>,
281 builder: ExponentialBuilder,
282 notify: Arc<I>,
283}
284
285impl<A: Access, I: RetryInterceptor> Debug for RetryAccessor<A, I> {
286 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
287 f.debug_struct("RetryAccessor")
288 .field("inner", &self.inner)
289 .finish_non_exhaustive()
290 }
291}
292
293impl<A: Access, I: RetryInterceptor> LayeredAccess for RetryAccessor<A, I> {
294 type Inner = A;
295 type Reader = RetryWrapper<RetryReader<A, A::Reader>, I>;
296 type BlockingReader = RetryWrapper<RetryReader<A, A::BlockingReader>, I>;
297 type Writer = RetryWrapper<A::Writer, I>;
298 type BlockingWriter = RetryWrapper<A::BlockingWriter, I>;
299 type Lister = RetryWrapper<A::Lister, I>;
300 type BlockingLister = RetryWrapper<A::BlockingLister, I>;
301 type Deleter = RetryWrapper<A::Deleter, I>;
302 type BlockingDeleter = RetryWrapper<A::BlockingDeleter, I>;
303
304 fn inner(&self) -> &Self::Inner {
305 &self.inner
306 }
307
308 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
309 { || self.inner.create_dir(path, args.clone()) }
310 .retry(self.builder)
311 .when(|e| e.is_temporary())
312 .notify(|err, dur: Duration| self.notify.intercept(err, dur))
313 .await
314 .map_err(|e| e.set_persistent())
315 }
316
317 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
318 let (rp, reader) = { || self.inner.read(path, args.clone()) }
319 .retry(self.builder)
320 .when(|e| e.is_temporary())
321 .notify(|err, dur| self.notify.intercept(err, dur))
322 .await
323 .map_err(|e| e.set_persistent())?;
324
325 let retry_reader = RetryReader::new(self.inner.clone(), path.to_string(), args, reader);
326 let retry_wrapper = RetryWrapper::new(retry_reader, self.notify.clone(), self.builder);
327
328 Ok((rp, retry_wrapper))
329 }
330
331 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
332 { || self.inner.write(path, args.clone()) }
333 .retry(self.builder)
334 .when(|e| e.is_temporary())
335 .notify(|err, dur| self.notify.intercept(err, dur))
336 .await
337 .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
338 .map_err(|e| e.set_persistent())
339 }
340
341 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
342 { || self.inner.stat(path, args.clone()) }
343 .retry(self.builder)
344 .when(|e| e.is_temporary())
345 .notify(|err, dur| self.notify.intercept(err, dur))
346 .await
347 .map_err(|e| e.set_persistent())
348 }
349
350 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
351 { || self.inner.delete() }
352 .retry(self.builder)
353 .when(|e| e.is_temporary())
354 .notify(|err, dur| self.notify.intercept(err, dur))
355 .await
356 .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
357 .map_err(|e| e.set_persistent())
358 }
359
360 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
361 { || self.inner.copy(from, to, args.clone()) }
362 .retry(self.builder)
363 .when(|e| e.is_temporary())
364 .notify(|err, dur| self.notify.intercept(err, dur))
365 .await
366 .map_err(|e| e.set_persistent())
367 }
368
369 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
370 { || self.inner.rename(from, to, args.clone()) }
371 .retry(self.builder)
372 .when(|e| e.is_temporary())
373 .notify(|err, dur| self.notify.intercept(err, dur))
374 .await
375 .map_err(|e| e.set_persistent())
376 }
377
378 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
379 { || self.inner.list(path, args.clone()) }
380 .retry(self.builder)
381 .when(|e| e.is_temporary())
382 .notify(|err, dur| self.notify.intercept(err, dur))
383 .await
384 .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
385 .map_err(|e| e.set_persistent())
386 }
387
388 fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
389 { || self.inner.blocking_create_dir(path, args.clone()) }
390 .retry(self.builder)
391 .when(|e| e.is_temporary())
392 .notify(|err, dur| self.notify.intercept(err, dur))
393 .call()
394 .map_err(|e| e.set_persistent())
395 }
396
397 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
398 let (rp, reader) = { || self.inner.blocking_read(path, args.clone()) }
399 .retry(self.builder)
400 .when(|e| e.is_temporary())
401 .notify(|err, dur| self.notify.intercept(err, dur))
402 .call()
403 .map_err(|e| e.set_persistent())?;
404
405 let retry_reader = RetryReader::new(self.inner.clone(), path.to_string(), args, reader);
406 let retry_wrapper = RetryWrapper::new(retry_reader, self.notify.clone(), self.builder);
407
408 Ok((rp, retry_wrapper))
409 }
410
411 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
412 { || self.inner.blocking_write(path, args.clone()) }
413 .retry(self.builder)
414 .when(|e| e.is_temporary())
415 .notify(|err, dur| self.notify.intercept(err, dur))
416 .call()
417 .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
418 .map_err(|e| e.set_persistent())
419 }
420
421 fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
422 { || self.inner.blocking_stat(path, args.clone()) }
423 .retry(self.builder)
424 .when(|e| e.is_temporary())
425 .notify(|err, dur| self.notify.intercept(err, dur))
426 .call()
427 .map_err(|e| e.set_persistent())
428 }
429
430 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
431 { || self.inner.blocking_delete() }
432 .retry(self.builder)
433 .when(|e| e.is_temporary())
434 .notify(|err, dur| self.notify.intercept(err, dur))
435 .call()
436 .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder)))
437 .map_err(|e| e.set_persistent())
438 }
439
440 fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
441 { || self.inner.blocking_copy(from, to, args.clone()) }
442 .retry(self.builder)
443 .when(|e| e.is_temporary())
444 .notify(|err, dur| self.notify.intercept(err, dur))
445 .call()
446 .map_err(|e| e.set_persistent())
447 }
448
449 fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
450 { || self.inner.blocking_rename(from, to, args.clone()) }
451 .retry(self.builder)
452 .when(|e| e.is_temporary())
453 .notify(|err, dur| self.notify.intercept(err, dur))
454 .call()
455 .map_err(|e| e.set_persistent())
456 }
457
458 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
459 { || self.inner.blocking_list(path, args.clone()) }
460 .retry(self.builder)
461 .when(|e| e.is_temporary())
462 .notify(|err, dur| self.notify.intercept(err, dur))
463 .call()
464 .map(|(rp, p)| {
465 let p = RetryWrapper::new(p, self.notify.clone(), self.builder);
466 (rp, p)
467 })
468 .map_err(|e| e.set_persistent())
469 }
470}
471
472pub struct RetryReader<A, R> {
473 inner: Arc<A>,
474 reader: Option<R>,
475
476 path: String,
477 args: OpRead,
478}
479
480impl<A, R> RetryReader<A, R> {
481 fn new(inner: Arc<A>, path: String, args: OpRead, r: R) -> Self {
482 Self {
483 inner,
484 reader: Some(r),
485
486 path,
487 args,
488 }
489 }
490}
491
492impl<A: Access> oio::Read for RetryReader<A, A::Reader> {
493 async fn read(&mut self) -> Result<Buffer> {
494 loop {
495 match self.reader.take() {
496 None => {
497 let (_, r) = self.inner.read(&self.path, self.args.clone()).await?;
498 self.reader = Some(r);
499 continue;
500 }
501 Some(mut reader) => {
502 let buf = reader.read().await?;
503 self.reader = Some(reader);
504 self.args.range_mut().advance(buf.len() as u64);
505 return Ok(buf);
506 }
507 }
508 }
509 }
510}
511
512impl<A: Access> oio::BlockingRead for RetryReader<A, A::BlockingReader> {
513 fn read(&mut self) -> Result<Buffer> {
514 loop {
515 match self.reader.take() {
516 None => {
517 let (_, r) = self.inner.blocking_read(&self.path, self.args.clone())?;
518 self.reader = Some(r);
519 continue;
520 }
521 Some(mut reader) => {
522 let buf = reader.read()?;
523 self.reader = Some(reader);
524 self.args.range_mut().advance(buf.len() as u64);
525 return Ok(buf);
526 }
527 }
528 }
529 }
530}
531
532pub struct RetryWrapper<R, I> {
533 inner: Option<R>,
534 notify: Arc<I>,
535
536 builder: ExponentialBuilder,
537}
538
539impl<R, I> RetryWrapper<R, I> {
540 fn new(inner: R, notify: Arc<I>, backoff: ExponentialBuilder) -> Self {
541 Self {
542 inner: Some(inner),
543 notify,
544 builder: backoff,
545 }
546 }
547
548 fn take_inner(&mut self) -> Result<R> {
549 self.inner.take().ok_or_else(|| {
550 Error::new(
551 ErrorKind::Unexpected,
552 "retry layer is in bad state, please make sure future not dropped before ready",
553 )
554 })
555 }
556}
557
558impl<R: oio::Read, I: RetryInterceptor> oio::Read for RetryWrapper<R, I> {
559 async fn read(&mut self) -> Result<Buffer> {
560 use backon::RetryableWithContext;
561
562 let inner = self.take_inner()?;
563
564 let (inner, res) = {
565 |mut r: R| async move {
566 let res = r.read().await;
567
568 (r, res)
569 }
570 }
571 .retry(self.builder)
572 .when(|e| e.is_temporary())
573 .context(inner)
574 .notify(|err, dur| self.notify.intercept(err, dur))
575 .await;
576
577 self.inner = Some(inner);
578 res.map_err(|err| err.set_persistent())
579 }
580}
581
582impl<R: oio::BlockingRead, I: RetryInterceptor> oio::BlockingRead for RetryWrapper<R, I> {
583 fn read(&mut self) -> Result<Buffer> {
584 use backon::BlockingRetryableWithContext;
585
586 let inner = self.take_inner()?;
587
588 let (inner, res) = {
589 |mut r: R| {
590 let res = r.read();
591
592 (r, res)
593 }
594 }
595 .retry(self.builder)
596 .when(|e| e.is_temporary())
597 .context(inner)
598 .notify(|err, dur| self.notify.intercept(err, dur))
599 .call();
600
601 self.inner = Some(inner);
602 res.map_err(|err| err.set_persistent())
603 }
604}
605
606impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
607 async fn write(&mut self, bs: Buffer) -> Result<()> {
608 use backon::RetryableWithContext;
609
610 let inner = self.take_inner()?;
611
612 let ((inner, _), res) = {
613 |(mut r, bs): (R, Buffer)| async move {
614 let res = r.write(bs.clone()).await;
615
616 ((r, bs), res)
617 }
618 }
619 .retry(self.builder)
620 .when(|e| e.is_temporary())
621 .context((inner, bs))
622 .notify(|err, dur| self.notify.intercept(err, dur))
623 .await;
624
625 self.inner = Some(inner);
626 res.map_err(|err| err.set_persistent())
627 }
628
629 async fn abort(&mut self) -> Result<()> {
630 use backon::RetryableWithContext;
631
632 let inner = self.take_inner()?;
633
634 let (inner, res) = {
635 |mut r: R| async move {
636 let res = r.abort().await;
637
638 (r, res)
639 }
640 }
641 .retry(self.builder)
642 .when(|e| e.is_temporary())
643 .context(inner)
644 .notify(|err, dur| self.notify.intercept(err, dur))
645 .await;
646
647 self.inner = Some(inner);
648 res.map_err(|err| err.set_persistent())
649 }
650
651 async fn close(&mut self) -> Result<Metadata> {
652 use backon::RetryableWithContext;
653
654 let inner = self.take_inner()?;
655
656 let (inner, res) = {
657 |mut r: R| async move {
658 let res = r.close().await;
659
660 (r, res)
661 }
662 }
663 .retry(self.builder)
664 .when(|e| e.is_temporary())
665 .context(inner)
666 .notify(|err, dur| self.notify.intercept(err, dur))
667 .await;
668
669 self.inner = Some(inner);
670 res.map_err(|err| err.set_persistent())
671 }
672}
673
674impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for RetryWrapper<R, I> {
675 fn write(&mut self, bs: Buffer) -> Result<()> {
676 { || self.inner.as_mut().unwrap().write(bs.clone()) }
677 .retry(self.builder)
678 .when(|e| e.is_temporary())
679 .notify(|err, dur| {
680 self.notify.intercept(err, dur);
681 })
682 .call()
683 .map_err(|e| e.set_persistent())
684 }
685
686 fn close(&mut self) -> Result<Metadata> {
687 { || self.inner.as_mut().unwrap().close() }
688 .retry(self.builder)
689 .when(|e| e.is_temporary())
690 .notify(|err, dur| {
691 self.notify.intercept(err, dur);
692 })
693 .call()
694 .map_err(|e| e.set_persistent())
695 }
696}
697
698impl<P: oio::List, I: RetryInterceptor> oio::List for RetryWrapper<P, I> {
699 async fn next(&mut self) -> Result<Option<oio::Entry>> {
700 use backon::RetryableWithContext;
701
702 let inner = self.take_inner()?;
703
704 let (inner, res) = {
705 |mut p: P| async move {
706 let res = p.next().await;
707
708 (p, res)
709 }
710 }
711 .retry(self.builder)
712 .when(|e| e.is_temporary())
713 .context(inner)
714 .notify(|err, dur| self.notify.intercept(err, dur))
715 .await;
716
717 self.inner = Some(inner);
718 res.map_err(|err| err.set_persistent())
719 }
720}
721
722impl<P: oio::BlockingList, I: RetryInterceptor> oio::BlockingList for RetryWrapper<P, I> {
723 fn next(&mut self) -> Result<Option<oio::Entry>> {
724 { || self.inner.as_mut().unwrap().next() }
725 .retry(self.builder)
726 .when(|e| e.is_temporary())
727 .notify(|err, dur| {
728 self.notify.intercept(err, dur);
729 })
730 .call()
731 .map_err(|e| e.set_persistent())
732 }
733}
734
735impl<P: oio::Delete, I: RetryInterceptor> oio::Delete for RetryWrapper<P, I> {
736 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
737 { || self.inner.as_mut().unwrap().delete(path, args.clone()) }
738 .retry(self.builder)
739 .when(|e| e.is_temporary())
740 .notify(|err, dur| {
741 self.notify.intercept(err, dur);
742 })
743 .call()
744 .map_err(|e| e.set_persistent())
745 }
746
747 async fn flush(&mut self) -> Result<usize> {
748 use backon::RetryableWithContext;
749
750 let inner = self.take_inner()?;
751
752 let (inner, res) = {
753 |mut p: P| async move {
754 let res = p.flush().await;
755
756 (p, res)
757 }
758 }
759 .retry(self.builder)
760 .when(|e| e.is_temporary())
761 .context(inner)
762 .notify(|err, dur| self.notify.intercept(err, dur))
763 .await;
764
765 self.inner = Some(inner);
766 res.map_err(|err| err.set_persistent())
767 }
768}
769
770impl<P: oio::BlockingDelete, I: RetryInterceptor> oio::BlockingDelete for RetryWrapper<P, I> {
771 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
772 { || self.inner.as_mut().unwrap().delete(path, args.clone()) }
773 .retry(self.builder)
774 .when(|e| e.is_temporary())
775 .notify(|err, dur| {
776 self.notify.intercept(err, dur);
777 })
778 .call()
779 .map_err(|e| e.set_persistent())
780 }
781
782 fn flush(&mut self) -> Result<usize> {
783 { || self.inner.as_mut().unwrap().flush() }
784 .retry(self.builder)
785 .when(|e| e.is_temporary())
786 .notify(|err, dur| {
787 self.notify.intercept(err, dur);
788 })
789 .call()
790 .map_err(|e| e.set_persistent())
791 }
792}
793
794#[cfg(test)]
795mod tests {
796 use std::mem;
797 use std::sync::Arc;
798 use std::sync::Mutex;
799
800 use bytes::Bytes;
801 use futures::{stream, TryStreamExt};
802 use tracing_subscriber::filter::LevelFilter;
803
804 use super::*;
805 use crate::layers::LoggingLayer;
806
807 #[derive(Default, Clone)]
808 struct MockBuilder {
809 attempt: Arc<Mutex<usize>>,
810 }
811
812 impl Builder for MockBuilder {
813 const SCHEME: Scheme = Scheme::Custom("mock");
814 type Config = ();
815
816 fn build(self) -> Result<impl Access> {
817 Ok(MockService {
818 attempt: self.attempt.clone(),
819 })
820 }
821 }
822
823 #[derive(Debug, Clone, Default)]
824 struct MockService {
825 attempt: Arc<Mutex<usize>>,
826 }
827
828 impl Access for MockService {
829 type Reader = MockReader;
830 type Writer = MockWriter;
831 type Lister = MockLister;
832 type Deleter = MockDeleter;
833 type BlockingReader = ();
834 type BlockingWriter = ();
835 type BlockingLister = ();
836 type BlockingDeleter = ();
837
838 fn info(&self) -> Arc<AccessorInfo> {
839 let am = AccessorInfo::default();
840 am.set_scheme(Scheme::Custom("mock"))
841 .set_native_capability(Capability {
842 read: true,
843 write: true,
844 write_can_multi: true,
845 delete: true,
846 delete_max_size: Some(10),
847 stat: true,
848 list: true,
849 list_with_recursive: true,
850 ..Default::default()
851 });
852
853 am.into()
854 }
855
856 async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
857 Ok(RpStat::new(
858 Metadata::new(EntryMode::FILE).with_content_length(13),
859 ))
860 }
861
862 async fn read(&self, _: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
863 Ok((
864 RpRead::new(),
865 MockReader {
866 buf: Bytes::from("Hello, World!").into(),
867 range: args.range(),
868 attempt: self.attempt.clone(),
869 },
870 ))
871 }
872
873 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
874 Ok((
875 RpDelete::default(),
876 MockDeleter {
877 size: 0,
878 attempt: self.attempt.clone(),
879 },
880 ))
881 }
882
883 async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
884 Ok((RpWrite::new(), MockWriter {}))
885 }
886
887 async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
888 let lister = MockLister::default();
889 Ok((RpList::default(), lister))
890 }
891 }
892
893 #[derive(Debug, Clone, Default)]
894 struct MockReader {
895 buf: Buffer,
896 range: BytesRange,
897 attempt: Arc<Mutex<usize>>,
898 }
899
900 impl oio::Read for MockReader {
901 async fn read(&mut self) -> Result<Buffer> {
902 let mut attempt = self.attempt.lock().unwrap();
903 *attempt += 1;
904
905 match *attempt {
906 1 => Err(
907 Error::new(ErrorKind::Unexpected, "retryable_error from reader")
908 .set_temporary(),
909 ),
910 2 => Err(
911 Error::new(ErrorKind::Unexpected, "retryable_error from reader")
912 .set_temporary(),
913 ),
914 3 => Ok(self.buf.slice(self.range.to_range_as_usize())),
916 4 => Err(
917 Error::new(ErrorKind::Unexpected, "retryable_error from reader")
918 .set_temporary(),
919 ),
920 5 => Ok(self.buf.slice(self.range.to_range_as_usize())),
922 _ => unreachable!(),
923 }
924 }
925 }
926
927 #[derive(Debug, Clone, Default)]
928 struct MockWriter {}
929
930 impl oio::Write for MockWriter {
931 async fn write(&mut self, _: Buffer) -> Result<()> {
932 Ok(())
933 }
934
935 async fn close(&mut self) -> Result<Metadata> {
936 Err(Error::new(ErrorKind::Unexpected, "always close failed").set_temporary())
937 }
938
939 async fn abort(&mut self) -> Result<()> {
940 Ok(())
941 }
942 }
943
944 #[derive(Debug, Clone, Default)]
945 struct MockLister {
946 attempt: usize,
947 }
948
949 impl oio::List for MockLister {
950 async fn next(&mut self) -> Result<Option<oio::Entry>> {
951 self.attempt += 1;
952 match self.attempt {
953 1 => Err(Error::new(
954 ErrorKind::RateLimited,
955 "retryable rate limited error from lister",
956 )
957 .set_temporary()),
958 2 => Ok(Some(oio::Entry::new(
959 "hello",
960 Metadata::new(EntryMode::FILE),
961 ))),
962 3 => Ok(Some(oio::Entry::new(
963 "world",
964 Metadata::new(EntryMode::FILE),
965 ))),
966 4 => Err(
967 Error::new(ErrorKind::Unexpected, "retryable internal server error")
968 .set_temporary(),
969 ),
970 5 => Ok(Some(oio::Entry::new(
971 "2023/",
972 Metadata::new(EntryMode::DIR),
973 ))),
974 6 => Ok(Some(oio::Entry::new(
975 "0208/",
976 Metadata::new(EntryMode::DIR),
977 ))),
978 7 => Ok(None),
979 _ => {
980 unreachable!()
981 }
982 }
983 }
984 }
985
986 #[derive(Debug, Clone, Default)]
987 struct MockDeleter {
988 size: usize,
989 attempt: Arc<Mutex<usize>>,
990 }
991
992 impl oio::Delete for MockDeleter {
993 fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
994 self.size += 1;
995 Ok(())
996 }
997
998 async fn flush(&mut self) -> Result<usize> {
999 let mut attempt = self.attempt.lock().unwrap();
1000 *attempt += 1;
1001
1002 match *attempt {
1003 1 => Err(
1004 Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
1005 .set_temporary(),
1006 ),
1007 2 => {
1008 self.size -= 1;
1009 Ok(1)
1010 }
1011 3 => Err(
1012 Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
1013 .set_temporary(),
1014 ),
1015 4 => Err(
1016 Error::new(ErrorKind::Unexpected, "retryable_error from deleter")
1017 .set_temporary(),
1018 ),
1019 5 => {
1020 let s = mem::take(&mut self.size);
1021 Ok(s)
1022 }
1023 _ => unreachable!(),
1024 }
1025 }
1026 }
1027
1028 #[tokio::test]
1029 async fn test_retry_read() {
1030 let _ = tracing_subscriber::fmt()
1031 .with_max_level(LevelFilter::TRACE)
1032 .with_test_writer()
1033 .try_init();
1034
1035 let builder = MockBuilder::default();
1036 let op = Operator::new(builder.clone())
1037 .unwrap()
1038 .layer(LoggingLayer::default())
1039 .layer(RetryLayer::new())
1040 .finish();
1041
1042 let r = op.reader("retryable_error").await.unwrap();
1043 let mut content = Vec::new();
1044 let size = r
1045 .read_into(&mut content, ..)
1046 .await
1047 .expect("read must succeed");
1048 assert_eq!(size, 13);
1049 assert_eq!(content, "Hello, World!".as_bytes());
1050 assert_eq!(*builder.attempt.lock().unwrap(), 5);
1052 }
1053
1054 #[tokio::test]
1056 async fn test_retry_write_fail_on_close() {
1057 let _ = tracing_subscriber::fmt()
1058 .with_max_level(LevelFilter::TRACE)
1059 .with_test_writer()
1060 .try_init();
1061
1062 let builder = MockBuilder::default();
1063 let op = Operator::new(builder.clone())
1064 .unwrap()
1065 .layer(
1066 RetryLayer::new()
1067 .with_min_delay(Duration::from_millis(1))
1068 .with_max_delay(Duration::from_millis(1))
1069 .with_jitter(),
1070 )
1071 .layer(LoggingLayer::default())
1074 .finish();
1075
1076 let mut w = op.writer("test_write").await.unwrap();
1077 w.write("aaa").await.unwrap();
1078 w.write("bbb").await.unwrap();
1079 match w.close().await {
1080 Ok(_) => (),
1081 Err(_) => {
1082 w.abort().await.unwrap();
1083 }
1084 };
1085 }
1086
1087 #[tokio::test]
1088 async fn test_retry_list() {
1089 let _ = tracing_subscriber::fmt().with_test_writer().try_init();
1090
1091 let builder = MockBuilder::default();
1092 let op = Operator::new(builder.clone())
1093 .unwrap()
1094 .layer(RetryLayer::new())
1095 .finish();
1096
1097 let expected = vec!["hello", "world", "2023/", "0208/"];
1098
1099 let mut lister = op
1100 .lister("retryable_error/")
1101 .await
1102 .expect("service must support list");
1103 let mut actual = Vec::new();
1104 while let Some(obj) = lister.try_next().await.expect("must success") {
1105 actual.push(obj.name().to_owned());
1106 }
1107
1108 assert_eq!(actual, expected);
1109 }
1110
1111 #[tokio::test]
1112 async fn test_retry_batch() {
1113 let _ = tracing_subscriber::fmt().with_test_writer().try_init();
1114
1115 let builder = MockBuilder::default();
1116 let op = Operator::new(builder.clone())
1118 .unwrap()
1119 .layer(
1120 RetryLayer::new()
1121 .with_min_delay(Duration::from_secs_f32(0.1))
1122 .with_max_times(5),
1123 )
1124 .finish();
1125
1126 let paths = vec!["hello", "world", "test", "batch"];
1127 op.delete_stream(stream::iter(paths)).await.unwrap();
1128 assert_eq!(*builder.attempt.lock().unwrap(), 5);
1129 }
1130}