opendal/layers/observe/
metrics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::ready;
23use std::task::Context;
24use std::task::Poll;
25use std::time::Duration;
26use std::time::Instant;
27
28use futures::Stream;
29use futures::StreamExt;
30use http::StatusCode;
31
32use crate::raw::*;
33use crate::*;
34
/// One kibibyte (2^10 bytes), as `f64` for bucket arithmetic.
const KIB: f64 = 1024.0;
/// One mebibyte (2^20 bytes).
const MIB: f64 = 1024.0 * KIB;
/// One gibibyte (2^30 bytes).
const GIB: f64 = 1024.0 * MIB;

/// Buckets for data size metrics like `OperationBytes`.
///
/// Covers typical file and object sizes, from small files to large objects.
pub const DEFAULT_BYTES_BUCKETS: &[f64] = &[
    4.0 * KIB,   // Small files
    64.0 * KIB,  // File system block size
    256.0 * KIB, //
    1.0 * MIB,   // Common size threshold for many systems
    4.0 * MIB,   // Best size for most http based systems
    16.0 * MIB,  //
    64.0 * MIB,  // Widely used threshold for multipart uploads
    256.0 * MIB, //
    1.0 * GIB,   // Considered large for many systems
    5.0 * GIB,   // Maximum size in single upload for many cloud storage services
];

/// Buckets for data transfer rate metrics like `OperationBytesRate`.
///
/// Covers various network speeds, from slow connections to high-speed transfers.
/// The bit-rate annotations approximate `bytes/s * 8`.
///
/// Note: this is for the rate of a single operation, not for total bandwidth.
pub const DEFAULT_BYTES_RATE_BUCKETS: &[f64] = &[
    // Low-speed network range (mobile/weak connections)
    8.0 * KIB,   // ~64Kbps - 2G networks
    32.0 * KIB,  // ~256Kbps - 3G networks
    128.0 * KIB, // ~1Mbps - Basic broadband
    // Standard broadband range
    1.0 * MIB,  // ~8Mbps - Entry-level broadband
    8.0 * MIB,  // ~64Mbps - Fast broadband
    32.0 * MIB, // ~256Mbps - Very fast (fiber) broadband
    // High-performance network range
    128.0 * MIB, // ~1Gbps - Gigabit links, standard datacenter
    512.0 * MIB, // ~4Gbps - Fast datacenter
    2.0 * GIB,   // ~16Gbps - High-end interconnects
    // Ultra-high-speed range
    8.0 * GIB,  // ~64Gbps - InfiniBand/RDMA
    32.0 * GIB, // ~256Gbps - Top-tier datacenters
];
76
/// Histogram buckets for batch entry counts (`OperationEntries`).
///
/// Spans everything from single-entry calls up to very large multi-request batches.
pub const DEFAULT_ENTRIES_BUCKETS: &[f64] = &[
    1.0,      // one entry per operation
    5.0,      // tiny batch
    10.0,     // small batch
    50.0,     // medium batch
    100.0,    // typical batch size
    500.0,    // large batch
    1_000.0,  // very large batch; a common per-request API limit
    5_000.0,  // huge batch spanning several pages
    10_000.0, // extreme batch built from many requests
];

/// Histogram buckets for entry processing throughput (`OperationEntriesRate`).
///
/// Expressed in entries handled per second.
pub const DEFAULT_ENTRIES_RATE_BUCKETS: &[f64] = &[
    1.0,      // heavy per-entry work
    10.0,     // complex per-entry work
    50.0,     // moderate throughput
    100.0,    // efficient throughput
    500.0,    // optimized throughput
    1_000.0,  // simple per-entry work
    5_000.0,  // bulk processing
    10_000.0, // listings and local backends at full speed
];

/// Histogram buckets for end-to-end operation latency (`OperationDurationSeconds`).
///
/// Values are in seconds and reach from cached metadata hits to minute-long transfers.
pub const DEFAULT_DURATION_SECONDS_BUCKETS: &[f64] = &[
    0.001, // 1ms: cache hits, fastest calls
    0.01,  // 10ms: local or quick metadata calls
    0.05,  // 50ms: nearby cloud endpoints
    0.1,   // 100ms: typical cloud API latency
    0.25,  // 250ms: small transfers
    0.5,   // 500ms: heavier metadata work
    1.0,   // 1s: small file transfers
    2.5,   // 2.5s: medium file transfers
    5.0,   // 5s: large transfers
    10.0,  // 10s: very large transfers
    30.0,  // 30s: complex batch jobs
    60.0,  // 1min: near-timeout, enormous transfers
];

