1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
22use std::time::Duration;
23use std::time::Instant;
24
25use crate::raw::*;
26use crate::*;
27
28#[derive(Clone)]
55pub struct TailCutLayerBuilder {
56 percentile: u8,
57 safety_factor: f64,
58 window: Duration,
59 min_samples: usize,
60 min_deadline: Duration,
61 max_deadline: Duration,
62}
63
64impl Default for TailCutLayerBuilder {
65 fn default() -> Self {
66 Self {
67 percentile: 95,
68 safety_factor: 1.3,
69 window: Duration::from_secs(60),
70 min_samples: 200,
71 min_deadline: Duration::from_millis(500),
72 max_deadline: Duration::from_secs(30),
73 }
74 }
75}
76
77impl TailCutLayerBuilder {
78 pub fn new() -> Self {
80 Self::default()
81 }
82
83 pub fn percentile(mut self, percentile: u8) -> Self {
93 assert!(
94 (50..=99).contains(&percentile),
95 "percentile must be between 50 and 99"
96 );
97 self.percentile = percentile;
98 self
99 }
100
101 pub fn safety_factor(mut self, factor: f64) -> Self {
112 assert!(
113 (1.0..=5.0).contains(&factor),
114 "safety_factor must be between 1.0 and 5.0"
115 );
116 self.safety_factor = factor;
117 self
118 }
119
120 pub fn window(mut self, window: Duration) -> Self {
131 assert!(
132 window <= Duration::from_secs(120),
133 "window must be <= 120 seconds"
134 );
135 self.window = window;
136 self
137 }
138
139 pub fn min_samples(mut self, min_samples: usize) -> Self {
146 self.min_samples = min_samples;
147 self
148 }
149
150 pub fn min_deadline(mut self, deadline: Duration) -> Self {
157 self.min_deadline = deadline;
158 self
159 }
160
161 pub fn max_deadline(mut self, deadline: Duration) -> Self {
168 self.max_deadline = deadline;
169 self
170 }
171
172 pub fn build(self) -> TailCutLayer {
204 TailCutLayer {
205 config: Arc::new(TailCutConfig {
206 percentile: self.percentile,
207 safety_factor: self.safety_factor,
208 window: self.window,
209 min_samples: self.min_samples,
210 min_deadline: self.min_deadline,
211 max_deadline: self.max_deadline,
212 }),
213 stats: Arc::new(TailCutStats::new()),
214 }
215 }
216}
217
218#[derive(Debug)]
220struct TailCutConfig {
221 percentile: u8,
222 safety_factor: f64,
223 window: Duration,
224 min_samples: usize,
225 min_deadline: Duration,
226 max_deadline: Duration,
227}
228
229#[derive(Clone)]
260pub struct TailCutLayer {
261 config: Arc<TailCutConfig>,
262 stats: Arc<TailCutStats>,
263}
264
265impl TailCutLayer {
266 pub fn builder() -> TailCutLayerBuilder {
268 TailCutLayerBuilder::new()
269 }
270
271 pub fn new() -> Self {
275 Self::builder().build()
276 }
277}
278
279impl Default for TailCutLayer {
280 fn default() -> Self {
281 Self::new()
282 }
283}
284
285impl<A: Access> Layer<A> for TailCutLayer {
286 type LayeredAccess = TailCutAccessor<A>;
287
288 fn layer(&self, inner: A) -> Self::LayeredAccess {
289 TailCutAccessor {
290 inner,
291 config: self.config.clone(),
292 stats: self.stats.clone(),
293 }
294 }
295}
296
297#[derive(Clone)]
299pub struct TailCutAccessor<A: Access> {
300 inner: A,
301 config: Arc<TailCutConfig>,
302 stats: Arc<TailCutStats>,
303}
304
305impl<A: Access> Debug for TailCutAccessor<A> {
306 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
307 f.debug_struct("TailCutAccessor")
308 .field("config", &self.config)
309 .finish_non_exhaustive()
310 }
311}
312
313impl<A: Access> TailCutAccessor<A> {
314 fn calculate_deadline(&self, op: Operation, size: Option<u64>) -> Option<Duration> {
316 let op_stats = self.stats.stats_for(op);
317
318 if op_stats.total_samples(size, self.config.window) < self.config.min_samples {
319 return None;
320 }
321
322 let q = self.config.percentile as f64 / 100.0;
323 let pctl = op_stats.quantile(size, q, self.config.window)?;
324
325 let deadline = Duration::from_secs_f64(pctl.as_secs_f64() * self.config.safety_factor);
326 Some(deadline.clamp(self.config.min_deadline, self.config.max_deadline))
327 }
328
329 async fn with_deadline<F, T>(&self, op: Operation, size: Option<u64>, fut: F) -> Result<T>
330 where
331 F: std::future::Future<Output = Result<T>>,
332 {
333 let start = Instant::now();
334
335 let result = if let Some(deadline) = self.calculate_deadline(op, size) {
336 match tokio::time::timeout(deadline, fut).await {
337 Ok(res) => res,
338 Err(_) => Err(Error::new(ErrorKind::Unexpected, "cancelled by tail cut")
339 .with_operation(op)
340 .with_context("percentile", format!("P{}", self.config.percentile))
341 .with_context("deadline", format!("{:?}", deadline))
342 .set_temporary()),
343 }
344 } else {
345 fut.await
346 };
347
348 if result.is_ok() {
349 let latency = start.elapsed();
350 self.stats.stats_for(op).record(size, latency);
351 }
352
353 result
354 }
355}
356
357impl<A: Access> LayeredAccess for TailCutAccessor<A> {
358 type Inner = A;
359 type Reader = TailCutWrapper<A::Reader>;
360 type Writer = TailCutWrapper<A::Writer>;
361 type Lister = TailCutWrapper<A::Lister>;
362 type Deleter = TailCutWrapper<A::Deleter>;
363
364 fn inner(&self) -> &Self::Inner {
365 &self.inner
366 }
367
368 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
369 self.with_deadline(
370 Operation::CreateDir,
371 None,
372 self.inner.create_dir(path, args),
373 )
374 .await
375 }
376
377 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
378 let size = args.range().size();
379 self.with_deadline(Operation::Read, size, self.inner.read(path, args))
380 .await
381 .map(|(rp, r)| {
382 (
383 rp,
384 TailCutWrapper::new(r, size, self.config.clone(), self.stats.clone()),
385 )
386 })
387 }
388
389 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
390 self.with_deadline(Operation::Write, None, self.inner.write(path, args))
391 .await
392 .map(|(rp, w)| {
393 (
394 rp,
395 TailCutWrapper::new(w, None, self.config.clone(), self.stats.clone()),
396 )
397 })
398 }
399
400 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
401 self.with_deadline(Operation::Copy, None, self.inner.copy(from, to, args))
402 .await
403 }
404
405 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
406 self.with_deadline(Operation::Rename, None, self.inner.rename(from, to, args))
407 .await
408 }
409
410 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
411 self.with_deadline(Operation::Stat, None, self.inner.stat(path, args))
412 .await
413 }
414
415 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
416 self.with_deadline(Operation::Delete, None, self.inner.delete())
417 .await
418 .map(|(rp, d)| {
419 (
420 rp,
421 TailCutWrapper::new(d, None, self.config.clone(), self.stats.clone()),
422 )
423 })
424 }
425
426 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
427 self.with_deadline(Operation::List, None, self.inner.list(path, args))
428 .await
429 .map(|(rp, l)| {
430 (
431 rp,
432 TailCutWrapper::new(l, None, self.config.clone(), self.stats.clone()),
433 )
434 })
435 }
436
437 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
438 self.with_deadline(Operation::Presign, None, self.inner.presign(path, args))
439 .await
440 }
441}
442
443pub struct TailCutWrapper<R> {
445 inner: R,
446 size: Option<u64>,
447 config: Arc<TailCutConfig>,
448 stats: Arc<TailCutStats>,
449}
450
451impl<R> TailCutWrapper<R> {
452 fn new(
453 inner: R,
454 size: Option<u64>,
455 config: Arc<TailCutConfig>,
456 stats: Arc<TailCutStats>,
457 ) -> Self {
458 Self {
459 inner,
460 size,
461 config,
462 stats,
463 }
464 }
465
466 fn calculate_deadline(&self, op: Operation) -> Option<Duration> {
467 let op_stats = self.stats.stats_for(op);
468
469 if op_stats.total_samples(self.size, self.config.window) < self.config.min_samples {
470 return None;
471 }
472
473 let q = self.config.percentile as f64 / 100.0;
474 let pctl = op_stats.quantile(self.size, q, self.config.window)?;
475
476 let deadline = Duration::from_secs_f64(pctl.as_secs_f64() * self.config.safety_factor);
477 Some(deadline.clamp(self.config.min_deadline, self.config.max_deadline))
478 }
479
480 #[inline]
481 async fn with_io_deadline<F, T>(
482 deadline: Option<Duration>,
483 percentile: u8,
484 stats: &Arc<TailCutStats>,
485 size: Option<u64>,
486 op: Operation,
487 fut: F,
488 ) -> Result<T>
489 where
490 F: std::future::Future<Output = Result<T>>,
491 {
492 let start = Instant::now();
493
494 let result = if let Some(dl) = deadline {
495 match tokio::time::timeout(dl, fut).await {
496 Ok(res) => res,
497 Err(_) => Err(
498 Error::new(ErrorKind::Unexpected, "io cancelled by tail cut")
499 .with_operation(op)
500 .with_context("percentile", format!("P{}", percentile))
501 .with_context("deadline", format!("{:?}", dl))
502 .set_temporary(),
503 ),
504 }
505 } else {
506 fut.await
507 };
508
509 if result.is_ok() {
510 let latency = start.elapsed();
511 stats.stats_for(op).record(size, latency);
512 }
513
514 result
515 }
516}
517
518impl<R: oio::Read> oio::Read for TailCutWrapper<R> {
519 async fn read(&mut self) -> Result<Buffer> {
520 let deadline = self.calculate_deadline(Operation::Read);
521 Self::with_io_deadline(
522 deadline,
523 self.config.percentile,
524 &self.stats,
525 self.size,
526 Operation::Read,
527 self.inner.read(),
528 )
529 .await
530 }
531}
532
533impl<R: oio::Write> oio::Write for TailCutWrapper<R> {
534 async fn write(&mut self, bs: Buffer) -> Result<()> {
535 let deadline = self.calculate_deadline(Operation::Write);
536 Self::with_io_deadline(
537 deadline,
538 self.config.percentile,
539 &self.stats,
540 self.size,
541 Operation::Write,
542 self.inner.write(bs),
543 )
544 .await
545 }
546
547 async fn close(&mut self) -> Result<Metadata> {
548 let deadline = self.calculate_deadline(Operation::Write);
549 Self::with_io_deadline(
550 deadline,
551 self.config.percentile,
552 &self.stats,
553 self.size,
554 Operation::Write,
555 self.inner.close(),
556 )
557 .await
558 }
559
560 async fn abort(&mut self) -> Result<()> {
561 let deadline = self.calculate_deadline(Operation::Write);
562 Self::with_io_deadline(
563 deadline,
564 self.config.percentile,
565 &self.stats,
566 self.size,
567 Operation::Write,
568 self.inner.abort(),
569 )
570 .await
571 }
572}
573
574impl<R: oio::List> oio::List for TailCutWrapper<R> {
575 async fn next(&mut self) -> Result<Option<oio::Entry>> {
576 let deadline = self.calculate_deadline(Operation::List);
577 Self::with_io_deadline(
578 deadline,
579 self.config.percentile,
580 &self.stats,
581 self.size,
582 Operation::List,
583 self.inner.next(),
584 )
585 .await
586 }
587}
588
589impl<R: oio::Delete> oio::Delete for TailCutWrapper<R> {
590 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
591 self.inner.delete(path, args)
592 }
593
594 async fn flush(&mut self) -> Result<usize> {
595 let deadline = self.calculate_deadline(Operation::Delete);
596 Self::with_io_deadline(
597 deadline,
598 self.config.percentile,
599 &self.stats,
600 self.size,
601 Operation::Delete,
602 self.inner.flush(),
603 )
604 .await
605 }
606}
607
608struct TailCutStats {
610 operations: [Arc<OperationStats>; 7],
612}
613
614impl TailCutStats {
615 fn new() -> Self {
616 Self {
617 operations: std::array::from_fn(|_| Arc::new(OperationStats::new())),
618 }
619 }
620
621 fn stats_for(&self, op: Operation) -> &Arc<OperationStats> {
622 let idx = match op {
623 Operation::Read => 0,
624 Operation::Write => 1,
625 Operation::Stat => 2,
626 Operation::List => 3,
627 Operation::Delete => 4,
628 Operation::Copy => 5,
629 Operation::Rename => 6,
630 _ => 2, };
632 &self.operations[idx]
633 }
634}
635
636struct OperationStats {
638 buckets: Vec<SizeBucket>,
639}
640
641impl OperationStats {
642 fn new() -> Self {
643 Self {
644 buckets: vec![
645 SizeBucket::new(0, Some(4 * 1024)), SizeBucket::new(4 * 1024, Some(64 * 1024)), SizeBucket::new(64 * 1024, Some(1024 * 1024)), SizeBucket::new(1024 * 1024, Some(16 * 1024 * 1024)), SizeBucket::new(16 * 1024 * 1024, Some(256 * 1024 * 1024)), SizeBucket::new(256 * 1024 * 1024, None), ],
652 }
653 }
654
655 fn bucket_for(&self, size: Option<u64>) -> &SizeBucket {
656 let size = size.unwrap_or(u64::MAX);
657
658 self.buckets
659 .iter()
660 .find(|b| b.contains(size))
661 .unwrap_or(&self.buckets[self.buckets.len() - 1])
662 }
663
664 fn record(&self, size: Option<u64>, latency: Duration) {
665 self.bucket_for(size).histogram.record(latency);
666 }
667
668 fn quantile(&self, size: Option<u64>, q: f64, window: Duration) -> Option<Duration> {
669 self.bucket_for(size).histogram.quantile(q, window)
670 }
671
672 fn total_samples(&self, size: Option<u64>, window: Duration) -> usize {
673 self.bucket_for(size).histogram.total_samples(window)
674 }
675}
676
677struct SizeBucket {
679 min_size: u64,
680 max_size: Option<u64>,
681 histogram: WindowedHistogram,
682}
683
684impl SizeBucket {
685 fn new(min_size: u64, max_size: Option<u64>) -> Self {
686 Self {
687 min_size,
688 max_size,
689 histogram: WindowedHistogram::new(),
690 }
691 }
692
693 fn contains(&self, size: u64) -> bool {
694 size >= self.min_size && self.max_size.is_none_or(|max| size < max)
695 }
696}
697
/// Width of one histogram time slice, in milliseconds.
const SLICE_DURATION_MS: u64 = 10_000;
/// Number of slices in the ring. Together they retain `NUM_SLICES * SLICE_DURATION_MS`
/// (120 s) of history.
const NUM_SLICES: usize = 12;
/// Latency buckets per slice. Bucket 0 holds sub-millisecond samples, bucket `i` (1..=15)
/// holds `[2^(i-1), 2^i)` ms, and the last bucket absorbs everything slower.
const NUM_BUCKETS: usize = 17;

