opendal_core/layers/observe/
metrics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::Context;
23use std::task::Poll;
24use std::task::ready;
25
26use futures::Stream;
27use futures::StreamExt;
28use http::StatusCode;
29
30use crate::raw::*;
31use crate::*;
32
/// Bytes in one kibibyte (2^10).
const KIB: f64 = 1024.0;
/// Bytes in one mebibyte (2^20).
const MIB: f64 = KIB * 1024.0;
/// Bytes in one gibibyte (2^30).
const GIB: f64 = MIB * 1024.0;
36
37/// Buckets for data size metrics like OperationBytes
38/// Covers typical file and object sizes from small files to large objects
39pub const DEFAULT_BYTES_BUCKETS: &[f64] = &[
40    4.0 * KIB,   // Small files
41    64.0 * KIB,  // File system block size
42    256.0 * KIB, //
43    1.0 * MIB,   // Common size threshold for many systems
44    4.0 * MIB,   // Best size for most http based systems
45    16.0 * MIB,  //
46    64.0 * MIB,  // Widely used threshold for multipart uploads
47    256.0 * MIB, //
48    1.0 * GIB,   // Considered large for many systems
49    5.0 * GIB,   // Maximum size in single upload for many cloud storage services
50];
51
52/// Buckets for data transfer rate metrics like OperationBytesRate
53///
54/// Covers various network speeds from slow connections to high-speed transfers
55///
56/// Note: this is for single operation rate, not for total bandwidth.
57pub const DEFAULT_BYTES_RATE_BUCKETS: &[f64] = &[
58    // Low-speed network range (mobile/weak connections)
59    8.0 * KIB,   // ~64Kbps - 2G networks
60    32.0 * KIB,  // ~256Kbps - 3G networks
61    128.0 * KIB, // ~1Mbps - Basic broadband
62    // Standard broadband range
63    1.0 * MIB,  // ~8Mbps - Entry-level broadband
64    8.0 * MIB,  // ~64Mbps - Fast broadband
65    32.0 * MIB, // ~256Mbps - Gigabit broadband
66    // High-performance network range
67    128.0 * MIB, // ~1Gbps - Standard datacenter
68    512.0 * MIB, // ~4Gbps - Fast datacenter
69    2.0 * GIB,   // ~16Gbps - High-end interconnects
70    // Ultra-high-speed range
71    8.0 * GIB,  // ~64Gbps - InfiniBand/RDMA
72    32.0 * GIB, // ~256Gbps - Top-tier datacenters
73];
74
/// Default histogram buckets for the number of entries touched by one
/// operation (`OperationEntries`).
///
/// The range goes from single-entry calls up to very large, multi-request batches.
pub const DEFAULT_ENTRIES_BUCKETS: &[f64] = &[
    1.0,      // one entry
    5.0,      // tiny batch
    10.0,     // small batch
    50.0,     // medium batch
    100.0,    // typical batch size
    500.0,    // large batch
    1_000.0,  // very large batch; an API limit for some services
    5_000.0,  // huge batch, spans several pages
    10_000.0, // extremely large, multi-request batch
];
88
/// Default histogram buckets, in entries per second, for entry-throughput
/// metrics (`OperationEntriesRate`).
pub const DEFAULT_ENTRIES_RATE_BUCKETS: &[f64] = &[
    1.0,      // heavy per-entry work
    10.0,     // slow, complex per-entry work
    50.0,     // moderate speed
    100.0,    // efficient processing
    500.0,    // fast, optimized processing
    1_000.0,  // very fast, simple per-entry work
    5_000.0,  // bulk processing
    10_000.0, // listing-style or local-system speeds
];
101
/// Default histogram buckets, in seconds, for duration metrics such as
/// `OperationDurationSeconds`.
///
/// The range spans quick metadata calls (1 ms) up to minute-long transfers.
pub const DEFAULT_DURATION_SECONDS_BUCKETS: &[f64] = &[
    0.001, // 1 ms: cached responses, fastest calls
    0.01,  // 10 ms: fast metadata or local calls
    0.05,  // 50 ms: quick calls to nearby cloud resources
    0.1,   // 100 ms: typical cloud API latency
    0.25,  // 250 ms: small data transfers
    0.5,   // 500 ms: larger metadata operations
    1.0,   // 1 s: small file transfers
    2.5,   // 2.5 s: medium file transfers
    5.0,   // 5 s: large transfers
    10.0,  // 10 s: very large transfers
    30.0,  // 30 s: complex batch work
    60.0,  // 1 min: near-timeout, extremely large transfers
];
118
/// Default histogram buckets, in seconds, for time-to-first-byte metrics such
/// as `OperationTtfbSeconds`.
///
/// These are tighter than the full-duration buckets because first-byte
/// latency is usually much shorter than a whole operation.
pub const DEFAULT_TTFB_BUCKETS: &[f64] = &[
    0.001, // 1 ms: cached or local
    0.01,  // 10 ms: same-region, very fast
    0.025, // 25 ms: fast, well-tuned setups
    0.05,  // 50 ms: good, standard setups
    0.1,   // 100 ms: average cloud storage
    0.2,   // 200 ms: cross-region or throttled
    0.4,   // 400 ms: congested network
    0.8,   // 800 ms: very slow, likely a problem
    1.6,   // 1.6 s: retry territory
    3.2,   // 3.2 s: critical, close to timeouts
];
133
/// Label key for the storage scheme, e.g. `s3`, `fs` or `cos`.
pub static LABEL_SCHEME: &'static str = "scheme";
/// Label key for the namespace, e.g. the bucket name on s3.
pub static LABEL_NAMESPACE: &'static str = "namespace";
/// Label key for the configured root path.
pub static LABEL_ROOT: &'static str = "root";
/// Label key for the operation, e.g. `read`, `write` or `list`.
pub static LABEL_OPERATION: &'static str = "operation";
/// Label key for the error kind.
pub static LABEL_ERROR: &'static str = "error";
/// Label key for the HTTP status code.
pub static LABEL_STATUS_CODE: &'static str = "status_code";
146
147/// MetricLabels are the labels for the metrics.
148#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
149pub struct MetricLabels {
150    /// The storage scheme identifier (e.g., "s3", "gcs", "azblob", "fs").
151    /// Used to differentiate between different storage backends.
152    pub scheme: &'static str,
153    /// The storage namespace (e.g., bucket name, container name).
154    /// Identifies the specific storage container being accessed.
155    pub namespace: Arc<str>,
156    /// The root path within the namespace that was configured.
157    /// Used to track operations within a specific path prefix.
158    pub root: Arc<str>,
159    /// The operation being performed (e.g., "read", "write", "list").
160    /// Identifies which API operation generated this metric.
161    pub operation: &'static str,
162    /// The specific error kind that occurred during an operation.
163    /// Only populated for `OperationErrorsTotal` metric.
164    /// Used to track frequency of specific error types.
165    pub error: Option<ErrorKind>,
166    /// The HTTP status code received in an error response.
167    /// Only populated for `HttpStatusErrorsTotal` metric.
168    /// Used to track frequency of specific HTTP error status codes.
169    pub status_code: Option<StatusCode>,
170}
171
172impl MetricLabels {
173    /// Create a new set of MetricLabels.
174    fn new(info: Arc<AccessorInfo>, op: &'static str) -> Self {
175        MetricLabels {
176            scheme: info.scheme(),
177            namespace: info.name(),
178            root: info.root(),
179            operation: op,
180            ..MetricLabels::default()
181        }
182    }
183
184    /// Add error to the metric labels.
185    fn with_error(mut self, err: ErrorKind) -> Self {
186        self.error = Some(err);
187        self
188    }
189
190    /// Add status code to the metric labels.
191    fn with_status_code(mut self, code: StatusCode) -> Self {
192        self.status_code = Some(code);
193        self
194    }
195}
196
/// A single observation that OpenDAL passes to a metrics implementation.
///
/// Metrics implementations include crates such as `prometheus_client` and
/// `metrics`.
///
/// Every metrics implementation SHOULD handle each variant as described on it,
/// so users get consistent metrics whichever implementation they choose.
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub enum MetricValue {
    /// Size of the data processed, in bytes.
    /// Metrics impl: record the byte count in a Histogram.
    OperationBytes(u64),
    /// Data processing rate, in bytes per second.
    /// Metrics impl: record the rate in a Histogram.
    OperationBytesRate(f64),
    /// Number of entries (files, objects, keys) processed.
    /// Metrics impl: record the count in a Histogram.
    OperationEntries(u64),
    /// Entry processing rate, in entries per second.
    /// Metrics impl: record the rate in a Histogram.
    OperationEntriesRate(f64),
    /// Total duration of an operation.
    /// Metrics impl: record the duration, converted to `f64` seconds, in a Histogram.
    OperationDurationSeconds(Duration),
    /// One failed operation.
    /// Metrics impl: increment a Counter by 1.
    OperationErrorsTotal,
    /// Change in the number of operations currently executing.
    /// Metrics impl: add the (positive or negative) delta to a Gauge.
    OperationExecuting(isize),
    /// Time to first byte of an operation.
    /// Metrics impl: record the duration, converted to `f64` seconds, in a Histogram.
    OperationTtfbSeconds(Duration),
    /// Change in the number of HTTP requests currently executing.
    /// Metrics impl: add the (positive or negative) delta to a Gauge.
    HttpExecuting(isize),
    /// Size of an HTTP request body, in bytes.
    /// Metrics impl: record the byte count in a Histogram.
    HttpRequestBytes(u64),
    /// HTTP request data rate, in bytes per second.
    /// Metrics impl: record the rate in a Histogram.
    HttpRequestBytesRate(f64),
    /// Time spent sending an HTTP request, until the response starts arriving.
    /// Metrics impl: record the duration, converted to `f64` seconds, in a Histogram.
    HttpRequestDurationSeconds(Duration),
    /// Size of an HTTP response body, in bytes.
    /// Metrics impl: record the byte count in a Histogram.
    HttpResponseBytes(u64),
    /// HTTP response data rate, in bytes per second.
    /// Metrics impl: record the rate in a Histogram.
    HttpResponseBytesRate(f64),
    /// Time spent receiving an HTTP response body, from first byte to last.
    /// Metrics impl: record the duration, converted to `f64` seconds, in a Histogram.
    HttpResponseDurationSeconds(Duration),
    /// One HTTP request that failed before a response was received.
    /// Metrics impl: increment a Counter by 1.
    HttpConnectionErrorsTotal,
    /// One HTTP response that carried an error status code.
    /// Metrics impl: increment a Counter by 1.
    HttpStatusErrorsTotal,
}

