opendal_core/layers/tail_cut.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt::Debug;
use std::fmt::Formatter;
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Duration;
use std::time::Instant;

use crate::raw::*;
use crate::*;

/// Builder for TailCutLayer.
///
/// Use this to configure the layer, then call `build()` to create a layer
/// that can be cloned and shared across multiple operators.
///
/// # Examples
///
/// ```no_run
/// use opendal_core::layers::TailCutLayer;
/// use std::time::Duration;
/// # use opendal_core::services;
/// # use opendal_core::Operator;
/// # use opendal_core::Result;
///
/// # fn main() -> Result<()> {
/// let layer = TailCutLayer::builder()
///     .percentile(95)
///     .window(Duration::from_secs(60))
///     .build();
///
/// let op = Operator::new(services::Memory::default())?
///     .layer(layer)
///     .finish();
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct TailCutLayerBuilder {
    percentile: u8,
    safety_factor: f64,
    window: Duration,
    min_samples: usize,
    min_deadline: Duration,
    max_deadline: Duration,
}

impl Default for TailCutLayerBuilder {
    fn default() -> Self {
        Self {
            percentile: 95,
            safety_factor: 1.3,
            window: Duration::from_secs(60),
            min_samples: 200,
            min_deadline: Duration::from_millis(500),
            max_deadline: Duration::from_secs(30),
        }
    }
}

impl TailCutLayerBuilder {
    /// Create a new builder with default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the percentile threshold (e.g., 95 for P95, 99 for P99).
    ///
    /// Requests slower than this percentile × safety_factor will be cancelled.
    ///
    /// Default: 95
    ///
    /// # Panics
    ///
    /// Panics if percentile is not between 50 and 99.
    pub fn percentile(mut self, percentile: u8) -> Self {
        assert!(
            (50..=99).contains(&percentile),
            "percentile must be between 50 and 99"
        );
        self.percentile = percentile;
        self
    }

    /// Set the safety factor multiplier.
    ///
    /// The actual deadline is calculated as: P{percentile} × safety_factor.
    /// For example, a P95 of 800ms with a safety factor of 1.3 yields a deadline of 1.04s.
    /// A higher value reduces false positives but may miss some long tails.
    ///
    /// Default: 1.3 (30% buffer)
    ///
    /// # Panics
    ///
    /// Panics if factor is not between 1.0 and 5.0.
    pub fn safety_factor(mut self, factor: f64) -> Self {
        assert!(
            (1.0..=5.0).contains(&factor),
            "safety_factor must be between 1.0 and 5.0"
        );
        self.safety_factor = factor;
        self
    }

    /// Set the sliding window duration for statistics collection.
    ///
    /// Longer windows provide more stable statistics but react more slowly to changes.
    /// Shorter windows adapt faster but are noisier.
    ///
    /// Default: 60 seconds
    ///
    /// # Panics
    ///
    /// Panics if window is greater than 120 seconds.
    pub fn window(mut self, window: Duration) -> Self {
        assert!(
            window <= Duration::from_secs(120),
            "window must be <= 120 seconds"
        );
        self.window = window;
        self
    }

    /// Set the minimum number of samples required before enabling adaptive cancellation.
    ///
    /// During cold start (when sample count < min_samples), the layer will not
    /// cancel any requests to avoid false positives.
    ///
    /// Default: 200
    pub fn min_samples(mut self, min_samples: usize) -> Self {
        self.min_samples = min_samples;
        self
    }

    /// Set the minimum deadline (floor).
    ///
    /// Even if the calculated deadline is shorter, it will be clamped up to this value.
    /// This prevents overly aggressive cancellation on very fast backends.
    ///
    /// Default: 500ms
    pub fn min_deadline(mut self, deadline: Duration) -> Self {
        self.min_deadline = deadline;
        self
    }

    /// Set the maximum deadline (ceiling).
    ///
    /// Even if the calculated deadline is longer, it will be clamped down to this value.
    /// This acts as a safety fallback timeout.
    ///
    /// Default: 30s
    pub fn max_deadline(mut self, deadline: Duration) -> Self {
        self.max_deadline = deadline;
        self
    }

    /// Build the layer.
    ///
    /// The returned layer can be cloned to share statistics across operators.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use opendal_core::layers::TailCutLayer;
    /// use std::time::Duration;
    /// # use opendal_core::services;
    /// # use opendal_core::Operator;
    /// # use opendal_core::Result;
    ///
    /// # fn main() -> Result<()> {
    /// let layer = TailCutLayer::builder()
    ///     .percentile(95)
    ///     .window(Duration::from_secs(60))
    ///     .build();
    ///
    /// // Share the layer across operators
    /// let op1 = Operator::new(services::Memory::default())?
    ///     .layer(layer.clone())
    ///     .finish();
    ///
    /// let op2 = Operator::new(services::Memory::default())?
    ///     .layer(layer.clone())
    ///     .finish();
    /// // op1 and op2 share the same statistics
    /// # Ok(())
    /// # }
    /// ```
    pub fn build(self) -> TailCutLayer {
        TailCutLayer {
            config: Arc::new(TailCutConfig {
                percentile: self.percentile,
                safety_factor: self.safety_factor,
                window: self.window,
                min_samples: self.min_samples,
                min_deadline: self.min_deadline,
                max_deadline: self.max_deadline,
            }),
            stats: Arc::new(TailCutStats::new()),
        }
    }
}

/// Configuration for TailCutLayer (immutable).
#[derive(Debug)]
struct TailCutConfig {
    percentile: u8,
    safety_factor: f64,
    window: Duration,
    min_samples: usize,
    min_deadline: Duration,
    max_deadline: Duration,
}

/// Layer that automatically cancels long-tail requests.
///
/// This layer monitors request latency distribution and cancels requests that are
/// significantly slower than the historical baseline (e.g., slower than P95).
///
/// This layer should be created via [`TailCutLayer::builder()`] and can be
/// cloned to share statistics across multiple operators.
///
/// # Examples
///
/// ```no_run
/// use opendal_core::layers::TailCutLayer;
/// use std::time::Duration;
/// # use opendal_core::services;
/// # use opendal_core::Operator;
/// # use opendal_core::Result;
///
/// # fn main() -> Result<()> {
/// let layer = TailCutLayer::builder()
///     .percentile(95)
///     .safety_factor(1.3)
///     .window(Duration::from_secs(60))
///     .build();
///
/// let op = Operator::new(services::Memory::default())?
///     .layer(layer)
///     .finish();
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct TailCutLayer {
    config: Arc<TailCutConfig>,
    stats: Arc<TailCutStats>,
}

impl TailCutLayer {
    /// Create a builder to configure the layer.
    pub fn builder() -> TailCutLayerBuilder {
        TailCutLayerBuilder::new()
    }

    /// Create a layer with default settings.
    ///
    /// This is equivalent to `TailCutLayer::builder().build()`.
    pub fn new() -> Self {
        Self::builder().build()
    }
}

impl Default for TailCutLayer {
    fn default() -> Self {
        Self::new()
    }
}

impl<A: Access> Layer<A> for TailCutLayer {
    type LayeredAccess = TailCutAccessor<A>;

    fn layer(&self, inner: A) -> Self::LayeredAccess {
        TailCutAccessor {
            inner,
            config: self.config.clone(),
            stats: self.stats.clone(),
        }
    }
}

/// Accessor that implements tail cut logic.
#[derive(Clone)]
pub struct TailCutAccessor<A: Access> {
    inner: A,
    config: Arc<TailCutConfig>,
    stats: Arc<TailCutStats>,
}

impl<A: Access> Debug for TailCutAccessor<A> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TailCutAccessor")
            .field("config", &self.config)
            .finish_non_exhaustive()
    }
}

impl<A: Access> TailCutAccessor<A> {
    /// Calculate the deadline for a given operation and size.
    fn calculate_deadline(&self, op: Operation, size: Option<u64>) -> Option<Duration> {
        let op_stats = self.stats.stats_for(op);

        // Cold start: without enough samples in the window, never cancel.
        if op_stats.total_samples(size, self.config.window) < self.config.min_samples {
            return None;
        }

        let q = self.config.percentile as f64 / 100.0;
        let pctl = op_stats.quantile(size, q, self.config.window)?;

        let deadline = Duration::from_secs_f64(pctl.as_secs_f64() * self.config.safety_factor);
        Some(deadline.clamp(self.config.min_deadline, self.config.max_deadline))
    }

    async fn with_deadline<F, T>(&self, op: Operation, size: Option<u64>, fut: F) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        let start = Instant::now();

        let result = if let Some(deadline) = self.calculate_deadline(op, size) {
            match tokio::time::timeout(deadline, fut).await {
                Ok(res) => res,
                Err(_) => Err(Error::new(ErrorKind::Unexpected, "cancelled by tail cut")
                    .with_operation(op)
                    .with_context("percentile", format!("P{}", self.config.percentile))
                    .with_context("deadline", format!("{:?}", deadline))
                    .set_temporary()),
            }
        } else {
            fut.await
        };

        // Only successful requests feed the statistics, so cancelled or failed
        // calls do not skew the latency baseline.
        if result.is_ok() {
            let latency = start.elapsed();
            self.stats.stats_for(op).record(size, latency);
        }

        result
    }
}

impl<A: Access> LayeredAccess for TailCutAccessor<A> {
    type Inner = A;
    type Reader = TailCutWrapper<A::Reader>;
    type Writer = TailCutWrapper<A::Writer>;
    type Lister = TailCutWrapper<A::Lister>;
    type Deleter = TailCutWrapper<A::Deleter>;

    fn inner(&self) -> &Self::Inner {
        &self.inner
    }

    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
        self.with_deadline(
            Operation::CreateDir,
            None,
            self.inner.create_dir(path, args),
        )
        .await
    }

    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        let size = args.range().size();
        self.with_deadline(Operation::Read, size, self.inner.read(path, args))
            .await
            .map(|(rp, r)| {
                (
                    rp,
                    TailCutWrapper::new(r, size, self.config.clone(), self.stats.clone()),
                )
            })
    }

    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        self.with_deadline(Operation::Write, None, self.inner.write(path, args))
            .await
            .map(|(rp, w)| {
                (
                    rp,
                    TailCutWrapper::new(w, None, self.config.clone(), self.stats.clone()),
                )
            })
    }

    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
        self.with_deadline(Operation::Copy, None, self.inner.copy(from, to, args))
            .await
    }

    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
        self.with_deadline(Operation::Rename, None, self.inner.rename(from, to, args))
            .await
    }

    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
        self.with_deadline(Operation::Stat, None, self.inner.stat(path, args))
            .await
    }

    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        self.with_deadline(Operation::Delete, None, self.inner.delete())
            .await
            .map(|(rp, d)| {
                (
                    rp,
                    TailCutWrapper::new(d, None, self.config.clone(), self.stats.clone()),
                )
            })
    }

    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
        self.with_deadline(Operation::List, None, self.inner.list(path, args))
            .await
            .map(|(rp, l)| {
                (
                    rp,
                    TailCutWrapper::new(l, None, self.config.clone(), self.stats.clone()),
                )
            })
    }

    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
        self.with_deadline(Operation::Presign, None, self.inner.presign(path, args))
            .await
    }
}

/// Wrapper for IO operations (Reader, Writer, Lister, Deleter).
pub struct TailCutWrapper<R> {
    inner: R,
    size: Option<u64>,
    config: Arc<TailCutConfig>,
    stats: Arc<TailCutStats>,
}

impl<R> TailCutWrapper<R> {
    fn new(
        inner: R,
        size: Option<u64>,
        config: Arc<TailCutConfig>,
        stats: Arc<TailCutStats>,
    ) -> Self {
        Self {
            inner,
            size,
            config,
            stats,
        }
    }

    fn calculate_deadline(&self, op: Operation) -> Option<Duration> {
        let op_stats = self.stats.stats_for(op);

        if op_stats.total_samples(self.size, self.config.window) < self.config.min_samples {
            return None;
        }

        let q = self.config.percentile as f64 / 100.0;
        let pctl = op_stats.quantile(self.size, q, self.config.window)?;

        let deadline = Duration::from_secs_f64(pctl.as_secs_f64() * self.config.safety_factor);
        Some(deadline.clamp(self.config.min_deadline, self.config.max_deadline))
    }

    #[inline]
    async fn with_io_deadline<F, T>(
        deadline: Option<Duration>,
        percentile: u8,
        stats: &Arc<TailCutStats>,
        size: Option<u64>,
        op: Operation,
        fut: F,
    ) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        let start = Instant::now();

        let result = if let Some(dl) = deadline {
            match tokio::time::timeout(dl, fut).await {
                Ok(res) => res,
                Err(_) => Err(
                    Error::new(ErrorKind::Unexpected, "io cancelled by tail cut")
                        .with_operation(op)
                        .with_context("percentile", format!("P{}", percentile))
                        .with_context("deadline", format!("{:?}", dl))
                        .set_temporary(),
                ),
            }
        } else {
            fut.await
        };

        if result.is_ok() {
            let latency = start.elapsed();
            stats.stats_for(op).record(size, latency);
        }

        result
    }
}

impl<R: oio::Read> oio::Read for TailCutWrapper<R> {
    async fn read(&mut self) -> Result<Buffer> {
        let deadline = self.calculate_deadline(Operation::Read);
        Self::with_io_deadline(
            deadline,
            self.config.percentile,
            &self.stats,
            self.size,
            Operation::Read,
            self.inner.read(),
        )
        .await
    }
}

impl<R: oio::Write> oio::Write for TailCutWrapper<R> {
    async fn write(&mut self, bs: Buffer) -> Result<()> {
        let deadline = self.calculate_deadline(Operation::Write);
        Self::with_io_deadline(
            deadline,
            self.config.percentile,
            &self.stats,
            self.size,
            Operation::Write,
            self.inner.write(bs),
        )
        .await
    }

    async fn close(&mut self) -> Result<Metadata> {
        let deadline = self.calculate_deadline(Operation::Write);
        Self::with_io_deadline(
            deadline,
            self.config.percentile,
            &self.stats,
            self.size,
            Operation::Write,
            self.inner.close(),
        )
        .await
    }

    async fn abort(&mut self) -> Result<()> {
        let deadline = self.calculate_deadline(Operation::Write);
        Self::with_io_deadline(
            deadline,
            self.config.percentile,
            &self.stats,
            self.size,
            Operation::Write,
            self.inner.abort(),
        )
        .await
    }
}

impl<R: oio::List> oio::List for TailCutWrapper<R> {
    async fn next(&mut self) -> Result<Option<oio::Entry>> {
        let deadline = self.calculate_deadline(Operation::List);
        Self::with_io_deadline(
            deadline,
            self.config.percentile,
            &self.stats,
            self.size,
            Operation::List,
            self.inner.next(),
        )
        .await
    }
}

impl<R: oio::Delete> oio::Delete for TailCutWrapper<R> {
    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
        // Individual `delete` calls are forwarded as-is; only `close` is guarded by a deadline.
        self.inner.delete(path, args).await
    }

    async fn close(&mut self) -> Result<()> {
        let deadline = self.calculate_deadline(Operation::Delete);
        Self::with_io_deadline(
            deadline,
            self.config.percentile,
            &self.stats,
            self.size,
            Operation::Delete,
            self.inner.close(),
        )
        .await
    }
}

/// Statistics engine for tail cut layer.
struct TailCutStats {
    // Statistics for each operation type (7 operations)
    operations: [Arc<OperationStats>; 7],
}

impl TailCutStats {
    fn new() -> Self {
        Self {
            operations: std::array::from_fn(|_| Arc::new(OperationStats::new())),
        }
    }

    fn stats_for(&self, op: Operation) -> &Arc<OperationStats> {
        let idx = match op {
            Operation::Read => 0,
            Operation::Write => 1,
            Operation::Stat => 2,
            Operation::List => 3,
            Operation::Delete => 4,
            Operation::Copy => 5,
            Operation::Rename => 6,
            _ => 2, // fallback to Stat
        };
        &self.operations[idx]
    }
}

/// Statistics for a single operation type.
struct OperationStats {
    buckets: Vec<SizeBucket>,
}

impl OperationStats {
    fn new() -> Self {
        Self {
            buckets: vec![
                SizeBucket::new(0, Some(4 * 1024)),                   // [0, 4KB)
                SizeBucket::new(4 * 1024, Some(64 * 1024)),           // [4KB, 64KB)
                SizeBucket::new(64 * 1024, Some(1024 * 1024)),        // [64KB, 1MB)
                SizeBucket::new(1024 * 1024, Some(16 * 1024 * 1024)), // [1MB, 16MB)
                SizeBucket::new(16 * 1024 * 1024, Some(256 * 1024 * 1024)), // [16MB, 256MB)
                SizeBucket::new(256 * 1024 * 1024, None),             // [256MB, ∞)
            ],
        }
    }

    fn bucket_for(&self, size: Option<u64>) -> &SizeBucket {
        // Unknown sizes map to u64::MAX and therefore land in the last, unbounded bucket.
        let size = size.unwrap_or(u64::MAX);

        self.buckets
            .iter()
            .find(|b| b.contains(size))
            .unwrap_or(&self.buckets[self.buckets.len() - 1])
    }

    fn record(&self, size: Option<u64>, latency: Duration) {
        self.bucket_for(size).histogram.record(latency);
    }

    fn quantile(&self, size: Option<u64>, q: f64, window: Duration) -> Option<Duration> {
        self.bucket_for(size).histogram.quantile(q, window)
    }

    fn total_samples(&self, size: Option<u64>, window: Duration) -> usize {
        self.bucket_for(size).histogram.total_samples(window)
    }
}

/// Size bucket for categorizing operations by data size.
struct SizeBucket {
    min_size: u64,
    max_size: Option<u64>,
    histogram: WindowedHistogram,
}

impl SizeBucket {
    fn new(min_size: u64, max_size: Option<u64>) -> Self {
        Self {
            min_size,
            max_size,
            histogram: WindowedHistogram::new(),
        }
    }

    fn contains(&self, size: u64) -> bool {
        size >= self.min_size && self.max_size.is_none_or(|max| size < max)
    }
}

const SLICE_DURATION_MS: u64 = 10_000; // 10 seconds per slice
const NUM_SLICES: usize = 12; // 12 slices = 120 seconds total window
const NUM_BUCKETS: usize = 17; // 17 buckets covering 1ms to 64s

/// Windowed histogram using lock-free atomic operations.
///
/// Samples are recorded into a ring of `NUM_SLICES` time slices, each covering
/// `SLICE_DURATION_MS`. Within a slice, latencies are counted in log2 buckets.
struct WindowedHistogram {
    slices: Box<[TimeSlice; NUM_SLICES]>,
    current_idx: AtomicUsize,
    last_rotate: AtomicU64,
}

impl WindowedHistogram {
    fn new() -> Self {
        Self {
            slices: Box::new(std::array::from_fn(|_| TimeSlice::new())),
            current_idx: AtomicUsize::new(0),
            last_rotate: AtomicU64::new(Self::now_ms()),
        }
    }

    fn record(&self, latency: Duration) {
        self.maybe_rotate();

        let bucket_idx = Self::latency_to_bucket(latency);
        let slice_idx = self.current_idx.load(Ordering::Relaxed);

        self.slices[slice_idx].buckets[bucket_idx].fetch_add(1, Ordering::Relaxed);
    }

    fn quantile(&self, q: f64, window: Duration) -> Option<Duration> {
        debug_assert!((0.0..=1.0).contains(&q), "quantile must be in [0, 1]");

        let snapshot = self.snapshot(window);
        let total: u64 = snapshot.iter().sum();

        if total == 0 {
            return None;
        }

        // Walk the cumulative counts until the target rank is reached and return
        // that bucket's upper edge (a conservative, rounded-up estimate).
        let target = (total as f64 * q).ceil() as u64;
        let mut cumsum = 0u64;

        for (bucket_idx, &count) in snapshot.iter().enumerate() {
            cumsum += count;
            if cumsum >= target {
                return Some(Self::bucket_to_latency(bucket_idx));
            }
        }

        Some(Self::bucket_to_latency(NUM_BUCKETS - 1))
    }

    fn total_samples(&self, window: Duration) -> usize {
        self.snapshot(window).iter().map(|&v| v as usize).sum()
    }

    fn snapshot(&self, window: Duration) -> [u64; NUM_BUCKETS] {
        let mut result = [0u64; NUM_BUCKETS];
        let now_ms = Self::now_ms();
        let window_ms = window.as_millis() as u64;

        for slice in self.slices.iter() {
            let start = slice.start_epoch_ms.load(Ordering::Acquire);

            // Include every slice whose start time may still overlap the requested
            // window, allowing one extra slice of slack for partially covered slices.
            if start > 0 && now_ms.saturating_sub(start) < window_ms + SLICE_DURATION_MS {
                for (i, bucket) in slice.buckets.iter().enumerate() {
                    result[i] += bucket.load(Ordering::Relaxed);
                }
            }
        }

        result
    }

    fn maybe_rotate(&self) {
        let now = Self::now_ms();
        let last_rotate = self.last_rotate.load(Ordering::Relaxed);

        // Rotate at most once per slice interval; the compare_exchange ensures only
        // one thread wins, resets the next slice, and advances the ring.
        if now - last_rotate >= SLICE_DURATION_MS
            && self
                .last_rotate
                .compare_exchange(last_rotate, now, Ordering::Release, Ordering::Relaxed)
                .is_ok()
        {
            let old_idx = self.current_idx.load(Ordering::Relaxed);
            let new_idx = (old_idx + 1) % NUM_SLICES;

            let new_slice = &self.slices[new_idx];
            new_slice.start_epoch_ms.store(now, Ordering::Release);
            for bucket in &new_slice.buckets {
                bucket.store(0, Ordering::Relaxed);
            }

            self.current_idx.store(new_idx, Ordering::Release);
        }
    }

    fn latency_to_bucket(latency: Duration) -> usize {
        let ms = latency.as_millis() as u64;

        if ms == 0 {
            return 0;
        }

        // Log2 bucketing: bucket n holds latencies in [2^(n-1), 2^n) ms;
        // longer latencies are clamped into the last bucket.
        let bucket = 64 - ms.leading_zeros();
        (bucket as usize).min(NUM_BUCKETS - 1)
    }

    fn bucket_to_latency(bucket_idx: usize) -> Duration {
        if bucket_idx == 0 {
            Duration::from_millis(1)
        } else if bucket_idx >= NUM_BUCKETS - 1 {
            Duration::from_secs(64)
        } else {
            Duration::from_millis(1u64 << bucket_idx)
        }
    }

    fn now_ms() -> u64 {
        // This conversion only fails if the system clock is set before UNIX_EPOCH,
        // which should never happen in practice.
        u64::try_from(Timestamp::now().into_inner().as_millisecond()).unwrap()
    }
}

/// Time slice in the sliding window.
struct TimeSlice {
    // 17 buckets covering 1ms to 64s (logarithmic scale)
    buckets: [AtomicU64; NUM_BUCKETS],
    start_epoch_ms: AtomicU64,
}

impl TimeSlice {
    fn new() -> Self {
        Self {
            buckets: std::array::from_fn(|_| AtomicU64::new(0)),
            start_epoch_ms: AtomicU64::new(0),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_latency_to_bucket() {
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_millis(0)),
            0
        );
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_millis(1)),
            1
        );
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_millis(2)),
            2
        );
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_millis(4)),
            3
        );
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_millis(8)),
            4
        );
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_millis(500)),
            9
        );
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_secs(1)),
            10
        );
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_secs(2)),
            11
        );
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_secs(64)),
            16
        );
        assert_eq!(
            WindowedHistogram::latency_to_bucket(Duration::from_secs(1000)),
            16
        );
    }
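
    // Illustrative sketch: `bucket_to_latency` returns each log2 bucket's exclusive
    // upper edge, so quantile estimates round latencies up rather than down.
    #[test]
    fn test_bucket_to_latency_upper_edge() {
        assert_eq!(
            WindowedHistogram::bucket_to_latency(0),
            Duration::from_millis(1)
        );
        assert_eq!(
            WindowedHistogram::bucket_to_latency(4),
            Duration::from_millis(16)
        );
        assert_eq!(
            WindowedHistogram::bucket_to_latency(10),
            Duration::from_millis(1024)
        );
        assert_eq!(
            WindowedHistogram::bucket_to_latency(NUM_BUCKETS - 1),
            Duration::from_secs(64)
        );
    }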

    #[test]
    fn test_size_bucket_contains() {
        let bucket = SizeBucket::new(0, Some(4096));
        assert!(bucket.contains(0));
        assert!(bucket.contains(4095));
        assert!(!bucket.contains(4096));

        let bucket = SizeBucket::new(4096, None);
        assert!(!bucket.contains(4095));
        assert!(bucket.contains(4096));
        assert!(bucket.contains(u64::MAX));
    }
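
    // Illustrative sketch: operations with an unknown size (`None`) are treated as
    // `u64::MAX` and therefore land in the last, unbounded size bucket.
    #[test]
    fn test_bucket_for_size() {
        let stats = OperationStats::new();
        assert_eq!(stats.bucket_for(Some(1024)).min_size, 0);
        assert_eq!(stats.bucket_for(Some(8 * 1024)).min_size, 4 * 1024);
        assert_eq!(stats.bucket_for(None).min_size, 256 * 1024 * 1024);
    }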

    #[tokio::test]
    async fn test_histogram_basic() {
        let hist = WindowedHistogram::new();
        let now = WindowedHistogram::now_ms();
        hist.slices[0].start_epoch_ms.store(now, Ordering::Release);

        hist.record(Duration::from_millis(10));
        hist.record(Duration::from_millis(20));
        hist.record(Duration::from_millis(30));

        let samples = hist.total_samples(Duration::from_secs(60));
        assert_eq!(samples, 3);

        let p50 = hist.quantile(0.5, Duration::from_secs(60));
        assert!(p50.is_some());
    }
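
    // Illustrative sketch of the quantile behaviour: with mostly fast samples and a
    // few slow ones, the median stays in the fast buckets while a high quantile
    // reflects the slow tail.
    #[tokio::test]
    async fn test_histogram_quantile_tail() {
        let hist = WindowedHistogram::new();
        let now = WindowedHistogram::now_ms();
        hist.slices[0].start_epoch_ms.store(now, Ordering::Release);

        for _ in 0..95 {
            hist.record(Duration::from_millis(10));
        }
        for _ in 0..5 {
            hist.record(Duration::from_secs(2));
        }

        let window = Duration::from_secs(60);
        let p50 = hist.quantile(0.5, window).unwrap();
        let p99 = hist.quantile(0.99, window).unwrap();

        assert!(p50 < Duration::from_millis(100));
        assert!(p99 >= Duration::from_secs(2));
    }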

    #[tokio::test]
    async fn test_tail_cut_layer_build() {
        let layer = TailCutLayer::builder()
            .percentile(95)
            .safety_factor(1.5)
            .window(Duration::from_secs(60))
            .min_samples(100)
            .min_deadline(Duration::from_millis(200))
            .max_deadline(Duration::from_secs(20))
            .build();

        assert_eq!(layer.config.percentile, 95);
        assert_eq!(layer.config.safety_factor, 1.5);
        assert_eq!(layer.config.window, Duration::from_secs(60));
        assert_eq!(layer.config.min_samples, 100);
        assert_eq!(layer.config.min_deadline, Duration::from_millis(200));
        assert_eq!(layer.config.max_deadline, Duration::from_secs(20));
    }
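
    // Illustrative sketch: out-of-range builder values panic, as documented on
    // `TailCutLayerBuilder::percentile`.
    #[test]
    #[should_panic(expected = "percentile must be between 50 and 99")]
    fn test_percentile_out_of_range_panics() {
        let _ = TailCutLayer::builder().percentile(100);
    }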

    #[tokio::test]
    async fn test_layer_clone_shares_stats() {
        let layer = TailCutLayer::new();
        let cloned = layer.clone();

        assert!(Arc::ptr_eq(&layer.stats, &cloned.stats));
    }
}