/// Lock-free latency histogram over a sliding wall-clock window.
///
/// Samples are added to the slice at `current_idx`. `maybe_rotate` advances to the next
/// slice (clearing it) once `SLICE_DURATION_MS` has passed since `last_rotate`. Readers sum
/// the slices whose start time falls inside the requested window.
struct WindowedHistogram {
    slices: Box<[TimeSlice; NUM_SLICES]>,
    current_idx: AtomicUsize,
    /// Wall-clock time (ms since the UNIX epoch) of the most recent rotation.
    last_rotate: AtomicU64,
}

impl WindowedHistogram {
    /// Creates an empty histogram whose first slice starts now.
    fn new() -> Self {
        let now = Self::now_ms();
        let slices: Box<[TimeSlice; NUM_SLICES]> =
            Box::new(std::array::from_fn(|_| TimeSlice::new()));
        // Slice 0 is live immediately. `snapshot` ignores slices whose start is 0 (never
        // used), so leaving it unset would silently drop every sample recorded before the
        // first rotation.
        slices[0].start_epoch_ms.store(now, Ordering::Release);

        Self {
            slices,
            current_idx: AtomicUsize::new(0),
            last_rotate: AtomicU64::new(now),
        }
    }

    /// Records one latency sample into the current slice, rotating first if a slice is due.
    fn record(&self, latency: Duration) {
        self.maybe_rotate();

        let bucket_idx = Self::latency_to_bucket(latency);
        // Acquire pairs with the Release store in `maybe_rotate`. The slice's zeroing is
        // therefore ordered before this increment and cannot overwrite it.
        let slice_idx = self.current_idx.load(Ordering::Acquire);

        self.slices[slice_idx].buckets[bucket_idx].fetch_add(1, Ordering::Relaxed);
    }