impl MetricValue {
    /// The full metric name for this value, e.g. `opendal_operation_bytes`.
    pub fn name(&self) -> &'static str {
        match self {
            // Operation-level metrics.
            Self::OperationBytes(_) => "opendal_operation_bytes",
            Self::OperationBytesRate(_) => "opendal_operation_bytes_rate",
            Self::OperationEntries(_) => "opendal_operation_entries",
            Self::OperationEntriesRate(_) => "opendal_operation_entries_rate",
            Self::OperationDurationSeconds(_) => "opendal_operation_duration_seconds",
            Self::OperationErrorsTotal => "opendal_operation_errors_total",
            Self::OperationExecuting(_) => "opendal_operation_executing",
            Self::OperationTtfbSeconds(_) => "opendal_operation_ttfb_seconds",
            // HTTP-level metrics.
            Self::HttpConnectionErrorsTotal => "opendal_http_connection_errors_total",
            Self::HttpStatusErrorsTotal => "opendal_http_status_errors_total",
            Self::HttpExecuting(_) => "opendal_http_executing",
            Self::HttpRequestBytes(_) => "opendal_http_request_bytes",
            Self::HttpRequestBytesRate(_) => "opendal_http_request_bytes_rate",
            Self::HttpRequestDurationSeconds(_) => "opendal_http_request_duration_seconds",
            Self::HttpResponseBytes(_) => "opendal_http_response_bytes",
            Self::HttpResponseBytesRate(_) => "opendal_http_response_bytes_rate",
            Self::HttpResponseDurationSeconds(_) => "opendal_http_response_duration_seconds",
        }
    }

    /// The metric base name plus its unit, if any.
    ///
    /// # Notes
    ///
    /// This is meant for unit-aware metrics implementations. They assemble the
    /// final name themselves, for example by appending `_total` to counters.
    pub fn name_with_unit(&self) -> (&'static str, Option<&'static str>) {
        match self {
            // Metrics measured in bytes.
            Self::OperationBytes(_) => ("opendal_operation", Some("bytes")),
            Self::HttpRequestBytes(_) => ("opendal_http_request", Some("bytes")),
            Self::HttpResponseBytes(_) => ("opendal_http_response", Some("bytes")),
            // Metrics measured in seconds.
            Self::OperationDurationSeconds(_) => ("opendal_operation_duration", Some("seconds")),
            Self::OperationTtfbSeconds(_) => ("opendal_operation_ttfb", Some("seconds")),
            Self::HttpRequestDurationSeconds(_) => {
                ("opendal_http_request_duration", Some("seconds"))
            }
            Self::HttpResponseDurationSeconds(_) => {
                ("opendal_http_response_duration", Some("seconds"))
            }
            // Unitless metrics (rates, counts, gauges and counters).
            Self::OperationBytesRate(_) => ("opendal_operation_bytes_rate", None),
            Self::OperationEntries(_) => ("opendal_operation_entries", None),
            Self::OperationEntriesRate(_) => ("opendal_operation_entries_rate", None),
            Self::OperationErrorsTotal => ("opendal_operation_errors", None),
            Self::OperationExecuting(_) => ("opendal_operation_executing", None),
            Self::HttpConnectionErrorsTotal => ("opendal_http_connection_errors", None),
            Self::HttpStatusErrorsTotal => ("opendal_http_status_errors", None),
            Self::HttpExecuting(_) => ("opendal_http_executing", None),
            Self::HttpRequestBytesRate(_) => ("opendal_http_request_bytes_rate", None),
            Self::HttpResponseBytesRate(_) => ("opendal_http_response_bytes_rate", None),
        }
    }

    /// Human-readable help text describing this metric.
    pub fn help(&self) -> &'static str {
        match self {
            // Counters.
            Self::OperationErrorsTotal => "Total number of failed operations",
            Self::HttpConnectionErrorsTotal => {
                "Total number of HTTP requests that failed before receiving a response (DNS failures, connection refused, timeouts, TLS errors)"
            }
            Self::HttpStatusErrorsTotal => {
                "Total number of HTTP requests that received error status codes (non-2xx responses)"
            }
            // Gauges.
            Self::OperationExecuting(_) => "Number of operations currently being executed",
            Self::HttpExecuting(_) => "Number of HTTP requests currently in flight from this client",
            // Operation histograms.
            Self::OperationBytes(_) => {
                "Current operation size in bytes, represents the size of data being processed in the current operation"
            }
            Self::OperationBytesRate(_) => {
                "Histogram of data processing rates in bytes per second within individual operations"
            }
            Self::OperationEntries(_) => {
                "Current operation size in entries, represents the entries being processed in the current operation"
            }
            Self::OperationEntriesRate(_) => {
                "Histogram of entries processing rates in entries per second within individual operations"
            }
            Self::OperationDurationSeconds(_) => {
                "Duration of operations in seconds, measured from start to completion"
            }
            Self::OperationTtfbSeconds(_) => "Time to first byte in seconds for operations",
            // HTTP histograms.
            Self::HttpRequestBytes(_) => "Histogram of HTTP request body sizes in bytes",
            Self::HttpRequestBytesRate(_) => "Histogram of HTTP request bytes per second rates",
            Self::HttpRequestDurationSeconds(_) => {
                "Histogram of time durations in seconds spent sending HTTP requests, from first byte sent to receiving the first byte"
            }
            Self::HttpResponseBytes(_) => "Histogram of HTTP response body sizes in bytes",
            Self::HttpResponseBytesRate(_) => "Histogram of HTTP response bytes per second rates",
            Self::HttpResponseDurationSeconds(_) => {
                "Histogram of time durations in seconds spent receiving HTTP responses, from first byte received to last byte received"
            }
        }
    }
}
366
367/// The interceptor for metrics.
368///
369/// All metrics related libs should implement this trait to observe opendal's internal operations.
370pub trait MetricsIntercept: Debug + Clone + Send + Sync + Unpin + 'static {
371    /// Observe the metric value.
372    fn observe(&self, labels: MetricLabels, value: MetricValue) {
373        let _ = (labels, value);
374    }
375}
376
377/// The metrics layer for opendal.
378#[derive(Clone, Debug)]
379pub struct MetricsLayer<I: MetricsIntercept> {
380    interceptor: I,
381}
382
383impl<I: MetricsIntercept> MetricsLayer<I> {
384    /// Create a new metrics layer.
385    pub fn new(interceptor: I) -> Self {
386        Self { interceptor }
387    }
388}
389
390impl<A: Access, I: MetricsIntercept> Layer<A> for MetricsLayer<I> {
391    type LayeredAccess = MetricsAccessor<A, I>;
392
393    fn layer(&self, inner: A) -> Self::LayeredAccess {
394        let info = inner.info();
395
396        // Update http client with metrics http fetcher.
397        info.update_http_client(|client| {
398            HttpClient::with(MetricsHttpFetcher {
399                inner: client.into_inner(),
400                info: info.clone(),
401                interceptor: self.interceptor.clone(),
402            })
403        });
404
405        MetricsAccessor {
406            inner,
407            info,
408            interceptor: self.interceptor.clone(),
409        }
410    }
411}
412
413/// The metrics http fetcher for opendal.
414pub struct MetricsHttpFetcher<I: MetricsIntercept> {
415    inner: HttpFetcher,
416    info: Arc<AccessorInfo>,
417    interceptor: I,
418}
419
420impl<I: MetricsIntercept> HttpFetch for MetricsHttpFetcher<I> {
421    async fn fetch(&self, req: http::Request<Buffer>) -> Result<http::Response<HttpBody>> {
422        let labels = MetricLabels::new(
423            self.info.clone(),
424            req.extensions()
425                .get::<Operation>()
426                .copied()
427                .map(Operation::into_static)
428                .unwrap_or("unknown"),
429        );
430
431        let start = Instant::now();
432        let req_size = req.body().len();
433
434        self.interceptor
435            .observe(labels.clone(), MetricValue::HttpExecuting(1));
436
437        let res = self.inner.fetch(req).await;
438        let req_duration = start.elapsed();
439
440        match res {
441            Err(err) => {
442                self.interceptor
443                    .observe(labels.clone(), MetricValue::HttpExecuting(-1));
444                self.interceptor
445                    .observe(labels, MetricValue::HttpConnectionErrorsTotal);
446                Err(err)
447            }
448            Ok(resp) if resp.status().is_client_error() && resp.status().is_server_error() => {
449                self.interceptor
450                    .observe(labels.clone(), MetricValue::HttpExecuting(-1));
451                self.interceptor.observe(
452                    labels.with_status_code(resp.status()),
453                    MetricValue::HttpStatusErrorsTotal,
454                );
455                Ok(resp)
456            }
457            Ok(resp) => {
458                self.interceptor.observe(
459                    labels.clone(),
460                    MetricValue::HttpRequestBytes(req_size as u64),
461                );
462                self.interceptor.observe(
463                    labels.clone(),
464                    MetricValue::HttpRequestBytesRate(req_size as f64 / req_duration.as_secs_f64()),
465                );
466                self.interceptor.observe(
467                    labels.clone(),
468                    MetricValue::HttpRequestDurationSeconds(req_duration),
469                );
470
471                let (parts, body) = resp.into_parts();
472                let body = body.map_inner(|s| {
473                    Box::new(MetricsStream {
474                        inner: s,
475                        interceptor: self.interceptor.clone(),
476                        labels: labels.clone(),
477                        size: 0,
478                        start: Instant::now(),
479                    })
480                });
481
482                Ok(http::Response::from_parts(parts, body))
483            }
484        }
485    }
486}
487
488pub struct MetricsStream<S, I> {
489    inner: S,
490    interceptor: I,
491
492    labels: MetricLabels,
493    size: u64,
494    start: Instant,
495}
496
497impl<S, I> Stream for MetricsStream<S, I>
498where
499    S: Stream<Item = Result<Buffer>> + Unpin + 'static,
500    I: MetricsIntercept,
501{
502    type Item = Result<Buffer>;
503
504    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
505        match ready!(self.inner.poll_next_unpin(cx)) {
506            Some(Ok(bs)) => {
507                self.size += bs.len() as u64;
508                Poll::Ready(Some(Ok(bs)))
509            }
510            Some(Err(err)) => Poll::Ready(Some(Err(err))),
511            None => {
512                let resp_size = self.size;
513                let resp_duration = self.start.elapsed();
514
515                self.interceptor.observe(
516                    self.labels.clone(),
517                    MetricValue::HttpResponseBytes(resp_size),
518                );
519                self.interceptor.observe(
520                    self.labels.clone(),
521                    MetricValue::HttpResponseBytesRate(
522                        resp_size as f64 / resp_duration.as_secs_f64(),
523                    ),
524                );
525                self.interceptor.observe(
526                    self.labels.clone(),
527                    MetricValue::HttpResponseDurationSeconds(resp_duration),
528                );
529                self.interceptor
530                    .observe(self.labels.clone(), MetricValue::HttpExecuting(-1));
531
532                Poll::Ready(None)
533            }
534        }
535    }
536}
537
538/// The metrics accessor for opendal.
539pub struct MetricsAccessor<A: Access, I: MetricsIntercept> {
540    inner: A,
541    info: Arc<AccessorInfo>,
542    interceptor: I,
543}
544
545impl<A: Access, I: MetricsIntercept> Debug for MetricsAccessor<A, I> {
546    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
547        f.debug_struct("MetricsAccessor")
548            .field("inner", &self.inner)
549            .finish_non_exhaustive()
550    }
551}
552
553impl<A: Access, I: MetricsIntercept> LayeredAccess for MetricsAccessor<A, I> {
554    type Inner = A;
555    type Reader = MetricsWrapper<A::Reader, I>;
556    type Writer = MetricsWrapper<A::Writer, I>;
557    type Lister = MetricsWrapper<A::Lister, I>;
558    type Deleter = MetricsWrapper<A::Deleter, I>;
559
560    fn inner(&self) -> &Self::Inner {
561        &self.inner
562    }
563
564    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
565        let labels = MetricLabels::new(self.info.clone(), Operation::CreateDir.into_static());
566
567        let start = Instant::now();
568
569        self.interceptor
570            .observe(labels.clone(), MetricValue::OperationExecuting(1));
571
572        let res = self
573            .inner()
574            .create_dir(path, args)
575            .await
576            .inspect(|_| {
577                self.interceptor.observe(
578                    labels.clone(),
579                    MetricValue::OperationDurationSeconds(start.elapsed()),
580                );
581            })
582            .inspect_err(|err| {
583                self.interceptor.observe(
584                    labels.clone().with_error(err.kind()),
585                    MetricValue::OperationErrorsTotal,
586                );
587            });
588
589        self.interceptor
590            .observe(labels, MetricValue::OperationExecuting(-1));
591        res
592    }
593
594    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
595        let labels = MetricLabels::new(self.info.clone(), Operation::Read.into_static());
596
597        let start = Instant::now();
598
599        self.interceptor
600            .observe(labels.clone(), MetricValue::OperationExecuting(1));
601
602        let (rp, reader) = self
603            .inner
604            .read(path, args)
605            .await
606            .inspect(|_| {
607                self.interceptor.observe(
608                    labels.clone(),
609                    MetricValue::OperationTtfbSeconds(start.elapsed()),
610                );
611            })
612            .inspect_err(|err| {
613                self.interceptor.observe(
614                    labels.clone().with_error(err.kind()),
615                    MetricValue::OperationErrorsTotal,
616                );
617            })?;
618
619        Ok((
620            rp,
621            MetricsWrapper::new(reader, self.interceptor.clone(), labels, start),
622        ))
623    }
624
625    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
626        let labels = MetricLabels::new(self.info.clone(), Operation::Write.into_static());
627
628        let start = Instant::now();
629
630        self.interceptor
631            .observe(labels.clone(), MetricValue::OperationExecuting(1));
632
633        let (rp, writer) = self.inner.write(path, args).await.inspect_err(|err| {
634            self.interceptor.observe(
635                labels.clone().with_error(err.kind()),
636                MetricValue::OperationErrorsTotal,
637            );
638        })?;
639
640        Ok((
641            rp,
642            MetricsWrapper::new(writer, self.interceptor.clone(), labels, start),
643        ))
644    }
645
646    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
647        let labels = MetricLabels::new(self.info.clone(), Operation::Copy.into_static());
648
649        let start = Instant::now();
650
651        self.interceptor
652            .observe(labels.clone(), MetricValue::OperationExecuting(1));
653
654        let res = self
655            .inner()
656            .copy(from, to, args)
657            .await
658            .inspect(|_| {
659                self.interceptor.observe(
660                    labels.clone(),
661                    MetricValue::OperationDurationSeconds(start.elapsed()),
662                );
663            })
664            .inspect_err(|err| {
665                self.interceptor.observe(
666                    labels.clone().with_error(err.kind()),
667                    MetricValue::OperationErrorsTotal,
668                );
669            });
670
671        self.interceptor
672            .observe(labels, MetricValue::OperationExecuting(-1));
673        res
674    }
675
676    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
677        let labels = MetricLabels::new(self.info.clone(), Operation::Rename.into_static());
678
679        let start = Instant::now();
680
681        self.interceptor
682            .observe(labels.clone(), MetricValue::OperationExecuting(1));
683
684        let res = self
685            .inner()
686            .rename(from, to, args)
687            .await
688            .inspect(|_| {
689                self.interceptor.observe(
690                    labels.clone(),
691                    MetricValue::OperationDurationSeconds(start.elapsed()),
692                );
693            })
694            .inspect_err(|err| {
695                self.interceptor.observe(
696                    labels.clone().with_error(err.kind()),
697                    MetricValue::OperationErrorsTotal,
698                );
699            });
700
701        self.interceptor
702            .observe(labels, MetricValue::OperationExecuting(-1));
703        res
704    }
705
706    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
707        let labels = MetricLabels::new(self.info.clone(), Operation::Stat.into_static());
708
709        let start = Instant::now();
710
711        self.interceptor
712            .observe(labels.clone(), MetricValue::OperationExecuting(1));
713
714        let res = self
715            .inner()
716            .stat(path, args)
717            .await
718            .inspect(|_| {
719                self.interceptor.observe(
720                    labels.clone(),
721                    MetricValue::OperationDurationSeconds(start.elapsed()),
722                );
723            })
724            .inspect_err(|err| {
725                self.interceptor.observe(
726                    labels.clone().with_error(err.kind()),
727                    MetricValue::OperationErrorsTotal,
728                );
729            });
730
731        self.interceptor
732            .observe(labels, MetricValue::OperationExecuting(-1));
733        res
734    }
735
736    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
737        let labels = MetricLabels::new(self.info.clone(), Operation::Delete.into_static());
738
739        let start = Instant::now();
740
741        self.interceptor
742            .observe(labels.clone(), MetricValue::OperationExecuting(1));
743
744        let (rp, deleter) = self.inner.delete().await.inspect_err(|err| {
745            self.interceptor.observe(
746                labels.clone().with_error(err.kind()),
747                MetricValue::OperationErrorsTotal,
748            );
749        })?;
750
751        Ok((
752            rp,
753            MetricsWrapper::new(deleter, self.interceptor.clone(), labels, start),
754        ))
755    }
756
757    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
758        let labels = MetricLabels::new(self.info.clone(), Operation::List.into_static());
759
760        let start = Instant::now();
761
762        self.interceptor
763            .observe(labels.clone(), MetricValue::OperationExecuting(1));
764
765        let (rp, lister) = self.inner.list(path, args).await.inspect_err(|err| {
766            self.interceptor.observe(
767                labels.clone().with_error(err.kind()),
768                MetricValue::OperationErrorsTotal,
769            );
770        })?;
771
772        Ok((
773            rp,
774            MetricsWrapper::new(lister, self.interceptor.clone(), labels, start),
775        ))
776    }
777
778    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
779        let labels = MetricLabels::new(self.info.clone(), Operation::Presign.into_static());
780
781        let start = Instant::now();
782
783        self.interceptor
784            .observe(labels.clone(), MetricValue::OperationExecuting(1));
785
786        let res = self
787            .inner()
788            .presign(path, args)
789            .await
790            .inspect(|_| {
791                self.interceptor.observe(
792                    labels.clone(),
793                    MetricValue::OperationDurationSeconds(start.elapsed()),
794                );
795            })
796            .inspect_err(|err| {
797                self.interceptor.observe(
798                    labels.clone().with_error(err.kind()),
799                    MetricValue::OperationErrorsTotal,
800                );
801            });
802
803        self.interceptor
804            .observe(labels, MetricValue::OperationExecuting(-1));
805        res
806    }
807}
808
/// Wraps a reader, writer, lister or deleter produced by the metrics layer.
///
/// It tallies what the wrapped object processed, and its `Drop` impl reports
/// the totals, rate, duration and executing-gauge decrement.
///
/// NOTE: field declaration order is also field drop order; keep `inner` first
/// unless the drop ordering has been checked.
pub struct MetricsWrapper<R, I: MetricsIntercept> {
    /// The wrapped reader/writer/lister/deleter that performs the real work.
    inner: R,
    /// Receiver of every metric observation made by this wrapper.
    interceptor: I,
    /// Labels attached to every metric this wrapper emits.
    labels: MetricLabels,

