opendal/layers/observe/
metrics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::ready;
23use std::task::Context;
24use std::task::Poll;
25use std::time::Duration;
26use std::time::Instant;
27
28use futures::Stream;
29use futures::StreamExt;
30use http::StatusCode;
31
32use crate::raw::*;
33use crate::*;
34
/// One kibibyte (2^10 bytes) as a floating point value.
const KIB: f64 = (1_u64 << 10) as f64;
/// One mebibyte (2^20 bytes) as a floating point value.
const MIB: f64 = (1_u64 << 20) as f64;
/// One gibibyte (2^30 bytes) as a floating point value.
const GIB: f64 = (1_u64 << 30) as f64;

/// Histogram buckets for data size metrics such as `OperationBytes`.
///
/// The boundaries are expressed in bytes and range from small files (4 KiB)
/// up to large objects (5 GiB).
pub const DEFAULT_BYTES_BUCKETS: &[f64] = &[
    4.0 * KIB,   // Small files
    64.0 * KIB,  // File system block size
    256.0 * KIB, //
    1.0 * MIB,   // Common size threshold for many systems
    4.0 * MIB,   // Best size for most http based systems
    16.0 * MIB,  //
    64.0 * MIB,  // Widely used threshold for multipart uploads
    256.0 * MIB, //
    1.0 * GIB,   // Considered large for many systems
    5.0 * GIB,   // Maximum size in single upload for many cloud storage services
];