    /// Returns the upper bound of the bucket holding the `q`-quantile of the samples within
    /// `window`, or `None` when the window holds no samples.
    fn quantile(&self, q: f64, window: Duration) -> Option<Duration> {
        debug_assert!((0.0..=1.0).contains(&q), "quantile must be in [0, 1]");

        let snapshot = self.snapshot(window);
        let total: u64 = snapshot.iter().sum();

        if total == 0 {
            return None;
        }

        // 1-based rank of the target sample. Clamping to at least 1 keeps `q == 0` from
        // selecting bucket 0 when that bucket is empty.
        let target = ((total as f64 * q).ceil() as u64).max(1);
        let mut cumsum = 0u64;

        for (bucket_idx, &count) in snapshot.iter().enumerate() {
            cumsum += count;
            if cumsum >= target {
                return Some(Self::bucket_to_latency(bucket_idx));
            }
        }

        Some(Self::bucket_to_latency(NUM_BUCKETS - 1))
    }

    /// Number of samples within `window`.
    fn total_samples(&self, window: Duration) -> usize {
        self.snapshot(window).iter().map(|&v| v as usize).sum()
    }

    /// Sums bucket counts across the slices that started within `window`. One extra slice
    /// width is allowed, so the partially covered oldest slice is still included.
    fn snapshot(&self, window: Duration) -> [u64; NUM_BUCKETS] {
        let mut result = [0u64; NUM_BUCKETS];
        let now_ms = Self::now_ms();
        let window_ms = window.as_millis() as u64;

        for slice in self.slices.iter() {
            let start = slice.start_epoch_ms.load(Ordering::Acquire);

            if start > 0 && now_ms.saturating_sub(start) < window_ms + SLICE_DURATION_MS {
                for (i, bucket) in slice.buckets.iter().enumerate() {
                    result[i] += bucket.load(Ordering::Relaxed);
                }
            }
        }

        result
    }