    /// When the originating operation started; used for duration and rate.
    start: Instant,
    /// Bytes transferred (read/write) or entries processed (list/delete).
    size: u64,
}
817
818impl<R, I: MetricsIntercept> Drop for MetricsWrapper<R, I> {
819    fn drop(&mut self) {
820        let size = self.size;
821        let duration = self.start.elapsed();
822
823        if self.labels.operation == Operation::Read.into_static()
824            || self.labels.operation == Operation::Write.into_static()
825        {
826            self.interceptor
827                .observe(self.labels.clone(), MetricValue::OperationBytes(self.size));
828            self.interceptor.observe(
829                self.labels.clone(),
830                MetricValue::OperationBytesRate(size as f64 / duration.as_secs_f64()),
831            );
832        } else {
833            self.interceptor.observe(
834                self.labels.clone(),
835                MetricValue::OperationEntries(self.size),
836            );
837            self.interceptor.observe(
838                self.labels.clone(),
839                MetricValue::OperationEntriesRate(size as f64 / duration.as_secs_f64()),
840            );
841        }
842
843        self.interceptor.observe(
844            self.labels.clone(),
845            MetricValue::OperationDurationSeconds(duration),
846        );
847        self.interceptor
848            .observe(self.labels.clone(), MetricValue::OperationExecuting(-1));
849    }
850}
851
852impl<R, I: MetricsIntercept> MetricsWrapper<R, I> {
853    fn new(inner: R, interceptor: I, labels: MetricLabels, start: Instant) -> Self {
854        Self {
855            inner,
856            interceptor,
857            labels,
858            start,
859            size: 0,
860        }
861    }
862}
863
864impl<R: oio::Read, I: MetricsIntercept> oio::Read for MetricsWrapper<R, I> {
865    async fn read(&mut self) -> Result<Buffer> {
866        self.inner
867            .read()
868            .await
869            .inspect(|bs| {
870                self.size += bs.len() as u64;
871            })
872            .inspect_err(|err| {
873                self.interceptor.observe(
874                    self.labels.clone().with_error(err.kind()),
875                    MetricValue::OperationErrorsTotal,
876                );
877            })
878    }
879}
880
881impl<R: oio::Write, I: MetricsIntercept> oio::Write for MetricsWrapper<R, I> {
882    async fn write(&mut self, bs: Buffer) -> Result<()> {
883        let size = bs.len();
884
885        self.inner
886            .write(bs)
887            .await
888            .inspect(|_| {
889                self.size += size as u64;
890            })
891            .inspect_err(|err| {
892                self.interceptor.observe(
893                    self.labels.clone().with_error(err.kind()),
894                    MetricValue::OperationErrorsTotal,
895                );
896            })
897    }
898
899    async fn close(&mut self) -> Result<Metadata> {
900        self.inner.close().await.inspect_err(|err| {
901            self.interceptor.observe(
902                self.labels.clone().with_error(err.kind()),
903                MetricValue::OperationErrorsTotal,
904            );
905        })
906    }
907
908    async fn abort(&mut self) -> Result<()> {
909        self.inner.abort().await.inspect_err(|err| {
910            self.interceptor.observe(
911                self.labels.clone().with_error(err.kind()),
912                MetricValue::OperationErrorsTotal,
913            );
914        })
915    }
916}
917
918impl<R: oio::List, I: MetricsIntercept> oio::List for MetricsWrapper<R, I> {
919    async fn next(&mut self) -> Result<Option<oio::Entry>> {
920        self.inner
921            .next()
922            .await
923            .inspect(|_| {
924                self.size += 1;
925            })
926            .inspect_err(|err| {
927                self.interceptor.observe(
928                    self.labels.clone().with_error(err.kind()),
929                    MetricValue::OperationErrorsTotal,
930                );
931            })
932    }
933}
934
935impl<R: oio::Delete, I: MetricsIntercept> oio::Delete for MetricsWrapper<R, I> {
936    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
937        self.inner
938            .delete(path, args)
939            .await
940            .inspect(|_| {
941                self.size += 1;
942            })
943            .inspect_err(|err| {
944                self.interceptor.observe(
945                    self.labels.clone().with_error(err.kind()),
946                    MetricValue::OperationErrorsTotal,
947                );
948            })
949    }
950
951    async fn close(&mut self) -> Result<()> {
952        self.inner.close().await.inspect_err(|err| {
953            self.interceptor.observe(
954                self.labels.clone().with_error(err.kind()),
955                MetricValue::OperationErrorsTotal,
956            );
957        })
958    }
959}