/// Histogram buckets for time-to-first-byte (`OperationTtfbSeconds`).
///
/// Values are in seconds; tighter than the full-duration buckets because only the
/// initial response latency is measured.
pub const DEFAULT_TTFB_BUCKETS: &[f64] = &[
    0.001, // 1ms: cached / local
    0.01,  // 10ms: same-region, very fast
    0.025, // 25ms: well-tuned setups
    0.05,  // 50ms: healthy default
    0.1,   // 100ms: average cloud storage
    0.2,   // 200ms: cross-region or throttled
    0.4,   // 400ms: congested network
    0.8,   // 800ms: likely trouble
    1.6,   // 1.6s: retry territory
    3.2,   // 3.2s: critical, close to timeouts
];
135
/// Label key carrying the storage scheme (for example `s3`, `fs`, `cos`).
pub static LABEL_SCHEME: &str = "scheme";
/// Label key carrying the namespace, such as the bucket name for s3.
pub static LABEL_NAMESPACE: &str = "namespace";
/// Label key carrying the configured root path.
pub static LABEL_ROOT: &str = "root";
/// Label key carrying the operation name (for example `read`, `write`, `list`).
pub static LABEL_OPERATION: &str = "operation";
/// Label key carrying the error kind of a failed operation.
pub static LABEL_ERROR: &str = "error";
/// Label key carrying the HTTP status code of an error response.
pub static LABEL_STATUS_CODE: &str = "status_code";
148
149/// MetricLabels are the labels for the metrics.
150#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
151pub struct MetricLabels {
152    /// The storage scheme identifier (e.g., "s3", "gcs", "azblob", "fs").
153    /// Used to differentiate between different storage backends.
154    pub scheme: Scheme,
155    /// The storage namespace (e.g., bucket name, container name).
156    /// Identifies the specific storage container being accessed.
157    pub namespace: Arc<str>,
158    /// The root path within the namespace that was configured.
159    /// Used to track operations within a specific path prefix.
160    pub root: Arc<str>,
161    /// The operation being performed (e.g., "read", "write", "list").
162    /// Identifies which API operation generated this metric.
163    pub operation: &'static str,
164    /// The specific error kind that occurred during an operation.
165    /// Only populated for `OperationErrorsTotal` metric.
166    /// Used to track frequency of specific error types.
167    pub error: Option<ErrorKind>,
168    /// The HTTP status code received in an error response.
169    /// Only populated for `HttpStatusErrorsTotal` metric.
170    /// Used to track frequency of specific HTTP error status codes.
171    pub status_code: Option<StatusCode>,
172}
173
174impl MetricLabels {
175    /// Create a new set of MetricLabels.
176    fn new(info: Arc<AccessorInfo>, op: &'static str) -> Self {
177        MetricLabels {
178            scheme: info.scheme(),
179            namespace: info.name(),
180            root: info.root(),
181            operation: op,
182            ..MetricLabels::default()
183        }
184    }
185
186    /// Add error to the metric labels.
187    fn with_error(mut self, err: ErrorKind) -> Self {
188        self.error = Some(err);
189        self
190    }
191
192    /// Add status code to the metric labels.
193    fn with_status_code(mut self, code: StatusCode) -> Self {
194        self.status_code = Some(code);
195        self
196    }
197}
198
/// A single measurement that opendal hands to a metrics backend.
///
/// Backends may be built on `prometheus_client`, `metrics`, and so on.
///
/// Every backend SHOULD handle all of these values in its `observe` implementation, so
/// that users get the same metrics regardless of which backend they choose.
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub enum MetricValue {
    /// Bytes processed by an operation.
    /// Backend: record the byte count in a Histogram.
    OperationBytes(u64),
    /// Operation throughput in bytes per second.
    /// Backend: record the rate in a Histogram.
    OperationBytesRate(f64),
    /// Entries (files, objects, keys) processed by an operation.
    /// Backend: record the entry count in a Histogram.
    OperationEntries(u64),
    /// Operation throughput in entries per second.
    /// Backend: record the rate in a Histogram.
    OperationEntriesRate(f64),
    /// Total time an operation took.
    /// Backend: record the duration, as `f64` seconds, in a Histogram.
    OperationDurationSeconds(Duration),
    /// One failed operation.
    /// Backend: increment a Counter by 1.
    OperationErrorsTotal,
    /// Change in the number of operations currently running.
    /// Backend: add the (positive or negative) delta to a Gauge.
    OperationExecuting(isize),
    /// Time until the first byte of an operation was available.
    /// Backend: record the duration, as `f64` seconds, in a Histogram.
    OperationTtfbSeconds(Duration),
    /// Change in the number of HTTP requests currently in flight.
    /// Backend: add the (positive or negative) delta to a Gauge.
    HttpExecuting(isize),
    /// Size of an HTTP request body in bytes.
    /// Backend: record the byte count in a Histogram.
    HttpRequestBytes(u64),
    /// HTTP request throughput in bytes per second.
    /// Backend: record the rate in a Histogram.
    HttpRequestBytesRate(f64),
    /// Time spent sending an HTTP request, until the response starts arriving.
    /// Backend: record the duration, as `f64` seconds, in a Histogram.
    HttpRequestDurationSeconds(Duration),
    /// Size of an HTTP response body in bytes.
    /// Backend: record the byte count in a Histogram.
    HttpResponseBytes(u64),
    /// HTTP response throughput in bytes per second.
    /// Backend: record the rate in a Histogram.
    HttpResponseBytesRate(f64),
    /// Time spent receiving an HTTP response body, from first to last byte.
    /// Backend: record the duration, as `f64` seconds, in a Histogram.
    HttpResponseDurationSeconds(Duration),
    /// One HTTP request that failed before any response arrived.
    /// Backend: increment a Counter by 1.
    HttpConnectionErrorsTotal,
    /// One HTTP request answered with an error status (non-2xx).
    /// Backend: increment a Counter by 1.
    HttpStatusErrorsTotal,
}
260
261impl MetricValue {
262    /// Returns the full metric name for this metric value.
263    pub fn name(&self) -> &'static str {
264        match self {
265            MetricValue::OperationBytes(_) => "opendal_operation_bytes",
266            MetricValue::OperationBytesRate(_) => "opendal_operation_bytes_rate",
267            MetricValue::OperationEntries(_) => "opendal_operation_entries",
268            MetricValue::OperationEntriesRate(_) => "opendal_operation_entries_rate",
269            MetricValue::OperationDurationSeconds(_) => "opendal_operation_duration_seconds",
270            MetricValue::OperationErrorsTotal => "opendal_operation_errors_total",
271            MetricValue::OperationExecuting(_) => "opendal_operation_executing",
272            MetricValue::OperationTtfbSeconds(_) => "opendal_operation_ttfb_seconds",
273
274            MetricValue::HttpConnectionErrorsTotal => "opendal_http_connection_errors_total",
275            MetricValue::HttpStatusErrorsTotal => "opendal_http_status_errors_total",
276            MetricValue::HttpExecuting(_) => "opendal_http_executing",
277            MetricValue::HttpRequestBytes(_) => "opendal_http_request_bytes",
278            MetricValue::HttpRequestBytesRate(_) => "opendal_http_request_bytes_rate",
279            MetricValue::HttpRequestDurationSeconds(_) => "opendal_http_request_duration_seconds",
280            MetricValue::HttpResponseBytes(_) => "opendal_http_response_bytes",
281            MetricValue::HttpResponseBytesRate(_) => "opendal_http_response_bytes_rate",
282            MetricValue::HttpResponseDurationSeconds(_) => "opendal_http_response_duration_seconds",
283        }
284    }
285
286    /// Returns the metric name along with unit for this metric value.
287    ///
288    /// # Notes
289    ///
290    /// This API is designed for the metrics impls that unit aware. They will handle the names by themselves like append `_total` for counters.
291    pub fn name_with_unit(&self) -> (&'static str, Option<&'static str>) {
292        match self {
293            MetricValue::OperationBytes(_) => ("opendal_operation", Some("bytes")),
294            MetricValue::OperationBytesRate(_) => ("opendal_operation_bytes_rate", None),
295            MetricValue::OperationEntries(_) => ("opendal_operation_entries", None),
296            MetricValue::OperationEntriesRate(_) => ("opendal_operation_entries_rate", None),
297            MetricValue::OperationDurationSeconds(_) => {
298                ("opendal_operation_duration", Some("seconds"))
299            }
300            MetricValue::OperationErrorsTotal => ("opendal_operation_errors", None),
301            MetricValue::OperationExecuting(_) => ("opendal_operation_executing", None),
302            MetricValue::OperationTtfbSeconds(_) => ("opendal_operation_ttfb", Some("seconds")),
303
304            MetricValue::HttpConnectionErrorsTotal => ("opendal_http_connection_errors", None),
305            MetricValue::HttpStatusErrorsTotal => ("opendal_http_status_errors", None),
306            MetricValue::HttpExecuting(_) => ("opendal_http_executing", None),
307            MetricValue::HttpRequestBytes(_) => ("opendal_http_request", Some("bytes")),
308            MetricValue::HttpRequestBytesRate(_) => ("opendal_http_request_bytes_rate", None),
309            MetricValue::HttpRequestDurationSeconds(_) => {
310                ("opendal_http_request_duration", Some("seconds"))
311            }
312            MetricValue::HttpResponseBytes(_) => ("opendal_http_response", Some("bytes")),
313            MetricValue::HttpResponseBytesRate(_) => ("opendal_http_response_bytes_rate", None),
314            MetricValue::HttpResponseDurationSeconds(_) => {
315                ("opendal_http_response_duration", Some("seconds"))
316            }
317        }
318    }
319
320    /// Returns the help text for this metric value.
321    pub fn help(&self) -> &'static str {
322        match self {
323            MetricValue::OperationBytes(_) => "Current operation size in bytes, represents the size of data being processed in the current operation",
324            MetricValue::OperationBytesRate(_) => "Histogram of data processing rates in bytes per second within individual operations",
325            MetricValue::OperationEntries(_) => "Current operation size in entries, represents the entries being processed in the current operation",
326            MetricValue::OperationEntriesRate(_) => "Histogram of entries processing rates in entries per second within individual operations",
327            MetricValue::OperationDurationSeconds(_) => "Duration of operations in seconds, measured from start to completion",
328            MetricValue::OperationErrorsTotal => "Total number of failed operations",
329            MetricValue::OperationExecuting(_) => "Number of operations currently being executed",
330            MetricValue::OperationTtfbSeconds(_) => "Time to first byte in seconds for operations",
331
332            MetricValue::HttpConnectionErrorsTotal => "Total number of HTTP requests that failed before receiving a response (DNS failures, connection refused, timeouts, TLS errors)",
333            MetricValue::HttpStatusErrorsTotal => "Total number of HTTP requests that received error status codes (non-2xx responses)",
334            MetricValue::HttpExecuting(_) => "Number of HTTP requests currently in flight from this client",
335            MetricValue::HttpRequestBytes(_) => "Histogram of HTTP request body sizes in bytes",
336            MetricValue::HttpRequestBytesRate(_) => "Histogram of HTTP request bytes per second rates",
337            MetricValue::HttpRequestDurationSeconds(_) => "Histogram of time durations in seconds spent sending HTTP requests, from first byte sent to receiving the first byte",
338            MetricValue::HttpResponseBytes(_) => "Histogram of HTTP response body sizes in bytes",
339            MetricValue::HttpResponseBytesRate(_) => "Histogram of HTTP response bytes per second rates",
340            MetricValue::HttpResponseDurationSeconds(_) => "Histogram of time durations in seconds spent receiving HTTP responses, from first byte received to last byte received",
341        }
342    }
343}
344
345/// The interceptor for metrics.
346///
347/// All metrics related libs should implement this trait to observe opendal's internal operations.
348pub trait MetricsIntercept: Debug + Clone + Send + Sync + Unpin + 'static {
349    /// Observe the metric value.
350    fn observe(&self, labels: MetricLabels, value: MetricValue) {
351        let _ = (labels, value);
352    }
353}
354
355/// The metrics layer for opendal.
356#[derive(Clone, Debug)]
357pub struct MetricsLayer<I: MetricsIntercept> {
358    interceptor: I,
359}
360
361impl<I: MetricsIntercept> MetricsLayer<I> {
362    /// Create a new metrics layer.
363    pub fn new(interceptor: I) -> Self {
364        Self { interceptor }
365    }
366}
367
368impl<A: Access, I: MetricsIntercept> Layer<A> for MetricsLayer<I> {
369    type LayeredAccess = MetricsAccessor<A, I>;
370
371    fn layer(&self, inner: A) -> Self::LayeredAccess {
372        let info = inner.info();
373
374        // Update http client with metrics http fetcher.
375        info.update_http_client(|client| {
376            HttpClient::with(MetricsHttpFetcher {
377                inner: client.into_inner(),
378                info: info.clone(),
379                interceptor: self.interceptor.clone(),
380            })
381        });
382
383        MetricsAccessor {
384            inner,
385            info,
386            interceptor: self.interceptor.clone(),
387        }
388    }
389}
390
391/// The metrics http fetcher for opendal.
392pub struct MetricsHttpFetcher<I: MetricsIntercept> {
393    inner: HttpFetcher,
394    info: Arc<AccessorInfo>,
395    interceptor: I,
396}
397
398impl<I: MetricsIntercept> HttpFetch for MetricsHttpFetcher<I> {
399    async fn fetch(&self, req: http::Request<Buffer>) -> Result<http::Response<HttpBody>> {
400        let labels = MetricLabels::new(
401            self.info.clone(),
402            req.extensions()
403                .get::<Operation>()
404                .copied()
405                .map(Operation::into_static)
406                .unwrap_or("unknown"),
407        );
408
409        let start = Instant::now();
410        let req_size = req.body().len();
411
412        self.interceptor
413            .observe(labels.clone(), MetricValue::HttpExecuting(1));
414
415        let res = self.inner.fetch(req).await;
416        let req_duration = start.elapsed();
417
418        match res {
419            Err(err) => {
420                self.interceptor
421                    .observe(labels.clone(), MetricValue::HttpExecuting(-1));
422                self.interceptor
423                    .observe(labels, MetricValue::HttpConnectionErrorsTotal);
424                Err(err)
425            }
426            Ok(resp) if resp.status().is_client_error() && resp.status().is_server_error() => {
427                self.interceptor
428                    .observe(labels.clone(), MetricValue::HttpExecuting(-1));
429                self.interceptor.observe(
430                    labels.with_status_code(resp.status()),
431                    MetricValue::HttpStatusErrorsTotal,
432                );
433                Ok(resp)
434            }
435            Ok(resp) => {
436                self.interceptor.observe(
437                    labels.clone(),
438                    MetricValue::HttpRequestBytes(req_size as u64),
439                );
440                self.interceptor.observe(
441                    labels.clone(),
442                    MetricValue::HttpRequestBytesRate(req_size as f64 / req_duration.as_secs_f64()),
443                );
444                self.interceptor.observe(
445                    labels.clone(),
446                    MetricValue::HttpRequestDurationSeconds(req_duration),
447                );
448
449                let (parts, body) = resp.into_parts();
450                let body = body.map_inner(|s| {
451                    Box::new(MetricsStream {
452                        inner: s,
453                        interceptor: self.interceptor.clone(),
454                        labels: labels.clone(),
455                        size: 0,
456                        start: Instant::now(),
457                    })
458                });
459
460                Ok(http::Response::from_parts(parts, body))
461            }
462        }
463    }
464}
465
466pub struct MetricsStream<S, I> {
467    inner: S,
468    interceptor: I,
469
470    labels: MetricLabels,
471    size: u64,
472    start: Instant,
473}
474
475impl<S, I> Stream for MetricsStream<S, I>
476where
477    S: Stream<Item = Result<Buffer>> + Unpin + 'static,
478    I: MetricsIntercept,
479{
480    type Item = Result<Buffer>;
481
482    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
483        match ready!(self.inner.poll_next_unpin(cx)) {
484            Some(Ok(bs)) => {
485                self.size += bs.len() as u64;
486                Poll::Ready(Some(Ok(bs)))
487            }
488            Some(Err(err)) => Poll::Ready(Some(Err(err))),
489            None => {
490                let resp_size = self.size;
491                let resp_duration = self.start.elapsed();
492
493                self.interceptor.observe(
494                    self.labels.clone(),
495                    MetricValue::HttpResponseBytes(resp_size),
496                );
497                self.interceptor.observe(
498                    self.labels.clone(),
499                    MetricValue::HttpResponseBytesRate(
500                        resp_size as f64 / resp_duration.as_secs_f64(),
501                    ),
502                );
503                self.interceptor.observe(
504                    self.labels.clone(),
505                    MetricValue::HttpResponseDurationSeconds(resp_duration),
506                );
507                self.interceptor
508                    .observe(self.labels.clone(), MetricValue::HttpExecuting(-1));
509
510                Poll::Ready(None)
511            }
512        }
513    }
514}
515
516/// The metrics accessor for opendal.
517pub struct MetricsAccessor<A: Access, I: MetricsIntercept> {
518    inner: A,
519    info: Arc<AccessorInfo>,
520    interceptor: I,
521}
522
523impl<A: Access, I: MetricsIntercept> Debug for MetricsAccessor<A, I> {
524    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
525        f.debug_struct("MetricsAccessor")
526            .field("inner", &self.inner)
527            .finish_non_exhaustive()
528    }
529}
530
531impl<A: Access, I: MetricsIntercept> LayeredAccess for MetricsAccessor<A, I> {
532    type Inner = A;
533    type Reader = MetricsWrapper<A::Reader, I>;
534    type Writer = MetricsWrapper<A::Writer, I>;
535    type Lister = MetricsWrapper<A::Lister, I>;
536    type Deleter = MetricsWrapper<A::Deleter, I>;
537
538    fn inner(&self) -> &Self::Inner {
539        &self.inner
540    }
541
542    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
543        let labels = MetricLabels::new(self.info.clone(), Operation::CreateDir.into_static());
544
545        let start = Instant::now();
546
547        self.interceptor
548            .observe(labels.clone(), MetricValue::OperationExecuting(1));
549
550        let res = self
551            .inner()
552            .create_dir(path, args)
553            .await
554            .inspect(|_| {
555                self.interceptor.observe(
556                    labels.clone(),
557                    MetricValue::OperationDurationSeconds(start.elapsed()),
558                );
559            })
560            .inspect_err(|err| {
561                self.interceptor.observe(
562                    labels.clone().with_error(err.kind()),
563                    MetricValue::OperationErrorsTotal,
564                );
565            });
566
567        self.interceptor
568            .observe(labels, MetricValue::OperationExecuting(-1));
569        res
570    }
571
572    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
573        let labels = MetricLabels::new(self.info.clone(), Operation::Read.into_static());
574
575        let start = Instant::now();
576
577        self.interceptor
578            .observe(labels.clone(), MetricValue::OperationExecuting(1));
579
580        let (rp, reader) = self
581            .inner
582            .read(path, args)
583            .await
584            .inspect(|_| {
585                self.interceptor.observe(
586                    labels.clone(),
587                    MetricValue::OperationTtfbSeconds(start.elapsed()),
588                );
589            })
590            .inspect_err(|err| {
591                self.interceptor.observe(
592                    labels.clone().with_error(err.kind()),
593                    MetricValue::OperationErrorsTotal,
594                );
595            })?;
596
597        Ok((
598            rp,
599            MetricsWrapper::new(reader, self.interceptor.clone(), labels, start),
600        ))
601    }
602
603    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
604        let labels = MetricLabels::new(self.info.clone(), Operation::Write.into_static());
605
606        let start = Instant::now();
607
608        self.interceptor
609            .observe(labels.clone(), MetricValue::OperationExecuting(1));
610
611        let (rp, writer) = self.inner.write(path, args).await.inspect_err(|err| {
612            self.interceptor.observe(
613                labels.clone().with_error(err.kind()),
614                MetricValue::OperationErrorsTotal,
615            );
616        })?;
617
618        Ok((
619            rp,
620            MetricsWrapper::new(writer, self.interceptor.clone(), labels, start),
621        ))
622    }
623
624    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
625        let labels = MetricLabels::new(self.info.clone(), Operation::Copy.into_static());
626
627        let start = Instant::now();
628
629        self.interceptor
630            .observe(labels.clone(), MetricValue::OperationExecuting(1));
631
632        let res = self
633            .inner()
634            .copy(from, to, args)
635            .await
636            .inspect(|_| {
637                self.interceptor.observe(
638                    labels.clone(),
639                    MetricValue::OperationDurationSeconds(start.elapsed()),
640                );
641            })
642            .inspect_err(|err| {
643                self.interceptor.observe(
644                    labels.clone().with_error(err.kind()),
645                    MetricValue::OperationErrorsTotal,
646                );
647            });
648
649        self.interceptor
650            .observe(labels, MetricValue::OperationExecuting(-1));
651        res
652    }
653
654    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
655        let labels = MetricLabels::new(self.info.clone(), Operation::Rename.into_static());
656
657        let start = Instant::now();
658
659        self.interceptor
660            .observe(labels.clone(), MetricValue::OperationExecuting(1));
661
662        let res = self
663            .inner()
664            .rename(from, to, args)
665            .await
666            .inspect(|_| {
667                self.interceptor.observe(
668                    labels.clone(),
669                    MetricValue::OperationDurationSeconds(start.elapsed()),
670                );
671            })
672            .inspect_err(|err| {
673                self.interceptor.observe(
674                    labels.clone().with_error(err.kind()),
675                    MetricValue::OperationErrorsTotal,
676                );
677            });
678
679        self.interceptor
680            .observe(labels, MetricValue::OperationExecuting(-1));
681        res
682    }
683
684    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
685        let labels = MetricLabels::new(self.info.clone(), Operation::Stat.into_static());
686
687        let start = Instant::now();
688
689        self.interceptor
690            .observe(labels.clone(), MetricValue::OperationExecuting(1));
691
692        let res = self
693            .inner()
694            .stat(path, args)
695            .await
696            .inspect(|_| {
697                self.interceptor.observe(
698                    labels.clone(),
699                    MetricValue::OperationDurationSeconds(start.elapsed()),
700                );
701            })
702            .inspect_err(|err| {
703                self.interceptor.observe(
704                    labels.clone().with_error(err.kind()),
705                    MetricValue::OperationErrorsTotal,
706                );
707            });
708
709        self.interceptor
710            .observe(labels, MetricValue::OperationExecuting(-1));
711        res
712    }
713
714    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
715        let labels = MetricLabels::new(self.info.clone(), Operation::Delete.into_static());
716
717        let start = Instant::now();
718
719        self.interceptor
720            .observe(labels.clone(), MetricValue::OperationExecuting(1));
721
722        let (rp, deleter) = self.inner.delete().await.inspect_err(|err| {
723            self.interceptor.observe(
724                labels.clone().with_error(err.kind()),
725                MetricValue::OperationErrorsTotal,
726            );
727        })?;
728
729        Ok((
730            rp,
731            MetricsWrapper::new(deleter, self.interceptor.clone(), labels, start),
732        ))
733    }
734
735    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
736        let labels = MetricLabels::new(self.info.clone(), Operation::List.into_static());
737
738        let start = Instant::now();
739
740        self.interceptor
741            .observe(labels.clone(), MetricValue::OperationExecuting(1));
742
743        let (rp, lister) = self.inner.list(path, args).await.inspect_err(|err| {
744            self.interceptor.observe(
745                labels.clone().with_error(err.kind()),
746                MetricValue::OperationErrorsTotal,
747            );
748        })?;
749
750        Ok((
751            rp,
752            MetricsWrapper::new(lister, self.interceptor.clone(), labels, start),
753        ))
754    }
755
756    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
757        let labels = MetricLabels::new(self.info.clone(), Operation::Presign.into_static());
758
759        let start = Instant::now();
760
761        self.interceptor
762            .observe(labels.clone(), MetricValue::OperationExecuting(1));
763
764        let res = self
765            .inner()
766            .presign(path, args)
767            .await
768            .inspect(|_| {
769                self.interceptor.observe(
770                    labels.clone(),
771                    MetricValue::OperationDurationSeconds(start.elapsed()),
772                );
773            })
774            .inspect_err(|err| {
775                self.interceptor.observe(
776                    labels.clone().with_error(err.kind()),
777                    MetricValue::OperationErrorsTotal,
778                );
779            });
780
781        self.interceptor
782            .observe(labels, MetricValue::OperationExecuting(-1));
783        res
784    }
785}
786
/// Wraps a reader, writer, lister, or deleter and accumulates a per-operation counter
/// while it is in use.
///
/// The final metrics for the operation (size, rate, duration, and the executing-gauge
/// decrement) are emitted from this type's `Drop` implementation.
pub struct MetricsWrapper<R, I: MetricsIntercept> {
    /// The wrapped `oio` reader/writer/lister/deleter that every call is forwarded to.
    inner: R,
    /// Sink that every metric sample is reported to.
    interceptor: I,
    /// Labels attached to every sample; error samples add the error kind via `with_error`.
    labels: MetricLabels,