    /// Advances to the next slice if `SLICE_DURATION_MS` has elapsed since the last
    /// rotation. Only the thread that wins the compare-exchange performs the rotation.
    fn maybe_rotate(&self) {
        let now = Self::now_ms();
        let last_rotate = self.last_rotate.load(Ordering::Relaxed);

        // Wall-clock time can step backwards (e.g. NTP corrections). Saturate instead of
        // underflowing, which would panic in debug builds and force a spurious rotation
        // in release builds.
        if now.saturating_sub(last_rotate) < SLICE_DURATION_MS {
            return;
        }
        if self
            .last_rotate
            .compare_exchange(last_rotate, now, Ordering::AcqRel, Ordering::Relaxed)
            .is_err()
        {
            return;
        }

        let old_idx = self.current_idx.load(Ordering::Relaxed);
        let new_idx = (old_idx + 1) % NUM_SLICES;
        let new_slice = &self.slices[new_idx];

        // Clear the counts left over from this slice's previous lap *before* publishing
        // its new start time. A concurrent `snapshot` that observes the fresh timestamp
        // (Acquire) then also observes the zeroed counters, never stale ones.
        for bucket in &new_slice.buckets {
            bucket.store(0, Ordering::Relaxed);
        }
        new_slice.start_epoch_ms.store(now, Ordering::Release);

        self.current_idx.store(new_idx, Ordering::Release);
    }

