opendal/layers/
prometheus_client.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt;
19use std::time::Duration;
20
21use prometheus_client::encoding::EncodeLabel;
22use prometheus_client::encoding::EncodeLabelSet;
23use prometheus_client::encoding::LabelSetEncoder;
24use prometheus_client::metrics::counter::Counter;
25use prometheus_client::metrics::family::Family;
26use prometheus_client::metrics::family::MetricConstructor;
27use prometheus_client::metrics::gauge::Gauge;
28use prometheus_client::metrics::histogram::Histogram;
29use prometheus_client::registry::Metric;
30use prometheus_client::registry::Registry;
31use prometheus_client::registry::Unit;
32
33use crate::layers::observe;
34use crate::raw::*;
35use crate::*;
36
/// Add [prometheus-client](https://docs.rs/prometheus-client) metrics for every operation.
38///
39/// # Prometheus Metrics
40///
41/// We provide several metrics, please see the documentation of [`observe`] module.
42/// For a more detailed explanation of these metrics and how they are used, please refer to the [Prometheus documentation](https://prometheus.io/docs/introduction/overview/).
43///
44/// # Examples
45///
46/// ```no_run
47/// # use log::debug;
48/// # use log::info;
49/// # use opendal::layers::PrometheusClientLayer;
50/// # use opendal::services;
51/// # use opendal::Operator;
52/// # use opendal::Result;
53///
54/// # #[tokio::main]
55/// # async fn main() -> Result<()> {
56/// let mut registry = prometheus_client::registry::Registry::default();
57///
58/// let op = Operator::new(services::Memory::default())?
59///     .layer(PrometheusClientLayer::builder().register(&mut registry))
60///     .finish();
61/// debug!("operator: {op:?}");
62///
63/// // Write data into object test.
64/// op.write("test", "Hello, World!").await?;
65/// // Read data from object.
66/// let bs = op.read("test").await?;
67/// info!("content: {}", String::from_utf8_lossy(&bs.to_bytes()));
68///
69/// // Get object metadata.
70/// let meta = op.stat("test").await?;
71/// info!("meta: {:?}", meta);
72///
73/// // Export prometheus metrics.
74/// let mut buf = String::new();
75/// prometheus_client::encoding::text::encode(&mut buf, &registry).unwrap();
76/// println!("## Prometheus Metrics");
77/// println!("{}", buf);
78///
79/// Ok(())
80/// # }
81/// ```
82#[derive(Clone, Debug)]
83pub struct PrometheusClientLayer {
84    interceptor: PrometheusClientInterceptor,
85}
86
87impl PrometheusClientLayer {
88    /// Create a [`PrometheusClientLayerBuilder`] to set the configuration of metrics.
89    pub fn builder() -> PrometheusClientLayerBuilder {
90        PrometheusClientLayerBuilder::default()
91    }
92}
93
94impl<A: Access> Layer<A> for PrometheusClientLayer {
95    type LayeredAccess = observe::MetricsAccessor<A, PrometheusClientInterceptor>;
96
97    fn layer(&self, inner: A) -> Self::LayeredAccess {
98        observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
99    }
100}
101
/// [`PrometheusClientLayerBuilder`] is a config builder to build a [`PrometheusClientLayer`].
///
/// Each `*_buckets` field holds the bucket boundaries used for the matching group of
/// histograms, and `disable_label_root` controls whether the `root` label is emitted.
///
/// The builder is `Clone` and `Debug` like the other public types of this module, so a
/// configured builder can be inspected in logs or reused for several registries.
#[derive(Clone, Debug)]
pub struct PrometheusClientLayerBuilder {
    /// Buckets for bytes related histograms.
    bytes_buckets: Vec<f64>,
    /// Buckets for bytes rate related histograms.
    bytes_rate_buckets: Vec<f64>,
    /// Buckets for entries related histograms.
    entries_buckets: Vec<f64>,
    /// Buckets for entries rate related histograms.
    entries_rate_buckets: Vec<f64>,
    /// Buckets for duration (in seconds) related histograms.
    duration_seconds_buckets: Vec<f64>,
    /// Buckets for the ttfb histogram.
    ttfb_buckets: Vec<f64>,
    /// When `true`, the `root` label is left out of every encoded label set.
    disable_label_root: bool,
}
112
113impl Default for PrometheusClientLayerBuilder {
114    fn default() -> Self {
115        Self {
116            bytes_buckets: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
117            bytes_rate_buckets: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
118            entries_buckets: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
119            entries_rate_buckets: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
120            duration_seconds_buckets: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
121            ttfb_buckets: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
122            disable_label_root: false,
123        }
124    }
125}
126
127impl PrometheusClientLayerBuilder {
128    /// Set buckets for bytes related histogram like `operation_bytes`.
129    pub fn bytes_buckets(mut self, buckets: Vec<f64>) -> Self {
130        if !buckets.is_empty() {
131            self.bytes_buckets = buckets;
132        }
133        self
134    }
135
136    /// Set buckets for bytes rate related histogram like `operation_bytes_rate`.
137    pub fn bytes_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
138        if !buckets.is_empty() {
139            self.bytes_rate_buckets = buckets;
140        }
141        self
142    }
143
144    /// Set buckets for entries related histogram like `operation_entries`.
145    pub fn entries_buckets(mut self, buckets: Vec<f64>) -> Self {
146        if !buckets.is_empty() {
147            self.entries_buckets = buckets;
148        }
149        self
150    }
151
152    /// Set buckets for entries rate related histogram like `operation_entries_rate`.
153    pub fn entries_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
154        if !buckets.is_empty() {
155            self.entries_rate_buckets = buckets;
156        }
157        self
158    }
159
160    /// Set buckets for duration seconds related histogram like `operation_duration_seconds`.
161    pub fn duration_seconds_buckets(mut self, buckets: Vec<f64>) -> Self {
162        if !buckets.is_empty() {
163            self.duration_seconds_buckets = buckets;
164        }
165        self
166    }
167
168    /// Set buckets for ttfb related histogram like `operation_ttfb_seconds`.
169    pub fn ttfb_buckets(mut self, buckets: Vec<f64>) -> Self {
170        if !buckets.is_empty() {
171            self.ttfb_buckets = buckets;
172        }
173        self
174    }
175
176    /// The 'root' label might have risks of being high cardinality, users can choose to disable it
177    /// when they found it's not useful for their metrics.
178    pub fn disable_label_root(mut self, disable: bool) -> Self {
179        self.disable_label_root = disable;
180        self
181    }
182
183    /// Register the metrics into the registry and return a [`PrometheusClientLayer`].
184    ///
185    /// # Examples
186    ///
187    /// ```no_run
188    /// # use log::debug;
189    /// # use opendal::layers::PrometheusClientLayer;
190    /// # use opendal::services;
191    /// # use opendal::Operator;
192    /// # use opendal::Result;
193    ///
194    /// # #[tokio::main]
195    /// # async fn main() -> Result<()> {
196    /// // Pick a builder and configure it.
197    /// let builder = services::Memory::default();
198    /// let mut registry = prometheus_client::registry::Registry::default();
199    ///
200    /// let op = Operator::new(builder)?
201    ///     .layer(PrometheusClientLayer::builder().register(&mut registry))
202    ///     .finish();
203    /// debug!("operator: {op:?}");
204    ///
205    /// Ok(())
206    /// # }
207    /// ```
208    pub fn register(self, registry: &mut Registry) -> PrometheusClientLayer {
209        let operation_bytes =
210            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
211                buckets: self.bytes_buckets.clone(),
212            });
213        let operation_bytes_rate =
214            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
215                buckets: self.bytes_rate_buckets.clone(),
216            });
217        let operation_entries =
218            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
219                buckets: self.entries_buckets.clone(),
220            });
221        let operation_entries_rate =
222            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
223                buckets: self.entries_rate_buckets.clone(),
224            });
225        let operation_duration_seconds =
226            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
227                buckets: self.duration_seconds_buckets.clone(),
228            });
229        let operation_errors_total = Family::<OperationLabels, Counter>::default();
230        let operation_executing = Family::<OperationLabels, Gauge>::default();
231        let operation_ttfb_seconds =
232            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
233                buckets: self.ttfb_buckets.clone(),
234            });
235
236        let http_executing = Family::<OperationLabels, Gauge>::default();
237        let http_request_bytes =
238            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
239                buckets: self.bytes_buckets.clone(),
240            });
241        let http_request_bytes_rate =
242            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
243                buckets: self.bytes_rate_buckets.clone(),
244            });
245        let http_request_duration_seconds =
246            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
247                buckets: self.duration_seconds_buckets.clone(),
248            });
249        let http_response_bytes =
250            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
251                buckets: self.bytes_buckets.clone(),
252            });
253        let http_response_bytes_rate =
254            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
255                buckets: self.bytes_rate_buckets.clone(),
256            });
257        let http_response_duration_seconds =
258            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
259                buckets: self.duration_seconds_buckets.clone(),
260            });
261        let http_connection_errors_total = Family::<OperationLabels, Counter>::default();
262        let http_status_errors_total = Family::<OperationLabels, Counter>::default();
263
264        register_metric(
265            registry,
266            operation_bytes.clone(),
267            observe::MetricValue::OperationBytes(0),
268        );
269        register_metric(
270            registry,
271            operation_bytes_rate.clone(),
272            observe::MetricValue::OperationBytesRate(0.0),
273        );
274        register_metric(
275            registry,
276            operation_entries.clone(),
277            observe::MetricValue::OperationEntries(0),
278        );
279        register_metric(
280            registry,
281            operation_entries_rate.clone(),
282            observe::MetricValue::OperationEntriesRate(0.0),
283        );
284        register_metric(
285            registry,
286            operation_duration_seconds.clone(),
287            observe::MetricValue::OperationDurationSeconds(Duration::default()),
288        );
289        register_metric(
290            registry,
291            operation_errors_total.clone(),
292            observe::MetricValue::OperationErrorsTotal,
293        );
294        register_metric(
295            registry,
296            operation_executing.clone(),
297            observe::MetricValue::OperationExecuting(0),
298        );
299        register_metric(
300            registry,
301            operation_ttfb_seconds.clone(),
302            observe::MetricValue::OperationTtfbSeconds(Duration::default()),
303        );
304
305        register_metric(
306            registry,
307            http_executing.clone(),
308            observe::MetricValue::HttpExecuting(0),
309        );
310        register_metric(
311            registry,
312            http_request_bytes.clone(),
313            observe::MetricValue::HttpRequestBytes(0),
314        );
315        register_metric(
316            registry,
317            http_request_bytes_rate.clone(),
318            observe::MetricValue::HttpRequestBytesRate(0.0),
319        );
320        register_metric(
321            registry,
322            http_request_duration_seconds.clone(),
323            observe::MetricValue::HttpRequestDurationSeconds(Duration::default()),
324        );
325        register_metric(
326            registry,
327            http_response_bytes.clone(),
328            observe::MetricValue::HttpResponseBytes(0),
329        );
330        register_metric(
331            registry,
332            http_response_bytes_rate.clone(),
333            observe::MetricValue::HttpResponseBytesRate(0.0),
334        );
335        register_metric(
336            registry,
337            http_response_duration_seconds.clone(),
338            observe::MetricValue::HttpResponseDurationSeconds(Duration::default()),
339        );
340        register_metric(
341            registry,
342            http_connection_errors_total.clone(),
343            observe::MetricValue::HttpConnectionErrorsTotal,
344        );
345        register_metric(
346            registry,
347            http_status_errors_total.clone(),
348            observe::MetricValue::HttpStatusErrorsTotal,
349        );
350
351        PrometheusClientLayer {
352            interceptor: PrometheusClientInterceptor {
353                operation_bytes,
354                operation_bytes_rate,
355                operation_entries,
356                operation_entries_rate,
357                operation_duration_seconds,
358                operation_errors_total,
359                operation_executing,
360                operation_ttfb_seconds,
361
362                http_executing,
363                http_request_bytes,
364                http_request_bytes_rate,
365                http_request_duration_seconds,
366                http_response_bytes,
367                http_response_bytes_rate,
368                http_response_duration_seconds,
369                http_connection_errors_total,
370                http_status_errors_total,
371
372                disable_label_root: self.disable_label_root,
373            },
374        }
375    }
376}
377
378#[derive(Clone)]
379struct HistogramConstructor {
380    buckets: Vec<f64>,
381}
382
383impl MetricConstructor<Histogram> for HistogramConstructor {
384    fn new_metric(&self) -> Histogram {
385        Histogram::new(self.buckets.iter().cloned())
386    }
387}
388
389#[derive(Clone, Debug)]
390pub struct PrometheusClientInterceptor {
391    operation_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
392    operation_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
393    operation_entries: Family<OperationLabels, Histogram, HistogramConstructor>,
394    operation_entries_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
395    operation_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
396    operation_errors_total: Family<OperationLabels, Counter>,
397    operation_executing: Family<OperationLabels, Gauge>,
398    operation_ttfb_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
399
400    http_executing: Family<OperationLabels, Gauge>,
401    http_request_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
402    http_request_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
403    http_request_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
404    http_response_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
405    http_response_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
406    http_response_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
407    http_connection_errors_total: Family<OperationLabels, Counter>,
408    http_status_errors_total: Family<OperationLabels, Counter>,
409
410    disable_label_root: bool,
411}
412
413impl observe::MetricsIntercept for PrometheusClientInterceptor {
414    fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
415        let labels = OperationLabels {
416            labels,
417            disable_label_root: self.disable_label_root,
418        };
419        match value {
420            observe::MetricValue::OperationBytes(v) => self
421                .operation_bytes
422                .get_or_create(&labels)
423                .observe(v as f64),
424            observe::MetricValue::OperationBytesRate(v) => {
425                self.operation_bytes_rate.get_or_create(&labels).observe(v)
426            }
427            observe::MetricValue::OperationEntries(v) => self
428                .operation_entries
429                .get_or_create(&labels)
430                .observe(v as f64),
431            observe::MetricValue::OperationEntriesRate(v) => self
432                .operation_entries_rate
433                .get_or_create(&labels)
434                .observe(v),
435            observe::MetricValue::OperationDurationSeconds(v) => self
436                .operation_duration_seconds
437                .get_or_create(&labels)
438                .observe(v.as_secs_f64()),
439            observe::MetricValue::OperationErrorsTotal => {
440                self.operation_errors_total.get_or_create(&labels).inc();
441            }
442            observe::MetricValue::OperationExecuting(v) => {
443                self.operation_executing
444                    .get_or_create(&labels)
445                    .inc_by(v as i64);
446            }
447            observe::MetricValue::OperationTtfbSeconds(v) => self
448                .operation_ttfb_seconds
449                .get_or_create(&labels)
450                .observe(v.as_secs_f64()),
451
452            observe::MetricValue::HttpExecuting(v) => {
453                self.http_executing.get_or_create(&labels).inc_by(v as i64);
454            }
455            observe::MetricValue::HttpRequestBytes(v) => self
456                .http_request_bytes
457                .get_or_create(&labels)
458                .observe(v as f64),
459            observe::MetricValue::HttpRequestBytesRate(v) => self
460                .http_request_bytes_rate
461                .get_or_create(&labels)
462                .observe(v),
463            observe::MetricValue::HttpRequestDurationSeconds(v) => self
464                .http_request_duration_seconds
465                .get_or_create(&labels)
466                .observe(v.as_secs_f64()),
467            observe::MetricValue::HttpResponseBytes(v) => self
468                .http_response_bytes
469                .get_or_create(&labels)
470                .observe(v as f64),
471            observe::MetricValue::HttpResponseBytesRate(v) => self
472                .http_response_bytes_rate
473                .get_or_create(&labels)
474                .observe(v),
475            observe::MetricValue::HttpResponseDurationSeconds(v) => self
476                .http_response_duration_seconds
477                .get_or_create(&labels)
478                .observe(v.as_secs_f64()),
479            observe::MetricValue::HttpConnectionErrorsTotal => {
480                self.http_connection_errors_total
481                    .get_or_create(&labels)
482                    .inc();
483            }
484            observe::MetricValue::HttpStatusErrorsTotal => {
485                self.http_status_errors_total.get_or_create(&labels).inc();
486            }
487        };
488    }
489}
490
491#[derive(Clone, Debug, PartialEq, Eq, Hash)]
492struct OperationLabels {
493    labels: observe::MetricLabels,
494    disable_label_root: bool,
495}
496
497impl EncodeLabelSet for OperationLabels {
498    fn encode(&self, mut encoder: LabelSetEncoder) -> Result<(), fmt::Error> {
499        (observe::LABEL_SCHEME, self.labels.scheme.into_static()).encode(encoder.encode_label())?;
500        (observe::LABEL_NAMESPACE, self.labels.namespace.as_ref())
501            .encode(encoder.encode_label())?;
502        if !self.disable_label_root {
503            (observe::LABEL_ROOT, self.labels.root.as_ref()).encode(encoder.encode_label())?;
504        }
505        (observe::LABEL_OPERATION, self.labels.operation).encode(encoder.encode_label())?;
506
507        if let Some(error) = &self.labels.error {
508            (observe::LABEL_ERROR, error.into_static()).encode(encoder.encode_label())?;
509        }
510        if let Some(code) = &self.labels.status_code {
511            (observe::LABEL_STATUS_CODE, code.as_str()).encode(encoder.encode_label())?;
512        }
513        Ok(())
514    }
515}
516
517fn register_metric(registry: &mut Registry, metric: impl Metric, value: observe::MetricValue) {
518    let ((name, unit), help) = (value.name_with_unit(), value.help());
519
520    if let Some(unit) = unit {
521        registry.register_with_unit(name, help, Unit::Other(unit.to_string()), metric);
522    } else {
523        registry.register(name, help, metric);
524    }
525}