    /// When the operation started; `Drop` reports the time elapsed since this instant.
    start: Instant,
    /// Running total: bytes for read/write, entries for list/delete.
    size: u64,
}
795
796impl<R, I: MetricsIntercept> Drop for MetricsWrapper<R, I> {
797    fn drop(&mut self) {
798        let size = self.size;
799        let duration = self.start.elapsed();
800
801        if self.labels.operation == Operation::Read.into_static()
802            || self.labels.operation == Operation::Write.into_static()
803        {
804            self.interceptor
805                .observe(self.labels.clone(), MetricValue::OperationBytes(self.size));
806            self.interceptor.observe(
807                self.labels.clone(),
808                MetricValue::OperationBytesRate(size as f64 / duration.as_secs_f64()),
809            );
810        } else {
811            self.interceptor.observe(
812                self.labels.clone(),
813                MetricValue::OperationEntries(self.size),
814            );
815            self.interceptor.observe(
816                self.labels.clone(),
817                MetricValue::OperationEntriesRate(size as f64 / duration.as_secs_f64()),
818            );
819        }
820
821        self.interceptor.observe(
822            self.labels.clone(),
823            MetricValue::OperationDurationSeconds(duration),
824        );
825        self.interceptor
826            .observe(self.labels.clone(), MetricValue::OperationExecuting(-1));
827    }
828}
829
830impl<R, I: MetricsIntercept> MetricsWrapper<R, I> {
831    fn new(inner: R, interceptor: I, labels: MetricLabels, start: Instant) -> Self {
832        Self {
833            inner,
834            interceptor,
835            labels,
836            start,
837            size: 0,
838        }
839    }
840}
841
842impl<R: oio::Read, I: MetricsIntercept> oio::Read for MetricsWrapper<R, I> {
843    async fn read(&mut self) -> Result<Buffer> {
844        self.inner
845            .read()
846            .await
847            .inspect(|bs| {
848                self.size += bs.len() as u64;
849            })
850            .inspect_err(|err| {
851                self.interceptor.observe(
852                    self.labels.clone().with_error(err.kind()),
853                    MetricValue::OperationErrorsTotal,
854                );
855            })
856    }
857}
858
859impl<R: oio::Write, I: MetricsIntercept> oio::Write for MetricsWrapper<R, I> {
860    async fn write(&mut self, bs: Buffer) -> Result<()> {
861        let size = bs.len();
862
863        self.inner
864            .write(bs)
865            .await
866            .inspect(|_| {
867                self.size += size as u64;
868            })
869            .inspect_err(|err| {
870                self.interceptor.observe(
871                    self.labels.clone().with_error(err.kind()),
872                    MetricValue::OperationErrorsTotal,
873                );
874            })
875    }
876
877    async fn close(&mut self) -> Result<Metadata> {
878        self.inner.close().await.inspect_err(|err| {
879            self.interceptor.observe(
880                self.labels.clone().with_error(err.kind()),
881                MetricValue::OperationErrorsTotal,
882            );
883        })
884    }
885
886    async fn abort(&mut self) -> Result<()> {
887        self.inner.abort().await.inspect_err(|err| {
888            self.interceptor.observe(
889                self.labels.clone().with_error(err.kind()),
890                MetricValue::OperationErrorsTotal,
891            );
892        })
893    }
894}
895
896impl<R: oio::List, I: MetricsIntercept> oio::List for MetricsWrapper<R, I> {
897    async fn next(&mut self) -> Result<Option<oio::Entry>> {
898        self.inner
899            .next()
900            .await
901            .inspect(|_| {
902                self.size += 1;
903            })
904            .inspect_err(|err| {
905                self.interceptor.observe(
906                    self.labels.clone().with_error(err.kind()),
907                    MetricValue::OperationErrorsTotal,
908                );
909            })
910    }
911}
912
913impl<R: oio::Delete, I: MetricsIntercept> oio::Delete for MetricsWrapper<R, I> {
914    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
915        self.inner
916            .delete(path, args)
917            .inspect(|_| {
918                self.size += 1;
919            })
920            .inspect_err(|err| {
921                self.interceptor.observe(
922                    self.labels.clone().with_error(err.kind()),
923                    MetricValue::OperationErrorsTotal,
924                );
925            })
926    }
927
928    async fn flush(&mut self) -> Result<usize> {
929        self.inner.flush().await.inspect_err(|err| {
930            self.interceptor.observe(
931                self.labels.clone().with_error(err.kind()),
932                MetricValue::OperationErrorsTotal,
933            );
934        })
935    }
936}