/// Histogram buckets for transfer rate metrics such as `OperationBytesRate`.
///
/// The boundaries are expressed in bytes per second and span slow connections
/// through high-speed transfers.
///
/// Note: these describe the rate of a single operation, not total bandwidth.
pub const DEFAULT_BYTES_RATE_BUCKETS: &[f64] = &[
    // Low-speed network range (mobile/weak connections)
    8.0 * KIB,   // ~64Kbps - 2G networks
    32.0 * KIB,  // ~256Kbps - 3G networks
    128.0 * KIB, // ~1Mbps - Basic broadband
    // Standard broadband range
    1.0 * MIB,  // ~8Mbps - Entry-level broadband
    8.0 * MIB,  // ~64Mbps - Fast broadband
    32.0 * MIB, // ~256Mbps - Gigabit broadband
    // High-performance network range
    128.0 * MIB, // ~1Gbps - Standard datacenter
    512.0 * MIB, // ~4Gbps - Fast datacenter
    2.0 * GIB,   // ~16Gbps - High-end interconnects
    // Ultra-high-speed range
    8.0 * GIB,  // ~64Gbps - InfiniBand/RDMA
    32.0 * GIB, // ~256Gbps - Top-tier datacenters
];
76
/// Histogram buckets for entry counts (`OperationEntriesCount`).
///
/// Ranges from single-entry operations up to large batch operations.
pub const DEFAULT_ENTRIES_BUCKETS: &[f64] = &[
    1.0,      // Single item operations
    5.0,      // Very small batches
    10.0,     // Small batches
    50.0,     // Medium batches
    100.0,    // Standard batch size
    500.0,    // Large batches
    1_000.0,  // Very large batches, API limits for some services
    5_000.0,  // Huge batches, multi-page operations
    10_000.0, // Extremely large operations, multi-request batches
];
90
/// Histogram buckets for entry processing rates (`OperationEntriesRate`).
///
/// The boundaries are expressed in entries processed per second.
pub const DEFAULT_ENTRIES_RATE_BUCKETS: &[f64] = &[
    1.0,      // Slowest processing, heavy operations per entry
    10.0,     // Slow processing, complex operations
    50.0,     // Moderate processing speed
    100.0,    // Good processing speed, efficient operations
    500.0,    // Fast processing, optimized operations
    1_000.0,  // Very fast processing, simple operations
    5_000.0,  // Extremely fast processing, bulk operations
    10_000.0, // Maximum speed, listing operations, local systems
];
103
/// Buckets for operation duration metrics like OperationDurationSeconds.
///
/// Values are in seconds, listed in ascending order from 1ms to 1min. They
/// cover timeframes from fast metadata operations to long-running transfers.
pub const DEFAULT_DURATION_SECONDS_BUCKETS: &[f64] = &[
    0.001, // 1ms - Fastest operations, cached responses
    0.01,  // 10ms - Fast metadata operations, local operations
    0.05,  // 50ms - Quick operations, nearby cloud resources
    0.1,   // 100ms - Standard API response times, typical cloud latency
    0.25,  // 250ms - Medium operations, small data transfers
    0.5,   // 500ms - Medium-long operations, larger metadata operations
    1.0,   // 1s - Long operations, small file transfers
    2.5,   // 2.5s - Extended operations, medium file transfers
    5.0,   // 5s - Long-running operations, large transfers
    10.0,  // 10s - Very long operations, very large transfers
    30.0,  // 30s - Extended operations, complex batch processes
    60.0,  // 1min - Near timeout operations, extremely large transfers
];
120
/// Buckets for time to first byte metrics like OperationTtfbSeconds.
///
/// Values are in seconds, listed in ascending order from 1ms to 3.2s. They
/// focus on initial response times, which are typically shorter than full
/// operations.
pub const DEFAULT_TTFB_BUCKETS: &[f64] = &[
    0.001, // 1ms - Cached or local resources
    0.01,  // 10ms - Very fast responses, same region
    0.025, // 25ms - Fast responses, optimized configurations
    0.05,  // 50ms - Good response times, standard configurations
    0.1,   // 100ms - Average response times for cloud storage
    0.2,   // 200ms - Slower responses, cross-region or throttled
    0.4,   // 400ms - Poor response times, network congestion
    0.8,   // 800ms - Very slow responses, potential issues
    1.6,   // 1.6s - Problematic responses, retry territory
    3.2,   // 3.2s - Critical latency issues, close to timeouts
];
135
/// The metric label key for the storage scheme, like s3, fs, cos.
pub static LABEL_SCHEME: &str = "scheme";
/// The metric label key for the namespace, like the bucket name in s3.
pub static LABEL_NAMESPACE: &str = "namespace";
/// The metric label key for the configured root path.
pub static LABEL_ROOT: &str = "root";
/// The metric label key for the operation, like read, write, list.
pub static LABEL_OPERATION: &str = "operation";
/// The metric label key for the error kind of a failed operation.
pub static LABEL_ERROR: &str = "error";
/// The metric label key for the http status code of an error response.
pub static LABEL_STATUS_CODE: &str = "status_code";
148
149/// MetricLabels are the labels for the metrics.
150#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
151pub struct MetricLabels {
152    /// The storage scheme identifier (e.g., "s3", "gcs", "azblob", "fs").
153    /// Used to differentiate between different storage backends.
154    pub scheme: Scheme,
155    /// The storage namespace (e.g., bucket name, container name).
156    /// Identifies the specific storage container being accessed.
157    pub namespace: Arc<str>,
158    /// The root path within the namespace that was configured.
159    /// Used to track operations within a specific path prefix.
160    pub root: Arc<str>,
161    /// The operation being performed (e.g., "read", "write", "list").
162    /// Identifies which API operation generated this metric.
163    pub operation: &'static str,
164    /// The specific error kind that occurred during an operation.
165    /// Only populated for `OperationErrorsTotal` metric.
166    /// Used to track frequency of specific error types.
167    pub error: Option<ErrorKind>,
168    /// The HTTP status code received in an error response.
169    /// Only populated for `HttpStatusErrorsTotal` metric.
170    /// Used to track frequency of specific HTTP error status codes.
171    pub status_code: Option<StatusCode>,
172}
173
174impl MetricLabels {
175    /// Create a new set of MetricLabels.
176    fn new(info: Arc<AccessorInfo>, op: &'static str) -> Self {
177        MetricLabels {
178            scheme: info.scheme(),
179            namespace: info.name(),
180            root: info.root(),
181            operation: op,
182            ..MetricLabels::default()
183        }
184    }
185
186    /// Add error to the metric labels.
187    fn with_error(mut self, err: ErrorKind) -> Self {
188        self.error = Some(err);
189        self
190    }
191
192    /// Add status code to the metric labels.
193    fn with_status_code(mut self, code: StatusCode) -> Self {
194        self.status_code = Some(code);
195        self
196    }
197}
198
/// MetricValue is the value that opendal hands to a metrics implementation.
///
/// Metrics implementations can be `prometheus_client`, `metrics` etc.
///
/// Every metrics implementation SHOULD handle each variant as described on
/// it, so that users get consistent metrics regardless of the backend.
#[non_exhaustive]
#[derive(Clone, Copy, Debug)]
pub enum MetricValue {
    /// The size of data processed, in bytes.
    /// Metrics impl: Update a Histogram with the given byte count.
    OperationBytes(u64),
    /// The rate of data processing, in bytes/second.
    /// Metrics impl: Update a Histogram with the calculated rate value.
    OperationBytesRate(f64),
    /// The number of entries (files, objects, keys) processed.
    /// Metrics impl: Update a Histogram with the entry count.
    OperationEntries(u64),
    /// The rate of entries processing, in entries/second.
    /// Metrics impl: Update a Histogram with the calculated rate value.
    OperationEntriesRate(f64),
    /// The total duration of an operation.
    /// Metrics impl: Update a Histogram with the duration converted to seconds (as f64).
    OperationDurationSeconds(Duration),
    /// One more operation error occurred.
    /// Metrics impl: Increment a Counter by 1.
    OperationErrorsTotal,
    /// A change in the number of currently executing operations.
    /// Metrics impl: Add the value (positive or negative) to a Gauge.
    OperationExecuting(isize),
    /// The time to first byte duration.
    /// Metrics impl: Update a Histogram with the duration converted to seconds (as f64).
    OperationTtfbSeconds(Duration),
    /// A change in the number of currently executing HTTP requests.
    /// Metrics impl: Add the value (positive or negative) to a Gauge.
    HttpExecuting(isize),
    /// The size of an HTTP request body, in bytes.
    /// Metrics impl: Update a Histogram with the given byte count.
    HttpRequestBytes(u64),
    /// The rate of HTTP request data, in bytes/second.
    /// Metrics impl: Update a Histogram with the calculated rate value.
    HttpRequestBytesRate(f64),
    /// The duration of sending an HTTP request (until first byte received).
    /// Metrics impl: Update a Histogram with the duration converted to seconds (as f64).
    HttpRequestDurationSeconds(Duration),
    /// The size of an HTTP response body, in bytes.
    /// Metrics impl: Update a Histogram with the given byte count.
    HttpResponseBytes(u64),
    /// The rate of HTTP response data, in bytes/second.
    /// Metrics impl: Update a Histogram with the calculated rate value.
    HttpResponseBytesRate(f64),
    /// The duration of receiving an HTTP response (from first byte to last).
    /// Metrics impl: Update a Histogram with the duration converted to seconds (as f64).
    HttpResponseDurationSeconds(Duration),
    /// One more HTTP connection error occurred.
    /// Metrics impl: Increment a Counter by 1.
    HttpConnectionErrorsTotal,
    /// One more HTTP status error (non-2xx response) occurred.
    /// Metrics impl: Increment a Counter by 1.
    HttpStatusErrorsTotal,
}