    /// Maps a latency to its log2 bucket: 0 for < 1 ms, otherwise the bit length of the
    /// millisecond value, capped at the last bucket.
    fn latency_to_bucket(latency: Duration) -> usize {
        // Saturate rather than truncate absurdly large durations.
        let ms = latency.as_millis().min(u128::from(u64::MAX)) as u64;

        if ms == 0 {
            return 0;
        }

        let bucket = 64 - ms.leading_zeros();
        (bucket as usize).min(NUM_BUCKETS - 1)
    }

    /// Upper-bound latency represented by a bucket (1 ms for bucket 0, 64 s for the last).
    fn bucket_to_latency(bucket_idx: usize) -> Duration {
        if bucket_idx == 0 {
            Duration::from_millis(1)
        } else if bucket_idx >= NUM_BUCKETS - 1 {
            Duration::from_secs(64)
        } else {
            Duration::from_millis(1u64 << bucket_idx)
        }
    }

    /// Current wall-clock time in milliseconds since the UNIX epoch.
    ///
    /// Returns 0 if the clock reads earlier than the epoch. Such slices are ignored by
    /// `snapshot`, so tail cutting degrades to "no deadline" instead of panicking on the
    /// I/O path.
    fn now_ms() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |d| d.as_millis() as u64)
    }
}

