1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
22
23use crate::raw::*;
24use crate::*;
25
/// Builder for [`TailCutLayer`].
///
/// Configures how adaptive deadlines are derived from observed latencies:
/// which percentile to track, how much headroom to add on top of it, the
/// sampling window, and the hard bounds every computed deadline is clamped to.
#[derive(Clone)]
pub struct TailCutLayerBuilder {
    // Latency percentile used as the deadline baseline (validated 50..=99).
    percentile: u8,
    // Multiplier applied to the observed percentile (validated 1.0..=5.0).
    safety_factor: f64,
    // Sliding window over which latency samples are aggregated (validated <= 120s).
    window: Duration,
    // Minimum sample count in the window before deadlines are enforced.
    min_samples: usize,
    // Lower bound for any computed deadline.
    min_deadline: Duration,
    // Upper bound for any computed deadline.
    max_deadline: Duration,
}
61
62impl Default for TailCutLayerBuilder {
63 fn default() -> Self {
64 Self {
65 percentile: 95,
66 safety_factor: 1.3,
67 window: Duration::from_secs(60),
68 min_samples: 200,
69 min_deadline: Duration::from_millis(500),
70 max_deadline: Duration::from_secs(30),
71 }
72 }
73}
74
75impl TailCutLayerBuilder {
76 pub fn new() -> Self {
78 Self::default()
79 }
80
81 pub fn percentile(mut self, percentile: u8) -> Self {
91 assert!(
92 (50..=99).contains(&percentile),
93 "percentile must be between 50 and 99"
94 );
95 self.percentile = percentile;
96 self
97 }
98
99 pub fn safety_factor(mut self, factor: f64) -> Self {
110 assert!(
111 (1.0..=5.0).contains(&factor),
112 "safety_factor must be between 1.0 and 5.0"
113 );
114 self.safety_factor = factor;
115 self
116 }
117
118 pub fn window(mut self, window: Duration) -> Self {
129 assert!(
130 window <= Duration::from_secs(120),
131 "window must be <= 120 seconds"
132 );
133 self.window = window;
134 self
135 }
136
137 pub fn min_samples(mut self, min_samples: usize) -> Self {
144 self.min_samples = min_samples;
145 self
146 }
147
148 pub fn min_deadline(mut self, deadline: Duration) -> Self {
155 self.min_deadline = deadline;
156 self
157 }
158
159 pub fn max_deadline(mut self, deadline: Duration) -> Self {
166 self.max_deadline = deadline;
167 self
168 }
169
170 pub fn build(self) -> TailCutLayer {
202 TailCutLayer {
203 config: Arc::new(TailCutConfig {
204 percentile: self.percentile,
205 safety_factor: self.safety_factor,
206 window: self.window,
207 min_samples: self.min_samples,
208 min_deadline: self.min_deadline,
209 max_deadline: self.max_deadline,
210 }),
211 stats: Arc::new(TailCutStats::new()),
212 }
213 }
214}
215
/// Immutable, validated configuration shared (via `Arc`) between the layer
/// and every accessor/wrapper it produces.
#[derive(Debug)]
struct TailCutConfig {
    // Latency percentile used as the deadline baseline.
    percentile: u8,
    // Multiplier applied to the observed percentile.
    safety_factor: f64,
    // Sliding window over which latency samples are aggregated.
    window: Duration,
    // Minimum sample count in the window before deadlines are enforced.
    min_samples: usize,
    // Lower bound for any computed deadline.
    min_deadline: Duration,
    // Upper bound for any computed deadline.
    max_deadline: Duration,
}
226
/// Layer that cancels unusually slow ("tail") requests based on observed
/// latency percentiles.
///
/// Cloning is cheap: clones share the same `Arc`-held config and statistics,
/// so every accessor produced by any clone feeds the same latency history.
#[derive(Clone)]
pub struct TailCutLayer {
    config: Arc<TailCutConfig>,
    stats: Arc<TailCutStats>,
}
262
263impl TailCutLayer {
264 pub fn builder() -> TailCutLayerBuilder {
266 TailCutLayerBuilder::new()
267 }
268
269 pub fn new() -> Self {
273 Self::builder().build()
274 }
275}
276
277impl Default for TailCutLayer {
278 fn default() -> Self {
279 Self::new()
280 }
281}
282
283impl<A: Access> Layer<A> for TailCutLayer {
284 type LayeredAccess = TailCutAccessor<A>;
285
286 fn layer(&self, inner: A) -> Self::LayeredAccess {
287 TailCutAccessor {
288 inner,
289 config: self.config.clone(),
290 stats: self.stats.clone(),
291 }
292 }
293}
294
/// Accessor produced by [`TailCutLayer`]: wraps an inner [`Access`] and runs
/// each operation under an adaptive, statistics-derived deadline.
#[derive(Clone)]
pub struct TailCutAccessor<A: Access> {
    inner: A,
    // Shared with the layer and all sibling accessors.
    config: Arc<TailCutConfig>,
    // Shared latency history; successful calls are recorded into it.
    stats: Arc<TailCutStats>,
}
302
303impl<A: Access> Debug for TailCutAccessor<A> {
304 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
305 f.debug_struct("TailCutAccessor")
306 .field("config", &self.config)
307 .finish_non_exhaustive()
308 }
309}
310
311impl<A: Access> TailCutAccessor<A> {
312 fn calculate_deadline(&self, op: Operation, size: Option<u64>) -> Option<Duration> {
314 let op_stats = self.stats.stats_for(op);
315
316 if op_stats.total_samples(size, self.config.window) < self.config.min_samples {
317 return None;
318 }
319
320 let q = self.config.percentile as f64 / 100.0;
321 let pctl = op_stats.quantile(size, q, self.config.window)?;
322
323 let deadline = Duration::from_secs_f64(pctl.as_secs_f64() * self.config.safety_factor);
324 Some(deadline.clamp(self.config.min_deadline, self.config.max_deadline))
325 }
326
327 async fn with_deadline<F, T>(&self, op: Operation, size: Option<u64>, fut: F) -> Result<T>
328 where
329 F: Future<Output = Result<T>>,
330 {
331 let start = Instant::now();
332
333 let result = if let Some(deadline) = self.calculate_deadline(op, size) {
334 match tokio::time::timeout(deadline, fut).await {
335 Ok(res) => res,
336 Err(_) => Err(Error::new(ErrorKind::Unexpected, "cancelled by tail cut")
337 .with_operation(op)
338 .with_context("percentile", format!("P{}", self.config.percentile))
339 .with_context("deadline", format!("{:?}", deadline))
340 .set_temporary()),
341 }
342 } else {
343 fut.await
344 };
345
346 if result.is_ok() {
347 let latency = start.elapsed();
348 self.stats.stats_for(op).record(size, latency);
349 }
350
351 result
352 }
353}
354
355impl<A: Access> LayeredAccess for TailCutAccessor<A> {
356 type Inner = A;
357 type Reader = TailCutWrapper<A::Reader>;
358 type Writer = TailCutWrapper<A::Writer>;
359 type Lister = TailCutWrapper<A::Lister>;
360 type Deleter = TailCutWrapper<A::Deleter>;
361
362 fn inner(&self) -> &Self::Inner {
363 &self.inner
364 }
365
366 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
367 self.with_deadline(
368 Operation::CreateDir,
369 None,
370 self.inner.create_dir(path, args),
371 )
372 .await
373 }
374
375 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
376 let size = args.range().size();
377 self.with_deadline(Operation::Read, size, self.inner.read(path, args))
378 .await
379 .map(|(rp, r)| {
380 (
381 rp,
382 TailCutWrapper::new(r, size, self.config.clone(), self.stats.clone()),
383 )
384 })
385 }
386
387 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
388 self.with_deadline(Operation::Write, None, self.inner.write(path, args))
389 .await
390 .map(|(rp, w)| {
391 (
392 rp,
393 TailCutWrapper::new(w, None, self.config.clone(), self.stats.clone()),
394 )
395 })
396 }
397
398 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
399 self.with_deadline(Operation::Copy, None, self.inner.copy(from, to, args))
400 .await
401 }
402
403 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
404 self.with_deadline(Operation::Rename, None, self.inner.rename(from, to, args))
405 .await
406 }
407
408 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
409 self.with_deadline(Operation::Stat, None, self.inner.stat(path, args))
410 .await
411 }
412
413 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
414 self.with_deadline(Operation::Delete, None, self.inner.delete())
415 .await
416 .map(|(rp, d)| {
417 (
418 rp,
419 TailCutWrapper::new(d, None, self.config.clone(), self.stats.clone()),
420 )
421 })
422 }
423
424 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
425 self.with_deadline(Operation::List, None, self.inner.list(path, args))
426 .await
427 .map(|(rp, l)| {
428 (
429 rp,
430 TailCutWrapper::new(l, None, self.config.clone(), self.stats.clone()),
431 )
432 })
433 }
434
435 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
436 self.with_deadline(Operation::Presign, None, self.inner.presign(path, args))
437 .await
438 }
439}
440
/// Wrapper applying adaptive per-call deadlines to the streaming halves
/// (reader/writer/lister/deleter) returned by [`TailCutAccessor`].
pub struct TailCutWrapper<R> {
    inner: R,
    // Size hint for the stream (known for ranged reads, `None` otherwise);
    // used to select the statistics size bucket.
    size: Option<u64>,
    // Shared with the layer and accessor.
    config: Arc<TailCutConfig>,
    // Shared latency history; successful calls are recorded into it.
    stats: Arc<TailCutStats>,
}
448
449impl<R> TailCutWrapper<R> {
450 fn new(
451 inner: R,
452 size: Option<u64>,
453 config: Arc<TailCutConfig>,
454 stats: Arc<TailCutStats>,
455 ) -> Self {
456 Self {
457 inner,
458 size,
459 config,
460 stats,
461 }
462 }
463
464 fn calculate_deadline(&self, op: Operation) -> Option<Duration> {
465 let op_stats = self.stats.stats_for(op);
466
467 if op_stats.total_samples(self.size, self.config.window) < self.config.min_samples {
468 return None;
469 }
470
471 let q = self.config.percentile as f64 / 100.0;
472 let pctl = op_stats.quantile(self.size, q, self.config.window)?;
473
474 let deadline = Duration::from_secs_f64(pctl.as_secs_f64() * self.config.safety_factor);
475 Some(deadline.clamp(self.config.min_deadline, self.config.max_deadline))
476 }
477
478 #[inline]
479 async fn with_io_deadline<F, T>(
480 deadline: Option<Duration>,
481 percentile: u8,
482 stats: &Arc<TailCutStats>,
483 size: Option<u64>,
484 op: Operation,
485 fut: F,
486 ) -> Result<T>
487 where
488 F: std::future::Future<Output = Result<T>>,
489 {
490 let start = Instant::now();
491
492 let result = if let Some(dl) = deadline {
493 match tokio::time::timeout(dl, fut).await {
494 Ok(res) => res,
495 Err(_) => Err(
496 Error::new(ErrorKind::Unexpected, "io cancelled by tail cut")
497 .with_operation(op)
498 .with_context("percentile", format!("P{}", percentile))
499 .with_context("deadline", format!("{:?}", dl))
500 .set_temporary(),
501 ),
502 }
503 } else {
504 fut.await
505 };
506
507 if result.is_ok() {
508 let latency = start.elapsed();
509 stats.stats_for(op).record(size, latency);
510 }
511
512 result
513 }
514}
515
516impl<R: oio::Read> oio::Read for TailCutWrapper<R> {
517 async fn read(&mut self) -> Result<Buffer> {
518 let deadline = self.calculate_deadline(Operation::Read);
519 Self::with_io_deadline(
520 deadline,
521 self.config.percentile,
522 &self.stats,
523 self.size,
524 Operation::Read,
525 self.inner.read(),
526 )
527 .await
528 }
529}
530
531impl<R: oio::Write> oio::Write for TailCutWrapper<R> {
532 async fn write(&mut self, bs: Buffer) -> Result<()> {
533 let deadline = self.calculate_deadline(Operation::Write);
534 Self::with_io_deadline(
535 deadline,
536 self.config.percentile,
537 &self.stats,
538 self.size,
539 Operation::Write,
540 self.inner.write(bs),
541 )
542 .await
543 }
544
545 async fn close(&mut self) -> Result<Metadata> {
546 let deadline = self.calculate_deadline(Operation::Write);
547 Self::with_io_deadline(
548 deadline,
549 self.config.percentile,
550 &self.stats,
551 self.size,
552 Operation::Write,
553 self.inner.close(),
554 )
555 .await
556 }
557
558 async fn abort(&mut self) -> Result<()> {
559 let deadline = self.calculate_deadline(Operation::Write);
560 Self::with_io_deadline(
561 deadline,
562 self.config.percentile,
563 &self.stats,
564 self.size,
565 Operation::Write,
566 self.inner.abort(),
567 )
568 .await
569 }
570}
571
572impl<R: oio::List> oio::List for TailCutWrapper<R> {
573 async fn next(&mut self) -> Result<Option<oio::Entry>> {
574 let deadline = self.calculate_deadline(Operation::List);
575 Self::with_io_deadline(
576 deadline,
577 self.config.percentile,
578 &self.stats,
579 self.size,
580 Operation::List,
581 self.inner.next(),
582 )
583 .await
584 }
585}
586
587impl<R: oio::Delete> oio::Delete for TailCutWrapper<R> {
588 async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
589 self.inner.delete(path, args).await
590 }
591
592 async fn close(&mut self) -> Result<()> {
593 let deadline = self.calculate_deadline(Operation::Delete);
594 Self::with_io_deadline(
595 deadline,
596 self.config.percentile,
597 &self.stats,
598 self.size,
599 Operation::Delete,
600 self.inner.close(),
601 )
602 .await
603 }
604}
605
/// Latency statistics, one slot per tracked operation kind.
struct TailCutStats {
    // Indexed by `stats_for`: Read, Write, Stat, List, Delete, Copy, Rename.
    operations: [Arc<OperationStats>; 7],
}
611
612impl TailCutStats {
613 fn new() -> Self {
614 Self {
615 operations: std::array::from_fn(|_| Arc::new(OperationStats::new())),
616 }
617 }
618
619 fn stats_for(&self, op: Operation) -> &Arc<OperationStats> {
620 let idx = match op {
621 Operation::Read => 0,
622 Operation::Write => 1,
623 Operation::Stat => 2,
624 Operation::List => 3,
625 Operation::Delete => 4,
626 Operation::Copy => 5,
627 Operation::Rename => 6,
628 _ => 2, };
630 &self.operations[idx]
631 }
632}
633
634struct OperationStats {
636 buckets: Vec<SizeBucket>,
637}
638
639impl OperationStats {
640 fn new() -> Self {
641 Self {
642 buckets: vec![
643 SizeBucket::new(0, Some(4 * 1024)), SizeBucket::new(4 * 1024, Some(64 * 1024)), SizeBucket::new(64 * 1024, Some(1024 * 1024)), SizeBucket::new(1024 * 1024, Some(16 * 1024 * 1024)), SizeBucket::new(16 * 1024 * 1024, Some(256 * 1024 * 1024)), SizeBucket::new(256 * 1024 * 1024, None), ],
650 }
651 }
652
653 fn bucket_for(&self, size: Option<u64>) -> &SizeBucket {
654 let size = size.unwrap_or(u64::MAX);
655
656 self.buckets
657 .iter()
658 .find(|b| b.contains(size))
659 .unwrap_or(&self.buckets[self.buckets.len() - 1])
660 }
661
662 fn record(&self, size: Option<u64>, latency: Duration) {
663 self.bucket_for(size).histogram.record(latency);
664 }
665
666 fn quantile(&self, size: Option<u64>, q: f64, window: Duration) -> Option<Duration> {
667 self.bucket_for(size).histogram.quantile(q, window)
668 }
669
670 fn total_samples(&self, size: Option<u64>, window: Duration) -> usize {
671 self.bucket_for(size).histogram.total_samples(window)
672 }
673}
674
/// One payload-size class with its own latency histogram.
struct SizeBucket {
    // Inclusive lower bound in bytes.
    min_size: u64,
    // Exclusive upper bound in bytes; `None` means unbounded above.
    max_size: Option<u64>,
    histogram: WindowedHistogram,
}
681
682impl SizeBucket {
683 fn new(min_size: u64, max_size: Option<u64>) -> Self {
684 Self {
685 min_size,
686 max_size,
687 histogram: WindowedHistogram::new(),
688 }
689 }
690
691 fn contains(&self, size: u64) -> bool {
692 size >= self.min_size && self.max_size.is_none_or(|max| size < max)
693 }
694}
695
696const SLICE_DURATION_MS: u64 = 10_000; const NUM_SLICES: usize = 12; const NUM_BUCKETS: usize = 17; struct WindowedHistogram {
702 slices: Box<[TimeSlice; NUM_SLICES]>,
703 current_idx: AtomicUsize,
704 last_rotate: AtomicU64,
705}
706
707impl WindowedHistogram {
708 fn new() -> Self {
709 Self {
710 slices: Box::new(std::array::from_fn(|_| TimeSlice::new())),
711 current_idx: AtomicUsize::new(0),
712 last_rotate: AtomicU64::new(Self::now_ms()),
713 }
714 }
715
716 fn record(&self, latency: Duration) {
717 self.maybe_rotate();
718
719 let bucket_idx = Self::latency_to_bucket(latency);
720 let slice_idx = self.current_idx.load(Ordering::Relaxed);
721
722 self.slices[slice_idx].buckets[bucket_idx].fetch_add(1, Ordering::Relaxed);
723 }
724
725 fn quantile(&self, q: f64, window: Duration) -> Option<Duration> {
726 debug_assert!((0.0..=1.0).contains(&q), "quantile must be in [0, 1]");
727
728 let snapshot = self.snapshot(window);
729 let total: u64 = snapshot.iter().sum();
730
731 if total == 0 {
732 return None;
733 }
734
735 let target = (total as f64 * q).ceil() as u64;
736 let mut cumsum = 0u64;
737
738 for (bucket_idx, &count) in snapshot.iter().enumerate() {
739 cumsum += count;
740 if cumsum >= target {
741 return Some(Self::bucket_to_latency(bucket_idx));
742 }
743 }
744
745 Some(Self::bucket_to_latency(NUM_BUCKETS - 1))
746 }
747
748 fn total_samples(&self, window: Duration) -> usize {
749 self.snapshot(window).iter().map(|&v| v as usize).sum()
750 }
751
752 fn snapshot(&self, window: Duration) -> [u64; NUM_BUCKETS] {
753 let mut result = [0u64; NUM_BUCKETS];
754 let now_ms = Self::now_ms();
755 let window_ms = window.as_millis() as u64;
756
757 for slice in self.slices.iter() {
758 let start = slice.start_epoch_ms.load(Ordering::Acquire);
759
760 if start > 0 && now_ms.saturating_sub(start) < window_ms + SLICE_DURATION_MS {
761 for (i, bucket) in slice.buckets.iter().enumerate() {
762 result[i] += bucket.load(Ordering::Relaxed);
763 }
764 }
765 }
766
767 result
768 }
769
770 fn maybe_rotate(&self) {
771 let now = Self::now_ms();
772 let last_rotate = self.last_rotate.load(Ordering::Relaxed);
773
774 if now - last_rotate >= SLICE_DURATION_MS
775 && self
776 .last_rotate
777 .compare_exchange(last_rotate, now, Ordering::Release, Ordering::Relaxed)
778 .is_ok()
779 {
780 let old_idx = self.current_idx.load(Ordering::Relaxed);
781 let new_idx = (old_idx + 1) % NUM_SLICES;
782
783 let new_slice = &self.slices[new_idx];
784 new_slice.start_epoch_ms.store(now, Ordering::Release);
785 for bucket in &new_slice.buckets {
786 bucket.store(0, Ordering::Relaxed);
787 }
788
789 self.current_idx.store(new_idx, Ordering::Release);
790 }
791 }
792
793 fn latency_to_bucket(latency: Duration) -> usize {
794 let ms = latency.as_millis() as u64;
795
796 if ms == 0 {
797 return 0;
798 }
799
800 let bucket = 64 - ms.leading_zeros();
801 (bucket as usize).min(NUM_BUCKETS - 1)
802 }
803
804 fn bucket_to_latency(bucket_idx: usize) -> Duration {
805 if bucket_idx == 0 {
806 Duration::from_millis(1)
807 } else if bucket_idx >= NUM_BUCKETS - 1 {
808 Duration::from_secs(64)
809 } else {
810 Duration::from_millis(1u64 << bucket_idx)
811 }
812 }
813
814 fn now_ms() -> u64 {
815 u64::try_from(Timestamp::now().into_inner().as_millisecond()).unwrap()
817 }
818}
819
/// One fixed-duration slice of the histogram ring.
struct TimeSlice {
    // One counter per power-of-two latency bucket.
    buckets: [AtomicU64; NUM_BUCKETS],
    // Epoch millis when this slice last became current; 0 = never used.
    start_epoch_ms: AtomicU64,
}
826
827impl TimeSlice {
828 fn new() -> Self {
829 Self {
830 buckets: std::array::from_fn(|_| AtomicU64::new(0)),
831 start_epoch_ms: AtomicU64::new(0),
832 }
833 }
834}
835
#[cfg(test)]
mod tests {
    use super::*;