impl MetricValue {
    /// Returns the full metric name, including any unit or `_total` suffix.
    pub fn name(&self) -> &'static str {
        match self {
            Self::OperationBytes(_) => "opendal_operation_bytes",
            Self::OperationBytesRate(_) => "opendal_operation_bytes_rate",
            Self::OperationEntries(_) => "opendal_operation_entries",
            Self::OperationEntriesRate(_) => "opendal_operation_entries_rate",
            Self::OperationDurationSeconds(_) => "opendal_operation_duration_seconds",
            Self::OperationErrorsTotal => "opendal_operation_errors_total",
            Self::OperationExecuting(_) => "opendal_operation_executing",
            Self::OperationTtfbSeconds(_) => "opendal_operation_ttfb_seconds",

            Self::HttpConnectionErrorsTotal => "opendal_http_connection_errors_total",
            Self::HttpStatusErrorsTotal => "opendal_http_status_errors_total",
            Self::HttpExecuting(_) => "opendal_http_executing",
            Self::HttpRequestBytes(_) => "opendal_http_request_bytes",
            Self::HttpRequestBytesRate(_) => "opendal_http_request_bytes_rate",
            Self::HttpRequestDurationSeconds(_) => "opendal_http_request_duration_seconds",
            Self::HttpResponseBytes(_) => "opendal_http_response_bytes",
            Self::HttpResponseBytesRate(_) => "opendal_http_response_bytes_rate",
            Self::HttpResponseDurationSeconds(_) => "opendal_http_response_duration_seconds",
        }
    }

    /// Returns the metric name together with its unit, if it has one.
    ///
    /// # Notes
    ///
    /// This API is meant for metrics implementations that are unit aware.
    /// They are expected to finish the name themselves, for example by
    /// appending `_total` to counters.
    pub fn name_with_unit(&self) -> (&'static str, Option<&'static str>) {
        const BYTES: Option<&'static str> = Some("bytes");
        const SECONDS: Option<&'static str> = Some("seconds");

        match self {
            Self::OperationBytes(_) => ("opendal_operation", BYTES),
            Self::OperationBytesRate(_) => ("opendal_operation_bytes_rate", None),
            Self::OperationEntries(_) => ("opendal_operation_entries", None),
            Self::OperationEntriesRate(_) => ("opendal_operation_entries_rate", None),
            Self::OperationDurationSeconds(_) => ("opendal_operation_duration", SECONDS),
            Self::OperationErrorsTotal => ("opendal_operation_errors", None),
            Self::OperationExecuting(_) => ("opendal_operation_executing", None),
            Self::OperationTtfbSeconds(_) => ("opendal_operation_ttfb", SECONDS),

            Self::HttpConnectionErrorsTotal => ("opendal_http_connection_errors", None),
            Self::HttpStatusErrorsTotal => ("opendal_http_status_errors", None),
            Self::HttpExecuting(_) => ("opendal_http_executing", None),
            Self::HttpRequestBytes(_) => ("opendal_http_request", BYTES),
            Self::HttpRequestBytesRate(_) => ("opendal_http_request_bytes_rate", None),
            Self::HttpRequestDurationSeconds(_) => ("opendal_http_request_duration", SECONDS),
            Self::HttpResponseBytes(_) => ("opendal_http_response", BYTES),
            Self::HttpResponseBytesRate(_) => ("opendal_http_response_bytes_rate", None),
            Self::HttpResponseDurationSeconds(_) => ("opendal_http_response_duration", SECONDS),
        }
    }

    /// Returns the human readable help text describing this metric.
    pub fn help(&self) -> &'static str {
        match self {
            Self::OperationBytes(_) => "Current operation size in bytes, represents the size of data being processed in the current operation",
            Self::OperationBytesRate(_) => "Histogram of data processing rates in bytes per second within individual operations",
            Self::OperationEntries(_) => "Current operation size in entries, represents the entries being processed in the current operation",
            Self::OperationEntriesRate(_) => "Histogram of entries processing rates in entries per second within individual operations",
            Self::OperationDurationSeconds(_) => "Duration of operations in seconds, measured from start to completion",
            Self::OperationErrorsTotal => "Total number of failed operations",
            Self::OperationExecuting(_) => "Number of operations currently being executed",
            Self::OperationTtfbSeconds(_) => "Time to first byte in seconds for operations",

            Self::HttpConnectionErrorsTotal => "Total number of HTTP requests that failed before receiving a response (DNS failures, connection refused, timeouts, TLS errors)",
            Self::HttpStatusErrorsTotal => "Total number of HTTP requests that received error status codes (non-2xx responses)",
            Self::HttpExecuting(_) => "Number of HTTP requests currently in flight from this client",
            Self::HttpRequestBytes(_) => "Histogram of HTTP request body sizes in bytes",
            Self::HttpRequestBytesRate(_) => "Histogram of HTTP request bytes per second rates",
            Self::HttpRequestDurationSeconds(_) => "Histogram of time durations in seconds spent sending HTTP requests, from first byte sent to receiving the first byte",
            Self::HttpResponseBytes(_) => "Histogram of HTTP response body sizes in bytes",
            Self::HttpResponseBytesRate(_) => "Histogram of HTTP response bytes per second rates",
            Self::HttpResponseDurationSeconds(_) => "Histogram of time durations in seconds spent receiving HTTP responses, from first byte received to last byte received",
        }
    }
}
344
345/// The interceptor for metrics.
346///
347/// All metrics related libs should implement this trait to observe opendal's internal operations.
348pub trait MetricsIntercept: Debug + Clone + Send + Sync + Unpin + 'static {
349    /// Observe the metric value.
350    fn observe(&self, labels: MetricLabels, value: MetricValue) {
351        let _ = (labels, value);
352    }
353}
354
355/// The metrics layer for opendal.
356#[derive(Clone, Debug)]
357pub struct MetricsLayer<I: MetricsIntercept> {
358    interceptor: I,
359}
360
361impl<I: MetricsIntercept> MetricsLayer<I> {
362    /// Create a new metrics layer.
363    pub fn new(interceptor: I) -> Self {
364        Self { interceptor }
365    }
366}
367
368impl<A: Access, I: MetricsIntercept> Layer<A> for MetricsLayer<I> {
369    type LayeredAccess = MetricsAccessor<A, I>;
370
371    fn layer(&self, inner: A) -> Self::LayeredAccess {
372        let info = inner.info();
373
374        // Update http client with metrics http fetcher.
375        info.update_http_client(|client| {
376            HttpClient::with(MetricsHttpFetcher {
377                inner: client.into_inner(),
378                info: info.clone(),
379                interceptor: self.interceptor.clone(),
380            })
381        });
382
383        MetricsAccessor {
384            inner,
385            info,
386            interceptor: self.interceptor.clone(),
387        }
388    }
389}
390
391/// The metrics http fetcher for opendal.
392pub struct MetricsHttpFetcher<I: MetricsIntercept> {
393    inner: HttpFetcher,
394    info: Arc<AccessorInfo>,
395    interceptor: I,
396}
397
398impl<I: MetricsIntercept> HttpFetch for MetricsHttpFetcher<I> {
399    async fn fetch(&self, req: http::Request<Buffer>) -> Result<http::Response<HttpBody>> {
400        let labels = MetricLabels::new(
401            self.info.clone(),
402            req.extensions()
403                .get::<Operation>()
404                .copied()
405                .map(Operation::into_static)
406                .unwrap_or("unknown"),
407        );
408
409        let start = Instant::now();
410        let req_size = req.body().len();
411
412        self.interceptor
413            .observe(labels.clone(), MetricValue::HttpExecuting(1));
414
415        let res = self.inner.fetch(req).await;
416        let req_duration = start.elapsed();
417
418        match res {
419            Err(err) => {
420                self.interceptor
421                    .observe(labels.clone(), MetricValue::HttpExecuting(-1));
422                self.interceptor
423                    .observe(labels, MetricValue::HttpConnectionErrorsTotal);
424                Err(err)
425            }
426            Ok(resp) if resp.status().is_client_error() && resp.status().is_server_error() => {
427                self.interceptor
428                    .observe(labels.clone(), MetricValue::HttpExecuting(-1));
429                self.interceptor.observe(
430                    labels.with_status_code(resp.status()),
431                    MetricValue::HttpStatusErrorsTotal,
432                );
433                Ok(resp)
434            }
435            Ok(resp) => {
436                self.interceptor.observe(
437                    labels.clone(),
438                    MetricValue::HttpRequestBytes(req_size as u64),
439                );
440                self.interceptor.observe(
441                    labels.clone(),
442                    MetricValue::HttpRequestBytesRate(req_size as f64 / req_duration.as_secs_f64()),
443                );
444                self.interceptor.observe(
445                    labels.clone(),
446                    MetricValue::HttpRequestDurationSeconds(req_duration),
447                );
448
449                let (parts, body) = resp.into_parts();
450                let body = body.map_inner(|s| {
451                    Box::new(MetricsStream {
452                        inner: s,
453                        interceptor: self.interceptor.clone(),
454                        labels: labels.clone(),
455                        size: 0,
456                        start: Instant::now(),
457                    })
458                });
459
460                Ok(http::Response::from_parts(parts, body))
461            }
462        }
463    }
464}
465
466pub struct MetricsStream<S, I> {
467    inner: S,
468    interceptor: I,
469
470    labels: MetricLabels,
471    size: u64,
472    start: Instant,
473}
474
475impl<S, I> Stream for MetricsStream<S, I>
476where
477    S: Stream<Item = Result<Buffer>> + Unpin + 'static,
478    I: MetricsIntercept,
479{
480    type Item = Result<Buffer>;
481
482    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
483        match ready!(self.inner.poll_next_unpin(cx)) {
484            Some(Ok(bs)) => {
485                self.size += bs.len() as u64;
486                Poll::Ready(Some(Ok(bs)))
487            }
488            Some(Err(err)) => Poll::Ready(Some(Err(err))),
489            None => {
490                let resp_size = self.size;
491                let resp_duration = self.start.elapsed();
492
493                self.interceptor.observe(
494                    self.labels.clone(),
495                    MetricValue::HttpResponseBytes(resp_size),
496                );
497                self.interceptor.observe(
498                    self.labels.clone(),
499                    MetricValue::HttpResponseBytesRate(
500                        resp_size as f64 / resp_duration.as_secs_f64(),
501                    ),
502                );
503                self.interceptor.observe(
504                    self.labels.clone(),
505                    MetricValue::HttpResponseDurationSeconds(resp_duration),
506                );
507                self.interceptor
508                    .observe(self.labels.clone(), MetricValue::HttpExecuting(-1));
509
510                Poll::Ready(None)
511            }
512        }
513    }
514}
515
516/// The metrics accessor for opendal.
517pub struct MetricsAccessor<A: Access, I: MetricsIntercept> {
518    inner: A,
519    info: Arc<AccessorInfo>,
520    interceptor: I,
521}
522
523impl<A: Access, I: MetricsIntercept> Debug for MetricsAccessor<A, I> {
524    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
525        f.debug_struct("MetricsAccessor")
526            .field("inner", &self.inner)
527            .finish_non_exhaustive()
528    }
529}
530
531impl<A: Access, I: MetricsIntercept> LayeredAccess for MetricsAccessor<A, I> {
    // The accessor being instrumented.
    type Inner = A;
    // Every reader, writer, lister and deleter of the inner accessor, async
    // and blocking alike, is wrapped in `MetricsWrapper`.
    type Reader = MetricsWrapper<A::Reader, I>;
    type BlockingReader = MetricsWrapper<A::BlockingReader, I>;
    type Writer = MetricsWrapper<A::Writer, I>;
    type BlockingWriter = MetricsWrapper<A::BlockingWriter, I>;
    type Lister = MetricsWrapper<A::Lister, I>;
    type BlockingLister = MetricsWrapper<A::BlockingLister, I>;
    type Deleter = MetricsWrapper<A::Deleter, I>;
    type BlockingDeleter = MetricsWrapper<A::BlockingDeleter, I>;

    /// Returns a reference to the wrapped accessor.
    fn inner(&self) -> &Self::Inner {
        &self.inner
    }
545
546    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
547        let labels = MetricLabels::new(self.info.clone(), Operation::CreateDir.into_static());
548
549        let start = Instant::now();
550
551        self.interceptor
552            .observe(labels.clone(), MetricValue::OperationExecuting(1));
553
554        let res = self
555            .inner()
556            .create_dir(path, args)
557            .await
558            .inspect(|_| {
559                self.interceptor.observe(
560                    labels.clone(),
561                    MetricValue::OperationDurationSeconds(start.elapsed()),
562                );
563            })
564            .inspect_err(|err| {
565                self.interceptor.observe(
566                    labels.clone().with_error(err.kind()),
567                    MetricValue::OperationErrorsTotal,
568                );
569            });
570
571        self.interceptor
572            .observe(labels, MetricValue::OperationExecuting(-1));
573        res
574    }
575
576    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
577        let labels = MetricLabels::new(self.info.clone(), Operation::Read.into_static());
578
579        let start = Instant::now();
580
581        self.interceptor
582            .observe(labels.clone(), MetricValue::OperationExecuting(1));
583
584        let (rp, reader) = self
585            .inner
586            .read(path, args)
587            .await
588            .inspect(|_| {
589                self.interceptor.observe(
590                    labels.clone(),
591                    MetricValue::OperationTtfbSeconds(start.elapsed()),
592                );
593            })
594            .inspect_err(|err| {
595                self.interceptor.observe(
596                    labels.clone().with_error(err.kind()),
597                    MetricValue::OperationErrorsTotal,
598                );
599            })?;
600
601        Ok((
602            rp,
603            MetricsWrapper::new(reader, self.interceptor.clone(), labels, start),
604        ))
605    }
606
607    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
608        let labels = MetricLabels::new(self.info.clone(), Operation::Write.into_static());
609
610        let start = Instant::now();
611
612        self.interceptor
613            .observe(labels.clone(), MetricValue::OperationExecuting(1));
614
615        let (rp, writer) = self.inner.write(path, args).await.inspect_err(|err| {
616            self.interceptor.observe(
617                labels.clone().with_error(err.kind()),
618                MetricValue::OperationErrorsTotal,
619            );
620        })?;
621
622        Ok((
623            rp,
624            MetricsWrapper::new(writer, self.interceptor.clone(), labels, start),
625        ))
626    }
627
628    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
629        let labels = MetricLabels::new(self.info.clone(), Operation::Copy.into_static());
630
631        let start = Instant::now();
632
633        self.interceptor
634            .observe(labels.clone(), MetricValue::OperationExecuting(1));
635
636        let res = self
637            .inner()
638            .copy(from, to, args)
639            .await
640            .inspect(|_| {
641                self.interceptor.observe(
642                    labels.clone(),
643                    MetricValue::OperationDurationSeconds(start.elapsed()),
644                );
645            })
646            .inspect_err(|err| {
647                self.interceptor.observe(
648                    labels.clone().with_error(err.kind()),
649                    MetricValue::OperationErrorsTotal,
650                );
651            });
652
653        self.interceptor
654            .observe(labels, MetricValue::OperationExecuting(-1));
655        res
656    }
657
658    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
659        let labels = MetricLabels::new(self.info.clone(), Operation::Rename.into_static());
660
661        let start = Instant::now();
662
663        self.interceptor
664            .observe(labels.clone(), MetricValue::OperationExecuting(1));
665
666        let res = self
667            .inner()
668            .rename(from, to, args)
669            .await
670            .inspect(|_| {
671                self.interceptor.observe(
672                    labels.clone(),
673                    MetricValue::OperationDurationSeconds(start.elapsed()),
674                );
675            })
676            .inspect_err(|err| {
677                self.interceptor.observe(
678                    labels.clone().with_error(err.kind()),
679                    MetricValue::OperationErrorsTotal,
680                );
681            });
682
683        self.interceptor
684            .observe(labels, MetricValue::OperationExecuting(-1));
685        res
686    }
687
688    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
689        let labels = MetricLabels::new(self.info.clone(), Operation::Stat.into_static());
690
691        let start = Instant::now();
692
693        self.interceptor
694            .observe(labels.clone(), MetricValue::OperationExecuting(1));
695
696        let res = self
697            .inner()
698            .stat(path, args)
699            .await
700            .inspect(|_| {
701                self.interceptor.observe(
702                    labels.clone(),
703                    MetricValue::OperationDurationSeconds(start.elapsed()),
704                );
705            })
706            .inspect_err(|err| {
707                self.interceptor.observe(
708                    labels.clone().with_error(err.kind()),
709                    MetricValue::OperationErrorsTotal,
710                );
711            });
712
713        self.interceptor
714            .observe(labels, MetricValue::OperationExecuting(-1));
715        res
716    }
717
718    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
719        let labels = MetricLabels::new(self.info.clone(), Operation::Delete.into_static());
720
721        let start = Instant::now();
722
723        self.interceptor
724            .observe(labels.clone(), MetricValue::OperationExecuting(1));
725
726        let (rp, deleter) = self.inner.delete().await.inspect_err(|err| {
727            self.interceptor.observe(
728                labels.clone().with_error(err.kind()),
729                MetricValue::OperationErrorsTotal,
730            );
731        })?;
732
733        Ok((
734            rp,
735            MetricsWrapper::new(deleter, self.interceptor.clone(), labels, start),
736        ))
737    }
738
739    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
740        let labels = MetricLabels::new(self.info.clone(), Operation::List.into_static());
741
742        let start = Instant::now();
743
744        self.interceptor
745            .observe(labels.clone(), MetricValue::OperationExecuting(1));
746
747        let (rp, lister) = self.inner.list(path, args).await.inspect_err(|err| {
748            self.interceptor.observe(
749                labels.clone().with_error(err.kind()),
750                MetricValue::OperationErrorsTotal,
751            );
752        })?;
753
754        Ok((
755            rp,
756            MetricsWrapper::new(lister, self.interceptor.clone(), labels, start),
757        ))
758    }
759
760    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
761        let labels = MetricLabels::new(self.info.clone(), Operation::Presign.into_static());
762
763        let start = Instant::now();
764
765        self.interceptor
766            .observe(labels.clone(), MetricValue::OperationExecuting(1));
767
768        let res = self
769            .inner()
770            .presign(path, args)
771            .await
772            .inspect(|_| {
773                self.interceptor.observe(
774                    labels.clone(),
775                    MetricValue::OperationDurationSeconds(start.elapsed()),
776                );
777            })
778            .inspect_err(|err| {
779                self.interceptor.observe(
780                    labels.clone().with_error(err.kind()),
781                    MetricValue::OperationErrorsTotal,
782                );
783            });
784
785        self.interceptor
786            .observe(labels, MetricValue::OperationExecuting(-1));
787        res
788    }
789
790    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
791        let labels = MetricLabels::new(self.info.clone(), Operation::CreateDir.into_static());
792
793        let start = Instant::now();
794
795        self.interceptor
796            .observe(labels.clone(), MetricValue::OperationExecuting(1));
797
798        let res = self
799            .inner()
800            .blocking_create_dir(path, args)
801            .inspect(|_| {
802                self.interceptor.observe(
803                    labels.clone(),
804                    MetricValue::OperationDurationSeconds(start.elapsed()),
805                );
806            })
807            .inspect_err(|err| {
808                self.interceptor.observe(
809                    labels.clone().with_error(err.kind()),
810                    MetricValue::OperationErrorsTotal,
811                );
812            });
813
814        self.interceptor
815            .observe(labels, MetricValue::OperationExecuting(-1));
816        res
817    }
818
819    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
820        let labels = MetricLabels::new(self.info.clone(), Operation::Read.into_static());
821
822        let start = Instant::now();
823
824        self.interceptor
825            .observe(labels.clone(), MetricValue::OperationExecuting(1));
826
827        let (rp, reader) = self
828            .inner
829            .blocking_read(path, args)
830            .inspect(|_| {
831                self.interceptor.observe(
832                    labels.clone(),
833                    MetricValue::OperationTtfbSeconds(start.elapsed()),
834                );
835            })
836            .inspect_err(|err| {
837                self.interceptor.observe(
838                    labels.clone().with_error(err.kind()),
839                    MetricValue::OperationErrorsTotal,
840                );
841            })?;
842
843        Ok((
844            rp,
845            MetricsWrapper::new(reader, self.interceptor.clone(), labels, start),
846        ))
847    }
848
849    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
850        let labels = MetricLabels::new(self.info.clone(), Operation::Write.into_static());
851
852        let start = Instant::now();
853
854        self.interceptor
855            .observe(labels.clone(), MetricValue::OperationExecuting(1));
856
857        let (rp, writer) = self.inner.blocking_write(path, args).inspect_err(|err| {
858            self.interceptor.observe(
859                labels.clone().with_error(err.kind()),
860                MetricValue::OperationErrorsTotal,
861            );
862        })?;
863
864        Ok((
865            rp,
866            MetricsWrapper::new(writer, self.interceptor.clone(), labels, start),
867        ))
868    }
869
870    fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
871        let labels = MetricLabels::new(self.info.clone(), Operation::Copy.into_static());
872
873        let start = Instant::now();
874
875        self.interceptor
876            .observe(labels.clone(), MetricValue::OperationExecuting(1));
877
878        let res = self
879            .inner()
880            .blocking_copy(from, to, args)
881            .inspect(|_| {
882                self.interceptor.observe(
883                    labels.clone(),
884                    MetricValue::OperationDurationSeconds(start.elapsed()),
885                );
886            })
887            .inspect_err(|err| {
888                self.interceptor.observe(
889                    labels.clone().with_error(err.kind()),
890                    MetricValue::OperationErrorsTotal,
891                );
892            });
893
894        self.interceptor
895            .observe(labels, MetricValue::OperationExecuting(-1));
896        res
897    }
898
899    fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
900        let labels = MetricLabels::new(self.info.clone(), Operation::Rename.into_static());
901
902        let start = Instant::now();
903
904        self.interceptor
905            .observe(labels.clone(), MetricValue::OperationExecuting(1));
906
907        let res = self
908            .inner()
909            .blocking_rename(from, to, args)
910            .inspect(|_| {
911                self.interceptor.observe(
912                    labels.clone(),
913                    MetricValue::OperationDurationSeconds(start.elapsed()),
914                );
915            })
916            .inspect_err(|err| {
917                self.interceptor.observe(
918                    labels.clone().with_error(err.kind()),
919                    MetricValue::OperationErrorsTotal,
920                );
921            });
922
923        self.interceptor
924            .observe(labels, MetricValue::OperationExecuting(-1));
925        res
926    }
927
928    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
929        let labels = MetricLabels::new(self.info.clone(), Operation::Stat.into_static());
930
931        let start = Instant::now();
932
933        self.interceptor
934            .observe(labels.clone(), MetricValue::OperationExecuting(1));
935
936        let res = self
937            .inner()
938            .blocking_stat(path, args)
939            .inspect(|_| {
940                self.interceptor.observe(
941                    labels.clone(),
942                    MetricValue::OperationDurationSeconds(start.elapsed()),
943                );
944            })
945            .inspect_err(|err| {
946                self.interceptor.observe(
947                    labels.clone().with_error(err.kind()),
948                    MetricValue::OperationErrorsTotal,
949                );
950            });
951
952        self.interceptor
953            .observe(labels, MetricValue::OperationExecuting(-1));
954        res
955    }
956
957    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
958        let labels = MetricLabels::new(self.info.clone(), Operation::Delete.into_static());
959
960        let start = Instant::now();
961
962        self.interceptor
963            .observe(labels.clone(), MetricValue::OperationExecuting(1));
964
965        let (rp, deleter) = self.inner.blocking_delete().inspect_err(|err| {
966            self.interceptor.observe(
967                labels.clone().with_error(err.kind()),
968                MetricValue::OperationErrorsTotal,
969            );
970        })?;
971
972        Ok((
973            rp,
974            MetricsWrapper::new(deleter, self.interceptor.clone(), labels, start),
975        ))
976    }
977
978    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
979        let labels = MetricLabels::new(self.info.clone(), Operation::List.into_static());
980
981        let start = Instant::now();
982
983        self.interceptor
984            .observe(labels.clone(), MetricValue::OperationExecuting(1));
985
986        let (rp, lister) = self.inner.blocking_list(path, args).inspect_err(|err| {
987            self.interceptor.observe(
988                labels.clone().with_error(err.kind()),
989                MetricValue::OperationErrorsTotal,
990            );
991        })?;
992
993        Ok((
994            rp,
995            MetricsWrapper::new(lister, self.interceptor.clone(), labels, start),
996        ))
997    }
998}
999
1000pub struct MetricsWrapper<R, I: MetricsIntercept> {
1001    inner: R,
1002    interceptor: I,
1003    labels: MetricLabels,
1004
1005    start: Instant,
1006    size: u64,
1007}
1008
1009impl<R, I: MetricsIntercept> Drop for MetricsWrapper<R, I> {
1010    fn drop(&mut self) {
1011        let size = self.size;
1012        let duration = self.start.elapsed();
1013
1014        if self.labels.operation == Operation::Read.into_static()
1015            || self.labels.operation == Operation::Write.into_static()
1016        {
1017            self.interceptor
1018                .observe(self.labels.clone(), MetricValue::OperationBytes(self.size));
1019            self.interceptor.observe(
1020                self.labels.clone(),
1021                MetricValue::OperationBytesRate(size as f64 / duration.as_secs_f64()),
1022            );
1023        } else {
1024            self.interceptor.observe(
1025                self.labels.clone(),
1026                MetricValue::OperationEntries(self.size),
1027            );
1028            self.interceptor.observe(
1029                self.labels.clone(),
1030                MetricValue::OperationEntriesRate(size as f64 / duration.as_secs_f64()),
1031            );
1032        }
1033
1034        self.interceptor.observe(
1035            self.labels.clone(),
1036            MetricValue::OperationDurationSeconds(duration),
1037        );
1038        self.interceptor
1039            .observe(self.labels.clone(), MetricValue::OperationExecuting(-1));
1040    }
1041}
1042
1043impl<R, I: MetricsIntercept> MetricsWrapper<R, I> {
1044    fn new(inner: R, interceptor: I, labels: MetricLabels, start: Instant) -> Self {
1045        Self {
1046            inner,
1047            interceptor,
1048            labels,
1049            start,
1050            size: 0,
1051        }
1052    }
1053}
1054
1055impl<R: oio::Read, I: MetricsIntercept> oio::Read for MetricsWrapper<R, I> {
1056    async fn read(&mut self) -> Result<Buffer> {
1057        self.inner
1058            .read()
1059            .await
1060            .inspect(|bs| {
1061                self.size += bs.len() as u64;
1062            })
1063            .inspect_err(|err| {
1064                self.interceptor.observe(
1065                    self.labels.clone().with_error(err.kind()),
1066                    MetricValue::OperationErrorsTotal,
1067                );
1068            })
1069    }
1070}
1071
1072impl<R: oio::BlockingRead, I: MetricsIntercept> oio::BlockingRead for MetricsWrapper<R, I> {
1073    fn read(&mut self) -> Result<Buffer> {
1074        self.inner
1075            .read()
1076            .inspect(|bs| {
1077                self.size += bs.len() as u64;
1078            })
1079            .inspect_err(|err| {
1080                self.interceptor.observe(
1081                    self.labels.clone().with_error(err.kind()),
1082                    MetricValue::OperationErrorsTotal,
1083                );
1084            })
1085    }
1086}
1087
1088impl<R: oio::Write, I: MetricsIntercept> oio::Write for MetricsWrapper<R, I> {
1089    async fn write(&mut self, bs: Buffer) -> Result<()> {
1090        let size = bs.len();
1091
1092        self.inner
1093            .write(bs)
1094            .await
1095            .inspect(|_| {
1096                self.size += size as u64;
1097            })
1098            .inspect_err(|err| {
1099                self.interceptor.observe(
1100                    self.labels.clone().with_error(err.kind()),
1101                    MetricValue::OperationErrorsTotal,
1102                );
1103            })
1104    }
1105
1106    async fn close(&mut self) -> Result<Metadata> {
1107        self.inner.close().await.inspect_err(|err| {
1108            self.interceptor.observe(
1109                self.labels.clone().with_error(err.kind()),
1110                MetricValue::OperationErrorsTotal,
1111            );
1112        })
1113    }
1114
1115    async fn abort(&mut self) -> Result<()> {
1116        self.inner.abort().await.inspect_err(|err| {
1117            self.interceptor.observe(
1118                self.labels.clone().with_error(err.kind()),
1119                MetricValue::OperationErrorsTotal,
1120            );
1121        })
1122    }
1123}
1124
1125impl<R: oio::BlockingWrite, I: MetricsIntercept> oio::BlockingWrite for MetricsWrapper<R, I> {
1126    fn write(&mut self, bs: Buffer) -> Result<()> {
1127        let size = bs.len();
1128
1129        self.inner
1130            .write(bs)
1131            .inspect(|_| {
1132                self.size += size as u64;
1133            })
1134            .inspect_err(|err| {
1135                self.interceptor.observe(
1136                    self.labels.clone().with_error(err.kind()),
1137                    MetricValue::OperationErrorsTotal,
1138                );
1139            })
1140    }
1141
1142    fn close(&mut self) -> Result<Metadata> {
1143        self.inner.close().inspect_err(|err| {
1144            self.interceptor.observe(
1145                self.labels.clone().with_error(err.kind()),
1146                MetricValue::OperationErrorsTotal,
1147            );
1148        })
1149    }
1150}
1151
1152impl<R: oio::List, I: MetricsIntercept> oio::List for MetricsWrapper<R, I> {
1153    async fn next(&mut self) -> Result<Option<oio::Entry>> {
1154        self.inner
1155            .next()
1156            .await
1157            .inspect(|_| {
1158                self.size += 1;
1159            })
1160            .inspect_err(|err| {
1161                self.interceptor.observe(
1162                    self.labels.clone().with_error(err.kind()),
1163                    MetricValue::OperationErrorsTotal,
1164                );
1165            })
1166    }
1167}
1168
1169impl<R: oio::BlockingList, I: MetricsIntercept> oio::BlockingList for MetricsWrapper<R, I> {
1170    fn next(&mut self) -> Result<Option<oio::Entry>> {
1171        self.inner
1172            .next()
1173            .inspect(|_| {
1174                self.size += 1;
1175            })
1176            .inspect_err(|err| {
1177                self.interceptor.observe(
1178                    self.labels.clone().with_error(err.kind()),
1179                    MetricValue::OperationErrorsTotal,
1180                );
1181            })
1182    }
1183}
1184
1185impl<R: oio::Delete, I: MetricsIntercept> oio::Delete for MetricsWrapper<R, I> {
1186    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
1187        self.inner
1188            .delete(path, args)
1189            .inspect(|_| {
1190                self.size += 1;
1191            })
1192            .inspect_err(|err| {
1193                self.interceptor.observe(
1194                    self.labels.clone().with_error(err.kind()),
1195                    MetricValue::OperationErrorsTotal,
1196                );
1197            })
1198    }
1199
1200    async fn flush(&mut self) -> Result<usize> {
1201        self.inner.flush().await.inspect_err(|err| {
1202            self.interceptor.observe(
1203                self.labels.clone().with_error(err.kind()),
1204                MetricValue::OperationErrorsTotal,
1205            );
1206        })
1207    }
1208}
1209
1210impl<R: oio::BlockingDelete, I: MetricsIntercept> oio::BlockingDelete for MetricsWrapper<R, I> {
1211    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
1212        self.inner
1213            .delete(path, args)
1214            .inspect(|_| {
1215                self.size += 1;
1216            })
1217            .inspect_err(|err| {
1218                self.interceptor.observe(
1219                    self.labels.clone().with_error(err.kind()),
1220                    MetricValue::OperationErrorsTotal,
1221                );
1222            })
1223    }
1224
1225    fn flush(&mut self) -> Result<usize> {
1226        self.inner.flush().inspect_err(|err| {
1227            self.interceptor.observe(
1228                self.labels.clone().with_error(err.kind()),
1229                MetricValue::OperationErrorsTotal,
1230            );
1231        })
1232    }
1233}