/// Counters for one time slice of a [`WindowedHistogram`].
struct TimeSlice {
    /// Sample count per latency bucket.
    buckets: [AtomicU64; NUM_BUCKETS],
    /// Start of this slice in ms since the UNIX epoch; 0 means the slice was never used.
    start_epoch_ms: AtomicU64,
}

impl TimeSlice {
    fn new() -> Self {
        Self {
            buckets: std::array::from_fn(|_| AtomicU64::new(0)),
            start_epoch_ms: AtomicU64::new(0),
        }
    }
}
839
#[cfg(test)]
mod tests {
    use super::*;

    /// Millisecond latencies map onto log2 buckets, saturating at the last bucket.
    #[test]
    fn test_latency_to_bucket() {
        let cases = [
            (Duration::from_millis(0), 0),
            (Duration::from_millis(1), 1),
            (Duration::from_millis(2), 2),
            (Duration::from_millis(4), 3),
            (Duration::from_millis(8), 4),
            (Duration::from_millis(500), 9),
            (Duration::from_secs(1), 10),
            (Duration::from_secs(2), 11),
            (Duration::from_secs(64), 16),
            (Duration::from_secs(1000), 16),
        ];
        for &(latency, expected) in &cases {
            assert_eq!(
                WindowedHistogram::latency_to_bucket(latency),
                expected,
                "latency = {latency:?}"
            );
        }
    }

    /// Size buckets are half-open `[min, max)`; `None` means unbounded above.
    #[test]
    fn test_size_bucket_contains() {
        let bounded = SizeBucket::new(0, Some(4096));
        for &(size, inside) in &[(0, true), (4095, true), (4096, false)] {
            assert_eq!(bounded.contains(size), inside, "size = {size}");
        }

        let open = SizeBucket::new(4096, None);
        for &(size, inside) in &[(4095, false), (4096, true), (u64::MAX, true)] {
            assert_eq!(open.contains(size), inside, "size = {size}");
        }
    }

    #[tokio::test]
    async fn test_histogram_basic() {
        let hist = WindowedHistogram::new();
        hist.slices[0]
            .start_epoch_ms
            .store(WindowedHistogram::now_ms(), Ordering::Release);

        for &ms in &[10, 20, 30] {
            hist.record(Duration::from_millis(ms));
        }

        let window = Duration::from_secs(60);
        assert_eq!(hist.total_samples(window), 3);
        assert!(hist.quantile(0.5, window).is_some());
    }

    #[tokio::test]
    async fn test_tail_cut_layer_build() {
        let layer = TailCutLayer::builder()
            .percentile(95)
            .safety_factor(1.5)
            .window(Duration::from_secs(60))
            .min_samples(100)
            .min_deadline(Duration::from_millis(200))
            .max_deadline(Duration::from_secs(20))
            .build();

        let cfg = &layer.config;
        assert_eq!(cfg.percentile, 95);
        assert_eq!(cfg.safety_factor, 1.5);
        assert_eq!(cfg.window, Duration::from_secs(60));
        assert_eq!(cfg.min_samples, 100);
        assert_eq!(cfg.min_deadline, Duration::from_millis(200));
        assert_eq!(cfg.max_deadline, Duration::from_secs(20));
    }

    /// Cloning a layer must share, not duplicate, the latency statistics.
    #[tokio::test]
    async fn test_layer_clone_shares_stats() {
        let original = TailCutLayer::new();
        let copy = original.clone();

        assert!(Arc::ptr_eq(&original.stats, &copy.stats));
    }
}