opendal/layers/prometheus.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::time::Duration;
19
20use prometheus::core::AtomicI64;
21use prometheus::core::AtomicU64;
22use prometheus::core::GenericCounterVec;
23use prometheus::core::GenericGaugeVec;
24use prometheus::register_histogram_vec_with_registry;
25use prometheus::register_int_counter_vec_with_registry;
26use prometheus::register_int_gauge_vec_with_registry;
27use prometheus::HistogramVec;
28use prometheus::Registry;
29
30use crate::layers::observe;
31use crate::raw::Access;
32use crate::raw::*;
33use crate::*;
34
35/// Add [prometheus](https://docs.rs/prometheus) for every operation.
36///
37/// # Prometheus Metrics
38///
39/// We provide several metrics, please see the documentation of [`observe`] module.
40/// For a more detailed explanation of these metrics and how they are used, please refer to the [Prometheus documentation](https://prometheus.io/docs/introduction/overview/).
41///
42/// # Examples
43///
44/// ```no_run
45/// # use log::debug;
46/// # use log::info;
47/// # use opendal::layers::PrometheusLayer;
48/// # use opendal::services;
49/// # use opendal::Operator;
50/// # use opendal::Result;
51/// # use prometheus::Encoder;
52///
53/// # #[tokio::main]
54/// # async fn main() -> Result<()> {
55/// let registry = prometheus::default_registry();
56///
57/// let op = Operator::new(services::Memory::default())?
58///     .layer(
59///         PrometheusLayer::builder()
60///             .register(registry)
61///             .expect("register metrics successfully"),
62///     )
63///     .finish();
64/// debug!("operator: {op:?}");
65///
66/// // Write data into object test.
67/// op.write("test", "Hello, World!").await?;
68/// // Read data from object.
69/// let bs = op.read("test").await?;
70/// info!("content: {}", String::from_utf8_lossy(&bs.to_bytes()));
71///
72/// // Get object metadata.
73/// let meta = op.stat("test").await?;
74/// info!("meta: {:?}", meta);
75///
76/// // Export prometheus metrics.
77/// let mut buffer = Vec::<u8>::new();
78/// let encoder = prometheus::TextEncoder::new();
79/// encoder.encode(&prometheus::gather(), &mut buffer).unwrap();
80/// println!("## Prometheus Metrics");
81/// println!("{}", String::from_utf8(buffer.clone()).unwrap());
82///
83/// Ok(())
84/// # }
85/// ```
86#[derive(Clone, Debug)]
87pub struct PrometheusLayer {
88    interceptor: PrometheusInterceptor,
89}
90
91impl PrometheusLayer {
92    /// Create a [`PrometheusLayerBuilder`] to set the configuration of metrics.
93    ///
94    /// # Example
95    ///
96    /// ```no_run
97    /// # use log::debug;
98    /// # use opendal::layers::PrometheusLayer;
99    /// # use opendal::services;
100    /// # use opendal::Operator;
101    /// # use opendal::Result;
102    /// #
103    /// # #[tokio::main]
104    /// # async fn main() -> Result<()> {
105    /// // Pick a builder and configure it.
106    /// let builder = services::Memory::default();
107    /// let registry = prometheus::default_registry();
108    ///
109    /// let duration_seconds_buckets = prometheus::exponential_buckets(0.01, 2.0, 16).unwrap();
110    /// let bytes_buckets = prometheus::exponential_buckets(1.0, 2.0, 16).unwrap();
111    /// let op = Operator::new(builder)?
112    ///     .layer(
113    ///         PrometheusLayer::builder()
114    ///             .duration_seconds_buckets(duration_seconds_buckets)
115    ///             .bytes_buckets(bytes_buckets)
116    ///             .register(registry)
117    ///             .expect("register metrics successfully"),
118    ///     )
119    ///     .finish();
120    /// debug!("operator: {op:?}");
121    ///
122    /// Ok(())
123    /// # }
124    /// ```
125    pub fn builder() -> PrometheusLayerBuilder {
126        PrometheusLayerBuilder::default()
127    }
128}
129
130impl<A: Access> Layer<A> for PrometheusLayer {
131    type LayeredAccess = observe::MetricsAccessor<A, PrometheusInterceptor>;
132
133    fn layer(&self, inner: A) -> Self::LayeredAccess {
134        observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
135    }
136}
137
/// [`PrometheusLayerBuilder`] is a config builder to build a [`PrometheusLayer`].
///
/// Each field holds the histogram bucket boundaries used when the metrics are
/// registered.
#[derive(Clone, Debug)]
pub struct PrometheusLayerBuilder {
    /// Buckets for the operation and HTTP request/response bytes histograms.
    bytes_buckets: Vec<f64>,
    /// Buckets for the operation and HTTP request/response bytes rate histograms.
    bytes_rate_buckets: Vec<f64>,
    /// Buckets for the operation entries histogram.
    entries_buckets: Vec<f64>,
    /// Buckets for the operation entries rate histogram.
    entries_rate_buckets: Vec<f64>,
    /// Buckets for the operation and HTTP request/response duration histograms.
    duration_seconds_buckets: Vec<f64>,
    /// Buckets for the operation time-to-first-byte histogram.
    ttfb_buckets: Vec<f64>,
}
147
148impl Default for PrometheusLayerBuilder {
149    fn default() -> Self {
150        Self {
151            bytes_buckets: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
152            bytes_rate_buckets: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
153            entries_buckets: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
154            entries_rate_buckets: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
155            duration_seconds_buckets: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
156            ttfb_buckets: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
157        }
158    }
159}
160
161impl PrometheusLayerBuilder {
162    /// Set buckets for bytes related histogram like `operation_bytes`.
163    pub fn bytes_buckets(mut self, buckets: Vec<f64>) -> Self {
164        if !buckets.is_empty() {
165            self.bytes_buckets = buckets;
166        }
167        self
168    }
169
170    /// Set buckets for bytes rate related histogram like `operation_bytes_rate`.
171    pub fn bytes_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
172        if !buckets.is_empty() {
173            self.bytes_rate_buckets = buckets;
174        }
175        self
176    }
177
178    /// Set buckets for entries related histogram like `operation_entries`.
179    pub fn entries_buckets(mut self, buckets: Vec<f64>) -> Self {
180        if !buckets.is_empty() {
181            self.entries_buckets = buckets;
182        }
183        self
184    }
185
186    /// Set buckets for entries rate related histogram like `operation_entries_rate`.
187    pub fn entries_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
188        if !buckets.is_empty() {
189            self.entries_rate_buckets = buckets;
190        }
191        self
192    }
193
194    /// Set buckets for duration seconds related histogram like `operation_duration_seconds`.
195    pub fn duration_seconds_buckets(mut self, buckets: Vec<f64>) -> Self {
196        if !buckets.is_empty() {
197            self.duration_seconds_buckets = buckets;
198        }
199        self
200    }
201
202    /// Set buckets for ttfb related histogram like `operation_ttfb_seconds`.
203    pub fn ttfb_buckets(mut self, buckets: Vec<f64>) -> Self {
204        if !buckets.is_empty() {
205            self.ttfb_buckets = buckets;
206        }
207        self
208    }
209
210    /// Register the metrics into the given registry and return a [`PrometheusLayer`].
211    ///
212    /// # Example
213    ///
214    /// ```no_run
215    /// # use log::debug;
216    /// # use opendal::layers::PrometheusLayer;
217    /// # use opendal::services;
218    /// # use opendal::Operator;
219    /// # use opendal::Result;
220    /// #
221    /// # #[tokio::main]
222    /// # async fn main() -> Result<()> {
223    /// // Pick a builder and configure it.
224    /// let builder = services::Memory::default();
225    /// let registry = prometheus::default_registry();
226    ///
227    /// let op = Operator::new(builder)?
228    ///     .layer(
229    ///         PrometheusLayer::builder()
230    ///             .register(registry)
231    ///             .expect("register metrics successfully"),
232    ///     )
233    ///     .finish();
234    /// debug!("operator: {op:?}");
235    ///
236    /// Ok(())
237    /// # }
238    /// ```
239    pub fn register(self, registry: &Registry) -> Result<PrometheusLayer> {
240        let labels = OperationLabels::names();
241        let operation_bytes = {
242            let metric = observe::MetricValue::OperationBytes(0);
243            register_histogram_vec_with_registry!(
244                metric.name(),
245                metric.help(),
246                labels.as_ref(),
247                self.bytes_buckets.clone(),
248                registry
249            )
250            .map_err(parse_prometheus_error)?
251        };
252        let operation_bytes_rate = {
253            let metric = observe::MetricValue::OperationBytesRate(0.0);
254            register_histogram_vec_with_registry!(
255                metric.name(),
256                metric.help(),
257                labels.as_ref(),
258                self.bytes_rate_buckets.clone(),
259                registry
260            )
261            .map_err(parse_prometheus_error)?
262        };
263        let operation_entries = {
264            let metric = observe::MetricValue::OperationEntries(0);
265            register_histogram_vec_with_registry!(
266                metric.name(),
267                metric.help(),
268                labels.as_ref(),
269                self.entries_buckets,
270                registry
271            )
272            .map_err(parse_prometheus_error)?
273        };
274        let operation_entries_rate = {
275            let metric = observe::MetricValue::OperationEntriesRate(0.0);
276            register_histogram_vec_with_registry!(
277                metric.name(),
278                metric.help(),
279                labels.as_ref(),
280                self.entries_rate_buckets,
281                registry
282            )
283            .map_err(parse_prometheus_error)?
284        };
285        let operation_duration_seconds = {
286            let metric = observe::MetricValue::OperationDurationSeconds(Duration::default());
287            register_histogram_vec_with_registry!(
288                metric.name(),
289                metric.help(),
290                labels.as_ref(),
291                self.duration_seconds_buckets.clone(),
292                registry
293            )
294            .map_err(parse_prometheus_error)?
295        };
296        let operation_executing = {
297            let metric = observe::MetricValue::OperationExecuting(0);
298            register_int_gauge_vec_with_registry!(
299                metric.name(),
300                metric.help(),
301                labels.as_ref(),
302                registry
303            )
304            .map_err(parse_prometheus_error)?
305        };
306        let operation_ttfb_seconds = {
307            let metric = observe::MetricValue::OperationTtfbSeconds(Duration::default());
308            register_histogram_vec_with_registry!(
309                metric.name(),
310                metric.help(),
311                labels.as_ref(),
312                self.ttfb_buckets.clone(),
313                registry
314            )
315            .map_err(parse_prometheus_error)?
316        };
317
318        let labels_with_error = OperationLabels::names().with_error();
319        let operation_errors_total = {
320            let metric = observe::MetricValue::OperationErrorsTotal;
321            register_int_counter_vec_with_registry!(
322                metric.name(),
323                metric.help(),
324                labels_with_error.as_ref(),
325                registry
326            )
327            .map_err(parse_prometheus_error)?
328        };
329
330        let http_executing = {
331            let metric = observe::MetricValue::HttpExecuting(0);
332            register_int_gauge_vec_with_registry!(
333                metric.name(),
334                metric.help(),
335                labels.as_ref(),
336                registry
337            )
338            .map_err(parse_prometheus_error)?
339        };
340        let http_request_bytes = {
341            let metric = observe::MetricValue::HttpRequestBytes(0);
342            register_histogram_vec_with_registry!(
343                metric.name(),
344                metric.help(),
345                labels.as_ref(),
346                self.bytes_buckets.clone(),
347                registry
348            )
349            .map_err(parse_prometheus_error)?
350        };
351        let http_request_bytes_rate = {
352            let metric = observe::MetricValue::HttpRequestBytesRate(0.0);
353            register_histogram_vec_with_registry!(
354                metric.name(),
355                metric.help(),
356                labels.as_ref(),
357                self.bytes_rate_buckets.clone(),
358                registry
359            )
360            .map_err(parse_prometheus_error)?
361        };
362        let http_request_duration_seconds = {
363            let metric = observe::MetricValue::HttpRequestDurationSeconds(Duration::default());
364            register_histogram_vec_with_registry!(
365                metric.name(),
366                metric.help(),
367                labels.as_ref(),
368                self.duration_seconds_buckets.clone(),
369                registry
370            )
371            .map_err(parse_prometheus_error)?
372        };
373        let http_response_bytes = {
374            let metric = observe::MetricValue::HttpResponseBytes(0);
375            register_histogram_vec_with_registry!(
376                metric.name(),
377                metric.help(),
378                labels.as_ref(),
379                self.bytes_buckets,
380                registry
381            )
382            .map_err(parse_prometheus_error)?
383        };
384        let http_response_bytes_rate = {
385            let metric = observe::MetricValue::HttpResponseBytesRate(0.0);
386            register_histogram_vec_with_registry!(
387                metric.name(),
388                metric.help(),
389                labels.as_ref(),
390                self.bytes_rate_buckets,
391                registry
392            )
393            .map_err(parse_prometheus_error)?
394        };
395        let http_response_duration_seconds = {
396            let metric = observe::MetricValue::HttpResponseDurationSeconds(Duration::default());
397            register_histogram_vec_with_registry!(
398                metric.name(),
399                metric.help(),
400                labels.as_ref(),
401                self.duration_seconds_buckets,
402                registry
403            )
404            .map_err(parse_prometheus_error)?
405        };
406        let http_connection_errors_total = {
407            let metric = observe::MetricValue::HttpConnectionErrorsTotal;
408            register_int_counter_vec_with_registry!(
409                metric.name(),
410                metric.help(),
411                labels.as_ref(),
412                registry
413            )
414            .map_err(parse_prometheus_error)?
415        };
416
417        let labels_with_status_code = OperationLabels::names().with_status_code();
418        let http_status_errors_total = {
419            let metric = observe::MetricValue::HttpStatusErrorsTotal;
420            register_int_counter_vec_with_registry!(
421                metric.name(),
422                metric.help(),
423                labels_with_status_code.as_ref(),
424                registry
425            )
426            .map_err(parse_prometheus_error)?
427        };
428
429        Ok(PrometheusLayer {
430            interceptor: PrometheusInterceptor {
431                operation_bytes,
432                operation_bytes_rate,
433                operation_entries,
434                operation_entries_rate,
435                operation_duration_seconds,
436                operation_errors_total,
437                operation_executing,
438                operation_ttfb_seconds,
439
440                http_executing,
441                http_request_bytes,
442                http_request_bytes_rate,
443                http_request_duration_seconds,
444                http_response_bytes,
445                http_response_bytes_rate,
446                http_response_duration_seconds,
447                http_connection_errors_total,
448                http_status_errors_total,
449            },
450        })
451    }
452
453    /// Register the metrics into the default registry and return a [`PrometheusLayer`].
454    ///
455    /// # Example
456    ///
457    /// ```no_run
458    /// # use log::debug;
459    /// # use opendal::layers::PrometheusLayer;
460    /// # use opendal::services;
461    /// # use opendal::Operator;
462    /// # use opendal::Result;
463    /// #
464    /// # #[tokio::main]
465    /// # async fn main() -> Result<()> {
466    /// // Pick a builder and configure it.
467    /// let builder = services::Memory::default();
468    ///
469    /// let op = Operator::new(builder)?
470    ///     .layer(
471    ///         PrometheusLayer::builder()
472    ///             .register_default()
473    ///             .expect("register metrics successfully"),
474    ///     )
475    ///     .finish();
476    /// debug!("operator: {op:?}");
477    ///
478    /// Ok(())
479    /// # }
480    /// ```
481    pub fn register_default(self) -> Result<PrometheusLayer> {
482        let registry = prometheus::default_registry();
483        self.register(registry)
484    }
485}
486
487/// Convert the [`prometheus::Error`] to [`Error`].
488fn parse_prometheus_error(err: prometheus::Error) -> Error {
489    Error::new(ErrorKind::Unexpected, err.to_string()).set_source(err)
490}
491
492#[derive(Clone, Debug)]
493pub struct PrometheusInterceptor {
494    operation_bytes: HistogramVec,
495    operation_bytes_rate: HistogramVec,
496    operation_entries: HistogramVec,
497    operation_entries_rate: HistogramVec,
498    operation_duration_seconds: HistogramVec,
499    operation_errors_total: GenericCounterVec<AtomicU64>,
500    operation_executing: GenericGaugeVec<AtomicI64>,
501    operation_ttfb_seconds: HistogramVec,
502
503    http_executing: GenericGaugeVec<AtomicI64>,
504    http_request_bytes: HistogramVec,
505    http_request_bytes_rate: HistogramVec,
506    http_request_duration_seconds: HistogramVec,
507    http_response_bytes: HistogramVec,
508    http_response_bytes_rate: HistogramVec,
509    http_response_duration_seconds: HistogramVec,
510    http_connection_errors_total: GenericCounterVec<AtomicU64>,
511    http_status_errors_total: GenericCounterVec<AtomicU64>,
512}
513
514impl observe::MetricsIntercept for PrometheusInterceptor {
515    fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
516        let labels = OperationLabels(labels);
517        match value {
518            observe::MetricValue::OperationBytes(v) => self
519                .operation_bytes
520                .with_label_values(&labels.values())
521                .observe(v as f64),
522            observe::MetricValue::OperationBytesRate(v) => self
523                .operation_bytes_rate
524                .with_label_values(&labels.values())
525                .observe(v),
526            observe::MetricValue::OperationEntries(v) => self
527                .operation_entries
528                .with_label_values(&labels.values())
529                .observe(v as f64),
530            observe::MetricValue::OperationEntriesRate(v) => self
531                .operation_entries_rate
532                .with_label_values(&labels.values())
533                .observe(v),
534            observe::MetricValue::OperationDurationSeconds(v) => self
535                .operation_duration_seconds
536                .with_label_values(&labels.values())
537                .observe(v.as_secs_f64()),
538            observe::MetricValue::OperationErrorsTotal => self
539                .operation_errors_total
540                .with_label_values(&labels.values())
541                .inc(),
542            observe::MetricValue::OperationExecuting(v) => self
543                .operation_executing
544                .with_label_values(&labels.values())
545                .add(v as i64),
546            observe::MetricValue::OperationTtfbSeconds(v) => self
547                .operation_ttfb_seconds
548                .with_label_values(&labels.values())
549                .observe(v.as_secs_f64()),
550
551            observe::MetricValue::HttpExecuting(v) => self
552                .http_executing
553                .with_label_values(&labels.values())
554                .add(v as i64),
555            observe::MetricValue::HttpRequestBytes(v) => self
556                .http_request_bytes
557                .with_label_values(&labels.values())
558                .observe(v as f64),
559            observe::MetricValue::HttpRequestBytesRate(v) => self
560                .http_request_bytes_rate
561                .with_label_values(&labels.values())
562                .observe(v),
563            observe::MetricValue::HttpRequestDurationSeconds(v) => self
564                .http_request_duration_seconds
565                .with_label_values(&labels.values())
566                .observe(v.as_secs_f64()),
567            observe::MetricValue::HttpResponseBytes(v) => self
568                .http_response_bytes
569                .with_label_values(&labels.values())
570                .observe(v as f64),
571            observe::MetricValue::HttpResponseBytesRate(v) => self
572                .http_response_bytes_rate
573                .with_label_values(&labels.values())
574                .observe(v),
575            observe::MetricValue::HttpResponseDurationSeconds(v) => self
576                .http_response_duration_seconds
577                .with_label_values(&labels.values())
578                .observe(v.as_secs_f64()),
579            observe::MetricValue::HttpConnectionErrorsTotal => self
580                .http_connection_errors_total
581                .with_label_values(&labels.values())
582                .inc(),
583            observe::MetricValue::HttpStatusErrorsTotal => self
584                .http_status_errors_total
585                .with_label_values(&labels.values())
586                .inc(),
587        }
588    }
589}
590
/// Ordered list of prometheus label names that a collector is registered with.
struct OperationLabelNames(Vec<&'static str>);

impl AsRef<[&'static str]> for OperationLabelNames {
    /// Borrow the label names as a slice, in registration order.
    fn as_ref(&self) -> &[&'static str] {
        self.0.as_slice()
    }
}
598
599impl OperationLabelNames {
600    fn with_error(mut self) -> Self {
601        self.0.push(observe::LABEL_ERROR);
602        self
603    }
604
605    fn with_status_code(mut self) -> Self {
606        self.0.push(observe::LABEL_STATUS_CODE);
607        self
608    }
609}
610
611#[derive(Clone, Debug, PartialEq, Eq, Hash)]
612struct OperationLabels(observe::MetricLabels);
613
614impl OperationLabels {
615    fn names() -> OperationLabelNames {
616        OperationLabelNames(vec![
617            observe::LABEL_SCHEME,
618            observe::LABEL_NAMESPACE,
619            observe::LABEL_ROOT,
620            observe::LABEL_OPERATION,
621        ])
622    }
623
624    fn values(&self) -> Vec<&str> {
625        let mut labels = Vec::with_capacity(6);
626
627        labels.extend([
628            self.0.scheme.into_static(),
629            self.0.namespace.as_ref(),
630            self.0.root.as_ref(),
631            self.0.operation,
632        ]);
633
634        if let Some(error) = self.0.error {
635            labels.push(error.into_static());
636        }
637
638        if let Some(status_code) = &self.0.status_code {
639            labels.push(status_code.as_str());
640        }
641
642        labels
643    }
644}