    // Pin the power-of-two bucket mapping: bucket b covers [2^(b-1), 2^b) ms,
    // bucket 0 is sub-millisecond, and the top bucket caps everything large.
    #[test]
    fn test_latency_to_bucket() {
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_millis(0)),
            0
        );
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_millis(1)),
            1
        );
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_millis(2)),
            2
        );
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_millis(4)),
            3
        );
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_millis(8)),
            4
        );
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_millis(500)),
            9
        );
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_secs(1)),
            10
        );
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_secs(2)),
            11
        );
        // Values at or beyond 64s all land in the top (catch-all) bucket.
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_secs(64)),
            16
        );
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_secs(1000)),
            16
        );
    }

    // Size buckets are half-open: inclusive lower bound, exclusive upper bound,
    // with None meaning unbounded above.
    #[test]
    fn test_size_bucket_contains() {
        let bucket = SizeBucket::new(0, Some(4096));
        assert!(bucket.contains(0));
        assert!(bucket.contains(4095));
        assert!(!bucket.contains(4096));

        let bucket = SizeBucket::new(4096, None);
        assert!(!bucket.contains(4095));
        assert!(bucket.contains(4096));
        assert!(bucket.contains(u64::MAX));
    }

    // Recording three samples makes them visible to windowed counting and
    // quantile queries. The first slice is marked live manually because
    // snapshot() skips slices with start_epoch_ms == 0.
    #[tokio::test]
    async fn test_histogram_basic() {
        let hist = WindowedHistogram::new();
        let now = WindowedHistogram::now_ms();
        hist.slices[0].start_epoch_ms.store(now, Ordering::Release);

        hist.record(Duration::from_millis(10));
        hist.record(Duration::from_millis(20));
        hist.record(Duration::from_millis(30));

        let samples = hist.total_samples(Duration::from_secs(60));
        assert_eq!(samples, 3);

        let p50 = hist.quantile(0.5, Duration::from_secs(60));
        assert!(p50.is_some());
    }

    // Builder values must flow unchanged into the layer's config.
    #[tokio::test]
    async fn test_tail_cut_layer_build() {
        let layer = TailCutLayer::builder()
            .percentile(95)
            .safety_factor(1.5)
            .window(Duration::from_secs(60))
            .min_samples(100)
            .min_deadline(Duration::from_millis(200))
            .max_deadline(Duration::from_secs(20))
            .build();

        assert_eq!(layer.config.percentile, 95);
        assert_eq!(layer.config.safety_factor, 1.5);
        assert_eq!(layer.config.window, Duration::from_secs(60));
        assert_eq!(layer.config.min_samples, 100);
        assert_eq!(layer.config.min_deadline, Duration::from_millis(200));
        assert_eq!(layer.config.max_deadline, Duration::from_secs(20));
    }

    // Clones must share (not duplicate) the latency history.
    #[tokio::test]
    async fn test_layer_clone_shares_stats() {
        let layer = TailCutLayer::new();
        let cloned = layer.clone();

        assert!(Arc::ptr_eq(&layer.stats, &cloned.stats));
    }
}