opendal_core/layers/
prometheus.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use prometheus::HistogramVec;
19use prometheus::Registry;
20use prometheus::core::AtomicI64;
21use prometheus::core::AtomicU64;
22use prometheus::core::GenericCounterVec;
23use prometheus::core::GenericGaugeVec;
24use prometheus::register_histogram_vec_with_registry;
25use prometheus::register_int_counter_vec_with_registry;
26use prometheus::register_int_gauge_vec_with_registry;
27
28use crate::layers::observe;
29use crate::raw::Access;
30use crate::raw::*;
31use crate::*;
32
33/// Add [prometheus](https://docs.rs/prometheus) for every operation.
34///
35/// # Prometheus Metrics
36///
37/// We provide several metrics, please see the documentation of [`observe`] module.
38/// For a more detailed explanation of these metrics and how they are used, please refer to the [Prometheus documentation](https://prometheus.io/docs/introduction/overview/).
39///
40/// # Examples
41///
42/// ## Basic Usage
43///
44/// ```no_run
45/// # use log::info;
46/// # use opendal_core::layers::PrometheusLayer;
47/// # use opendal_core::services;
48/// # use opendal_core::Operator;
49/// # use opendal_core::Result;
50/// # use prometheus::Encoder;
51///
52/// # #[tokio::main]
53/// # async fn main() -> Result<()> {
54/// let registry = prometheus::default_registry();
55///
56/// let op = Operator::new(services::Memory::default())?
57///     .layer(
58///         PrometheusLayer::builder()
59///             .register(registry)
60///             .expect("register metrics successfully"),
61///     )
62///     .finish();
63///
64/// // Write data into object test.
65/// op.write("test", "Hello, World!").await?;
66/// // Read data from the object.
67/// let bs = op.read("test").await?;
68/// info!("content: {}", String::from_utf8_lossy(&bs.to_bytes()));
69///
70/// // Get object metadata.
71/// let meta = op.stat("test").await?;
72/// info!("meta: {:?}", meta);
73///
74/// // Export prometheus metrics.
75/// let mut buffer = Vec::<u8>::new();
76/// let encoder = prometheus::TextEncoder::new();
77/// encoder.encode(&prometheus::gather(), &mut buffer).unwrap();
78/// println!("## Prometheus Metrics");
79/// println!("{}", String::from_utf8(buffer.clone()).unwrap());
80/// # Ok(())
81/// # }
82/// ```
83///
84/// ## Global Instance
85///
86/// `PrometheusLayer` needs to be registered before instantiation.
87///
88/// If there are multiple operators in an application that need the `PrometheusLayer`, we could
89/// instantiate it once and pass it to multiple operators. But we cannot directly call
90/// `.layer(PrometheusLayer::builder().register(&registry)?)` for different services, because
91/// registering the same metrics multiple times to the same registry will cause register errors.
92/// Therefore, we can provide a global instance for the `PrometheusLayer`.
93///
94/// ```no_run
95/// # use std::sync::OnceLock;
96/// # use log::info;
97/// # use opendal_core::layers::PrometheusLayer;
98/// # use opendal_core::services;
99/// # use opendal_core::Operator;
100/// # use opendal_core::Result;
101/// # use prometheus::Encoder;
102///
103/// fn global_prometheus_layer() -> &'static PrometheusLayer {
104///     static GLOBAL: OnceLock<PrometheusLayer> = OnceLock::new();
105///     GLOBAL.get_or_init(|| {
106///         PrometheusLayer::builder()
107///             .register_default()
108///             .expect("Failed to register with the global registry")
109///     })
110/// }
111///
112/// # #[tokio::main]
113/// # async fn main() -> Result<()> {
114/// let op = Operator::new(services::Memory::default())?
115///     .layer(global_prometheus_layer().clone())
116///     .finish();
117///
118/// // Write data into object test.
119/// op.write("test", "Hello, World!").await?;
120///
121/// // Read data from the object.
122/// let bs = op.read("test").await?;
123/// info!("content: {}", String::from_utf8_lossy(&bs.to_bytes()));
124///
125/// // Get object metadata.
126/// let meta = op.stat("test").await?;
127/// info!("meta: {:?}", meta);
128///
129/// // Export prometheus metrics.
130/// let mut buffer = Vec::<u8>::new();
131/// let encoder = prometheus::TextEncoder::new();
132/// encoder.encode(&prometheus::gather(), &mut buffer).unwrap();
133/// println!("## Prometheus Metrics");
134/// println!("{}", String::from_utf8(buffer.clone()).unwrap());
135/// # Ok(())
136/// # }
137/// ```
138#[derive(Clone, Debug)]
139pub struct PrometheusLayer {
140    interceptor: PrometheusInterceptor,
141}
142
143impl PrometheusLayer {
144    /// Create a [`PrometheusLayerBuilder`] to set the configuration of metrics.
145    ///
146    /// # Example
147    ///
148    /// ```no_run
149    /// # use opendal_core::layers::PrometheusLayer;
150    /// # use opendal_core::services;
151    /// # use opendal_core::Operator;
152    /// # use opendal_core::Result;
153    /// #
154    /// # #[tokio::main]
155    /// # async fn main() -> Result<()> {
156    /// // Pick a builder and configure it.
157    /// let builder = services::Memory::default();
158    /// let registry = prometheus::default_registry();
159    ///
160    /// let duration_seconds_buckets = prometheus::exponential_buckets(0.01, 2.0, 16).unwrap();
161    /// let bytes_buckets = prometheus::exponential_buckets(1.0, 2.0, 16).unwrap();
162    /// let _ = Operator::new(builder)?
163    ///     .layer(
164    ///         PrometheusLayer::builder()
165    ///             .duration_seconds_buckets(duration_seconds_buckets)
166    ///             .bytes_buckets(bytes_buckets)
167    ///             .register(registry)
168    ///             .expect("register metrics successfully"),
169    ///     )
170    ///     .finish();
171    /// # Ok(())
172    /// # }
173    /// ```
174    pub fn builder() -> PrometheusLayerBuilder {
175        PrometheusLayerBuilder::default()
176    }
177}
178
179impl<A: Access> Layer<A> for PrometheusLayer {
180    type LayeredAccess = observe::MetricsAccessor<A, PrometheusInterceptor>;
181
182    fn layer(&self, inner: A) -> Self::LayeredAccess {
183        observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
184    }
185}
186
/// [`PrometheusLayerBuilder`] is a config builder to build a [`PrometheusLayer`].
///
/// Each field holds the histogram bucket boundaries used by one family of metrics. Because
/// registering consumes the builder, it is `Clone` so the same configuration can be reused,
/// e.g. to register into several registries; it is `Debug` so a configuration can be logged.
#[derive(Clone, Debug)]
pub struct PrometheusLayerBuilder {
    /// Buckets for the operation bytes and HTTP request/response bytes histograms.
    bytes_buckets: Vec<f64>,
    /// Buckets for the operation bytes rate and HTTP request/response bytes rate histograms.
    bytes_rate_buckets: Vec<f64>,
    /// Buckets for the operation entries histogram.
    entries_buckets: Vec<f64>,
    /// Buckets for the operation entries rate histogram.
    entries_rate_buckets: Vec<f64>,
    /// Buckets for the operation and HTTP request/response duration histograms (seconds).
    duration_seconds_buckets: Vec<f64>,
    /// Buckets for the operation time-to-first-byte histogram (seconds).
    ttfb_buckets: Vec<f64>,
}
196
197impl Default for PrometheusLayerBuilder {
198    fn default() -> Self {
199        Self {
200            bytes_buckets: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
201            bytes_rate_buckets: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
202            entries_buckets: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
203            entries_rate_buckets: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
204            duration_seconds_buckets: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
205            ttfb_buckets: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
206        }
207    }
208}
209
210impl PrometheusLayerBuilder {
211    /// Set buckets for bytes related histogram like `operation_bytes`.
212    pub fn bytes_buckets(mut self, buckets: Vec<f64>) -> Self {
213        if !buckets.is_empty() {
214            self.bytes_buckets = buckets;
215        }
216        self
217    }
218
219    /// Set buckets for bytes rate related histogram like `operation_bytes_rate`.
220    pub fn bytes_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
221        if !buckets.is_empty() {
222            self.bytes_rate_buckets = buckets;
223        }
224        self
225    }
226
227    /// Set buckets for entries related histogram like `operation_entries`.
228    pub fn entries_buckets(mut self, buckets: Vec<f64>) -> Self {
229        if !buckets.is_empty() {
230            self.entries_buckets = buckets;
231        }
232        self
233    }
234
235    /// Set buckets for entries rate related histogram like `operation_entries_rate`.
236    pub fn entries_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
237        if !buckets.is_empty() {
238            self.entries_rate_buckets = buckets;
239        }
240        self
241    }
242
243    /// Set buckets for duration seconds related histogram like `operation_duration_seconds`.
244    pub fn duration_seconds_buckets(mut self, buckets: Vec<f64>) -> Self {
245        if !buckets.is_empty() {
246            self.duration_seconds_buckets = buckets;
247        }
248        self
249    }
250
251    /// Set buckets for ttfb related histogram like `operation_ttfb_seconds`.
252    pub fn ttfb_buckets(mut self, buckets: Vec<f64>) -> Self {
253        if !buckets.is_empty() {
254            self.ttfb_buckets = buckets;
255        }
256        self
257    }
258
259    /// Register the metrics into the given registry and return a [`PrometheusLayer`].
260    ///
261    /// # Example
262    ///
263    /// ```no_run
264    /// # use opendal_core::layers::PrometheusLayer;
265    /// # use opendal_core::services;
266    /// # use opendal_core::Operator;
267    /// # use opendal_core::Result;
268    /// #
269    /// # #[tokio::main]
270    /// # async fn main() -> Result<()> {
271    /// // Pick a builder and configure it.
272    /// let builder = services::Memory::default();
273    /// let _ = Operator::new(builder)?
274    ///     .layer(
275    ///         PrometheusLayer::builder()
276    ///             .register(prometheus::default_registry())
277    ///             .expect("register metrics successfully"),
278    ///     )
279    ///     .finish();
280    /// # Ok(())
281    /// # }
282    /// ```
283    pub fn register(self, registry: &Registry) -> Result<PrometheusLayer> {
284        let labels = OperationLabels::names();
285        let operation_bytes = {
286            let metric = observe::MetricValue::OperationBytes(0);
287            register_histogram_vec_with_registry!(
288                metric.name(),
289                metric.help(),
290                labels.as_ref(),
291                self.bytes_buckets.clone(),
292                registry
293            )
294            .map_err(parse_prometheus_error)?
295        };
296        let operation_bytes_rate = {
297            let metric = observe::MetricValue::OperationBytesRate(0.0);
298            register_histogram_vec_with_registry!(
299                metric.name(),
300                metric.help(),
301                labels.as_ref(),
302                self.bytes_rate_buckets.clone(),
303                registry
304            )
305            .map_err(parse_prometheus_error)?
306        };
307        let operation_entries = {
308            let metric = observe::MetricValue::OperationEntries(0);
309            register_histogram_vec_with_registry!(
310                metric.name(),
311                metric.help(),
312                labels.as_ref(),
313                self.entries_buckets,
314                registry
315            )
316            .map_err(parse_prometheus_error)?
317        };
318        let operation_entries_rate = {
319            let metric = observe::MetricValue::OperationEntriesRate(0.0);
320            register_histogram_vec_with_registry!(
321                metric.name(),
322                metric.help(),
323                labels.as_ref(),
324                self.entries_rate_buckets,
325                registry
326            )
327            .map_err(parse_prometheus_error)?
328        };
329        let operation_duration_seconds = {
330            let metric = observe::MetricValue::OperationDurationSeconds(Duration::default());
331            register_histogram_vec_with_registry!(
332                metric.name(),
333                metric.help(),
334                labels.as_ref(),
335                self.duration_seconds_buckets.clone(),
336                registry
337            )
338            .map_err(parse_prometheus_error)?
339        };
340        let operation_executing = {
341            let metric = observe::MetricValue::OperationExecuting(0);
342            register_int_gauge_vec_with_registry!(
343                metric.name(),
344                metric.help(),
345                labels.as_ref(),
346                registry
347            )
348            .map_err(parse_prometheus_error)?
349        };
350        let operation_ttfb_seconds = {
351            let metric = observe::MetricValue::OperationTtfbSeconds(Duration::default());
352            register_histogram_vec_with_registry!(
353                metric.name(),
354                metric.help(),
355                labels.as_ref(),
356                self.ttfb_buckets.clone(),
357                registry
358            )
359            .map_err(parse_prometheus_error)?
360        };
361
362        let labels_with_error = OperationLabels::names().with_error();
363        let operation_errors_total = {
364            let metric = observe::MetricValue::OperationErrorsTotal;
365            register_int_counter_vec_with_registry!(
366                metric.name(),
367                metric.help(),
368                labels_with_error.as_ref(),
369                registry
370            )
371            .map_err(parse_prometheus_error)?
372        };
373
374        let http_executing = {
375            let metric = observe::MetricValue::HttpExecuting(0);
376            register_int_gauge_vec_with_registry!(
377                metric.name(),
378                metric.help(),
379                labels.as_ref(),
380                registry
381            )
382            .map_err(parse_prometheus_error)?
383        };
384        let http_request_bytes = {
385            let metric = observe::MetricValue::HttpRequestBytes(0);
386            register_histogram_vec_with_registry!(
387                metric.name(),
388                metric.help(),
389                labels.as_ref(),
390                self.bytes_buckets.clone(),
391                registry
392            )
393            .map_err(parse_prometheus_error)?
394        };
395        let http_request_bytes_rate = {
396            let metric = observe::MetricValue::HttpRequestBytesRate(0.0);
397            register_histogram_vec_with_registry!(
398                metric.name(),
399                metric.help(),
400                labels.as_ref(),
401                self.bytes_rate_buckets.clone(),
402                registry
403            )
404            .map_err(parse_prometheus_error)?
405        };
406        let http_request_duration_seconds = {
407            let metric = observe::MetricValue::HttpRequestDurationSeconds(Duration::default());
408            register_histogram_vec_with_registry!(
409                metric.name(),
410                metric.help(),
411                labels.as_ref(),
412                self.duration_seconds_buckets.clone(),
413                registry
414            )
415            .map_err(parse_prometheus_error)?
416        };
417        let http_response_bytes = {
418            let metric = observe::MetricValue::HttpResponseBytes(0);
419            register_histogram_vec_with_registry!(
420                metric.name(),
421                metric.help(),
422                labels.as_ref(),
423                self.bytes_buckets,
424                registry
425            )
426            .map_err(parse_prometheus_error)?
427        };
428        let http_response_bytes_rate = {
429            let metric = observe::MetricValue::HttpResponseBytesRate(0.0);
430            register_histogram_vec_with_registry!(
431                metric.name(),
432                metric.help(),
433                labels.as_ref(),
434                self.bytes_rate_buckets,
435                registry
436            )
437            .map_err(parse_prometheus_error)?
438        };
439        let http_response_duration_seconds = {
440            let metric = observe::MetricValue::HttpResponseDurationSeconds(Duration::default());
441            register_histogram_vec_with_registry!(
442                metric.name(),
443                metric.help(),
444                labels.as_ref(),
445                self.duration_seconds_buckets,
446                registry
447            )
448            .map_err(parse_prometheus_error)?
449        };
450        let http_connection_errors_total = {
451            let metric = observe::MetricValue::HttpConnectionErrorsTotal;
452            register_int_counter_vec_with_registry!(
453                metric.name(),
454                metric.help(),
455                labels.as_ref(),
456                registry
457            )
458            .map_err(parse_prometheus_error)?
459        };
460
461        let labels_with_status_code = OperationLabels::names().with_status_code();
462        let http_status_errors_total = {
463            let metric = observe::MetricValue::HttpStatusErrorsTotal;
464            register_int_counter_vec_with_registry!(
465                metric.name(),
466                metric.help(),
467                labels_with_status_code.as_ref(),
468                registry
469            )
470            .map_err(parse_prometheus_error)?
471        };
472
473        Ok(PrometheusLayer {
474            interceptor: PrometheusInterceptor {
475                operation_bytes,
476                operation_bytes_rate,
477                operation_entries,
478                operation_entries_rate,
479                operation_duration_seconds,
480                operation_errors_total,
481                operation_executing,
482                operation_ttfb_seconds,
483
484                http_executing,
485                http_request_bytes,
486                http_request_bytes_rate,
487                http_request_duration_seconds,
488                http_response_bytes,
489                http_response_bytes_rate,
490                http_response_duration_seconds,
491                http_connection_errors_total,
492                http_status_errors_total,
493            },
494        })
495    }
496
497    /// Register the metrics into the default registry and return a [`PrometheusLayer`].
498    ///
499    /// # Example
500    ///
501    /// ```no_run
502    /// # use opendal_core::layers::PrometheusLayer;
503    /// # use opendal_core::services;
504    /// # use opendal_core::Operator;
505    /// # use opendal_core::Result;
506    /// #
507    /// # #[tokio::main]
508    /// # async fn main() -> Result<()> {
509    /// // Pick a builder and configure it.
510    /// let builder = services::Memory::default();
511    /// let _ = Operator::new(builder)?
512    ///     .layer(
513    ///         PrometheusLayer::builder()
514    ///             .register_default()
515    ///             .expect("register metrics successfully"),
516    ///     )
517    ///     .finish();
518    /// # Ok(())
519    /// # }
520    /// ```
521    pub fn register_default(self) -> Result<PrometheusLayer> {
522        let registry = prometheus::default_registry();
523        self.register(registry)
524    }
525}
526
527/// Convert the [`prometheus::Error`] to [`Error`].
528fn parse_prometheus_error(err: prometheus::Error) -> Error {
529    Error::new(ErrorKind::Unexpected, err.to_string()).set_source(err)
530}
531
532#[derive(Clone, Debug)]
533pub struct PrometheusInterceptor {
534    operation_bytes: HistogramVec,
535    operation_bytes_rate: HistogramVec,
536    operation_entries: HistogramVec,
537    operation_entries_rate: HistogramVec,
538    operation_duration_seconds: HistogramVec,
539    operation_errors_total: GenericCounterVec<AtomicU64>,
540    operation_executing: GenericGaugeVec<AtomicI64>,
541    operation_ttfb_seconds: HistogramVec,
542
543    http_executing: GenericGaugeVec<AtomicI64>,
544    http_request_bytes: HistogramVec,
545    http_request_bytes_rate: HistogramVec,
546    http_request_duration_seconds: HistogramVec,
547    http_response_bytes: HistogramVec,
548    http_response_bytes_rate: HistogramVec,
549    http_response_duration_seconds: HistogramVec,
550    http_connection_errors_total: GenericCounterVec<AtomicU64>,
551    http_status_errors_total: GenericCounterVec<AtomicU64>,
552}
553
554impl observe::MetricsIntercept for PrometheusInterceptor {
555    fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
556        let labels = OperationLabels(labels);
557        match value {
558            observe::MetricValue::OperationBytes(v) => self
559                .operation_bytes
560                .with_label_values(&labels.values())
561                .observe(v as f64),
562            observe::MetricValue::OperationBytesRate(v) => self
563                .operation_bytes_rate
564                .with_label_values(&labels.values())
565                .observe(v),
566            observe::MetricValue::OperationEntries(v) => self
567                .operation_entries
568                .with_label_values(&labels.values())
569                .observe(v as f64),
570            observe::MetricValue::OperationEntriesRate(v) => self
571                .operation_entries_rate
572                .with_label_values(&labels.values())
573                .observe(v),
574            observe::MetricValue::OperationDurationSeconds(v) => self
575                .operation_duration_seconds
576                .with_label_values(&labels.values())
577                .observe(v.as_secs_f64()),
578            observe::MetricValue::OperationErrorsTotal => self
579                .operation_errors_total
580                .with_label_values(&labels.values())
581                .inc(),
582            observe::MetricValue::OperationExecuting(v) => self
583                .operation_executing
584                .with_label_values(&labels.values())
585                .add(v as i64),
586            observe::MetricValue::OperationTtfbSeconds(v) => self
587                .operation_ttfb_seconds
588                .with_label_values(&labels.values())
589                .observe(v.as_secs_f64()),
590
591            observe::MetricValue::HttpExecuting(v) => self
592                .http_executing
593                .with_label_values(&labels.values())
594                .add(v as i64),
595            observe::MetricValue::HttpRequestBytes(v) => self
596                .http_request_bytes
597                .with_label_values(&labels.values())
598                .observe(v as f64),
599            observe::MetricValue::HttpRequestBytesRate(v) => self
600                .http_request_bytes_rate
601                .with_label_values(&labels.values())
602                .observe(v),
603            observe::MetricValue::HttpRequestDurationSeconds(v) => self
604                .http_request_duration_seconds
605                .with_label_values(&labels.values())
606                .observe(v.as_secs_f64()),
607            observe::MetricValue::HttpResponseBytes(v) => self
608                .http_response_bytes
609                .with_label_values(&labels.values())
610                .observe(v as f64),
611            observe::MetricValue::HttpResponseBytesRate(v) => self
612                .http_response_bytes_rate
613                .with_label_values(&labels.values())
614                .observe(v),
615            observe::MetricValue::HttpResponseDurationSeconds(v) => self
616                .http_response_duration_seconds
617                .with_label_values(&labels.values())
618                .observe(v.as_secs_f64()),
619            observe::MetricValue::HttpConnectionErrorsTotal => self
620                .http_connection_errors_total
621                .with_label_values(&labels.values())
622                .inc(),
623            observe::MetricValue::HttpStatusErrorsTotal => self
624                .http_status_errors_total
625                .with_label_values(&labels.values())
626                .inc(),
627        }
628    }
629}
630
631struct OperationLabelNames(Vec<&'static str>);
632
633impl AsRef<[&'static str]> for OperationLabelNames {
634    fn as_ref(&self) -> &[&'static str] {
635        &self.0
636    }
637}
638
639impl OperationLabelNames {
640    fn with_error(mut self) -> Self {
641        self.0.push(observe::LABEL_ERROR);
642        self
643    }
644
645    fn with_status_code(mut self) -> Self {
646        self.0.push(observe::LABEL_STATUS_CODE);
647        self
648    }
649}
650
651#[derive(Clone, Debug, PartialEq, Eq, Hash)]
652struct OperationLabels(observe::MetricLabels);
653
654impl OperationLabels {
655    fn names() -> OperationLabelNames {
656        OperationLabelNames(vec![
657            observe::LABEL_SCHEME,
658            observe::LABEL_NAMESPACE,
659            observe::LABEL_ROOT,
660            observe::LABEL_OPERATION,
661        ])
662    }
663
664    fn values(&self) -> Vec<&str> {
665        let mut labels = Vec::with_capacity(6);
666
667        labels.extend([
668            self.0.scheme,
669            self.0.namespace.as_ref(),
670            self.0.root.as_ref(),
671            self.0.operation,
672        ]);
673
674        if let Some(error) = self.0.error {
675            labels.push(error.into_static());
676        }
677
678        if let Some(status_code) = &self.0.status_code {
679            labels.push(status_code.as_str());
680        }
681
682        labels
683    }
684}