opendal_core/layers/
prometheus_client.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt;
19
20use prometheus_client::encoding::EncodeLabel;
21use prometheus_client::encoding::EncodeLabelSet;
22use prometheus_client::encoding::LabelSetEncoder;
23use prometheus_client::metrics::counter::Counter;
24use prometheus_client::metrics::family::Family;
25use prometheus_client::metrics::family::MetricConstructor;
26use prometheus_client::metrics::gauge::Gauge;
27use prometheus_client::metrics::histogram::Histogram;
28use prometheus_client::registry::Metric;
29use prometheus_client::registry::Registry;
30use prometheus_client::registry::Unit;
31
32use crate::layers::observe;
33use crate::raw::*;
34use crate::*;
35
36/// Add [prometheus-client](https://docs.rs/prometheus-client) for every operation.
37///
38/// # Prometheus Metrics
39///
40/// We provide several metrics, please see the documentation of [`observe`] module.
41/// For a more detailed explanation of these metrics and how they are used, please refer to the [Prometheus documentation](https://prometheus.io/docs/introduction/overview/).
42///
43/// # Examples
44///
45/// ```no_run
46/// # use log::info;
47/// # use opendal_core::layers::PrometheusClientLayer;
48/// # use opendal_core::services;
49/// # use opendal_core::Operator;
50/// # use opendal_core::Result;
51///
52/// # #[tokio::main]
53/// # async fn main() -> Result<()> {
54/// let mut registry = prometheus_client::registry::Registry::default();
55///
56/// let op = Operator::new(services::Memory::default())?
57///     .layer(PrometheusClientLayer::builder().register(&mut registry))
58///     .finish();
59///
60/// // Write data into object test.
61/// op.write("test", "Hello, World!").await?;
62/// // Read data from the object.
63/// let bs = op.read("test").await?;
64/// info!("content: {}", String::from_utf8_lossy(&bs.to_bytes()));
65///
66/// // Get object metadata.
67/// let meta = op.stat("test").await?;
68/// info!("meta: {:?}", meta);
69///
70/// // Export prometheus metrics.
71/// let mut buf = String::new();
72/// prometheus_client::encoding::text::encode(&mut buf, &registry).unwrap();
73/// println!("## Prometheus Metrics");
74/// println!("{}", buf);
75/// # Ok(())
76/// # }
77/// ```
78#[derive(Clone, Debug)]
79pub struct PrometheusClientLayer {
80    interceptor: PrometheusClientInterceptor,
81}
82
83impl PrometheusClientLayer {
84    /// Create a [`PrometheusClientLayerBuilder`] to set the configuration of metrics.
85    pub fn builder() -> PrometheusClientLayerBuilder {
86        PrometheusClientLayerBuilder::default()
87    }
88}
89
90impl<A: Access> Layer<A> for PrometheusClientLayer {
91    type LayeredAccess = observe::MetricsAccessor<A, PrometheusClientInterceptor>;
92
93    fn layer(&self, inner: A) -> Self::LayeredAccess {
94        observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
95    }
96}
97
/// [`PrometheusClientLayerBuilder`] is a config builder to build a [`PrometheusClientLayer`].
///
/// Obtain one through [`PrometheusClientLayer::builder`], adjust the histogram buckets and label
/// options as needed, and finish by registering the metrics into a registry.
#[derive(Clone, Debug)]
pub struct PrometheusClientLayerBuilder {
    /// Buckets for bytes related histograms like `operation_bytes`.
    bytes_buckets: Vec<f64>,
    /// Buckets for bytes rate related histograms like `operation_bytes_rate`.
    bytes_rate_buckets: Vec<f64>,
    /// Buckets for entries related histograms like `operation_entries`.
    entries_buckets: Vec<f64>,
    /// Buckets for entries rate related histograms like `operation_entries_rate`.
    entries_rate_buckets: Vec<f64>,
    /// Buckets for duration related histograms like `operation_duration_seconds`.
    duration_seconds_buckets: Vec<f64>,
    /// Buckets for ttfb related histograms like `operation_ttfb_seconds`.
    ttfb_buckets: Vec<f64>,
    /// When `true`, the `root` label is left out of the encoded label set.
    disable_label_root: bool,
}
108
109impl Default for PrometheusClientLayerBuilder {
110    fn default() -> Self {
111        Self {
112            bytes_buckets: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
113            bytes_rate_buckets: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
114            entries_buckets: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
115            entries_rate_buckets: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
116            duration_seconds_buckets: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
117            ttfb_buckets: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
118            disable_label_root: false,
119        }
120    }
121}
122
123impl PrometheusClientLayerBuilder {
124    /// Set buckets for bytes related histogram like `operation_bytes`.
125    pub fn bytes_buckets(mut self, buckets: Vec<f64>) -> Self {
126        if !buckets.is_empty() {
127            self.bytes_buckets = buckets;
128        }
129        self
130    }
131
132    /// Set buckets for bytes rate related histogram like `operation_bytes_rate`.
133    pub fn bytes_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
134        if !buckets.is_empty() {
135            self.bytes_rate_buckets = buckets;
136        }
137        self
138    }
139
140    /// Set buckets for entries related histogram like `operation_entries`.
141    pub fn entries_buckets(mut self, buckets: Vec<f64>) -> Self {
142        if !buckets.is_empty() {
143            self.entries_buckets = buckets;
144        }
145        self
146    }
147
148    /// Set buckets for entries rate related histogram like `operation_entries_rate`.
149    pub fn entries_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
150        if !buckets.is_empty() {
151            self.entries_rate_buckets = buckets;
152        }
153        self
154    }
155
156    /// Set buckets for duration seconds related histogram like `operation_duration_seconds`.
157    pub fn duration_seconds_buckets(mut self, buckets: Vec<f64>) -> Self {
158        if !buckets.is_empty() {
159            self.duration_seconds_buckets = buckets;
160        }
161        self
162    }
163
164    /// Set buckets for ttfb related histogram like `operation_ttfb_seconds`.
165    pub fn ttfb_buckets(mut self, buckets: Vec<f64>) -> Self {
166        if !buckets.is_empty() {
167            self.ttfb_buckets = buckets;
168        }
169        self
170    }
171
172    /// The 'root' label might have risks of being high cardinality, users can choose to disable it
173    /// when they found it's not useful for their metrics.
174    pub fn disable_label_root(mut self, disable: bool) -> Self {
175        self.disable_label_root = disable;
176        self
177    }
178
179    /// Register the metrics into the registry and return a [`PrometheusClientLayer`].
180    ///
181    /// # Example
182    ///
183    /// ```no_run
184    /// # use opendal_core::layers::PrometheusClientLayer;
185    /// # use opendal_core::services;
186    /// # use opendal_core::Operator;
187    /// # use opendal_core::Result;
188    ///
189    /// # #[tokio::main]
190    /// # async fn main() -> Result<()> {
191    /// // Pick a builder and configure it.
192    /// let builder = services::Memory::default();
193    /// let mut registry = prometheus_client::registry::Registry::default();
194    ///
195    /// let _ = Operator::new(builder)?
196    ///     .layer(PrometheusClientLayer::builder().register(&mut registry))
197    ///     .finish();
198    /// # Ok(())
199    /// # }
200    /// ```
201    pub fn register(self, registry: &mut Registry) -> PrometheusClientLayer {
202        let operation_bytes =
203            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
204                buckets: self.bytes_buckets.clone(),
205            });
206        let operation_bytes_rate =
207            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
208                buckets: self.bytes_rate_buckets.clone(),
209            });
210        let operation_entries =
211            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
212                buckets: self.entries_buckets.clone(),
213            });
214        let operation_entries_rate =
215            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
216                buckets: self.entries_rate_buckets.clone(),
217            });
218        let operation_duration_seconds =
219            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
220                buckets: self.duration_seconds_buckets.clone(),
221            });
222        let operation_errors_total = Family::<OperationLabels, Counter>::default();
223        let operation_executing = Family::<OperationLabels, Gauge>::default();
224        let operation_ttfb_seconds =
225            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
226                buckets: self.ttfb_buckets.clone(),
227            });
228
229        let http_executing = Family::<OperationLabels, Gauge>::default();
230        let http_request_bytes =
231            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
232                buckets: self.bytes_buckets.clone(),
233            });
234        let http_request_bytes_rate =
235            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
236                buckets: self.bytes_rate_buckets.clone(),
237            });
238        let http_request_duration_seconds =
239            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
240                buckets: self.duration_seconds_buckets.clone(),
241            });
242        let http_response_bytes =
243            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
244                buckets: self.bytes_buckets.clone(),
245            });
246        let http_response_bytes_rate =
247            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
248                buckets: self.bytes_rate_buckets.clone(),
249            });
250        let http_response_duration_seconds =
251            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
252                buckets: self.duration_seconds_buckets.clone(),
253            });
254        let http_connection_errors_total = Family::<OperationLabels, Counter>::default();
255        let http_status_errors_total = Family::<OperationLabels, Counter>::default();
256
257        register_metric(
258            registry,
259            operation_bytes.clone(),
260            observe::MetricValue::OperationBytes(0),
261        );
262        register_metric(
263            registry,
264            operation_bytes_rate.clone(),
265            observe::MetricValue::OperationBytesRate(0.0),
266        );
267        register_metric(
268            registry,
269            operation_entries.clone(),
270            observe::MetricValue::OperationEntries(0),
271        );
272        register_metric(
273            registry,
274            operation_entries_rate.clone(),
275            observe::MetricValue::OperationEntriesRate(0.0),
276        );
277        register_metric(
278            registry,
279            operation_duration_seconds.clone(),
280            observe::MetricValue::OperationDurationSeconds(Duration::default()),
281        );
282        register_metric(
283            registry,
284            operation_errors_total.clone(),
285            observe::MetricValue::OperationErrorsTotal,
286        );
287        register_metric(
288            registry,
289            operation_executing.clone(),
290            observe::MetricValue::OperationExecuting(0),
291        );
292        register_metric(
293            registry,
294            operation_ttfb_seconds.clone(),
295            observe::MetricValue::OperationTtfbSeconds(Duration::default()),
296        );
297
298        register_metric(
299            registry,
300            http_executing.clone(),
301            observe::MetricValue::HttpExecuting(0),
302        );
303        register_metric(
304            registry,
305            http_request_bytes.clone(),
306            observe::MetricValue::HttpRequestBytes(0),
307        );
308        register_metric(
309            registry,
310            http_request_bytes_rate.clone(),
311            observe::MetricValue::HttpRequestBytesRate(0.0),
312        );
313        register_metric(
314            registry,
315            http_request_duration_seconds.clone(),
316            observe::MetricValue::HttpRequestDurationSeconds(Duration::default()),
317        );
318        register_metric(
319            registry,
320            http_response_bytes.clone(),
321            observe::MetricValue::HttpResponseBytes(0),
322        );
323        register_metric(
324            registry,
325            http_response_bytes_rate.clone(),
326            observe::MetricValue::HttpResponseBytesRate(0.0),
327        );
328        register_metric(
329            registry,
330            http_response_duration_seconds.clone(),
331            observe::MetricValue::HttpResponseDurationSeconds(Duration::default()),
332        );
333        register_metric(
334            registry,
335            http_connection_errors_total.clone(),
336            observe::MetricValue::HttpConnectionErrorsTotal,
337        );
338        register_metric(
339            registry,
340            http_status_errors_total.clone(),
341            observe::MetricValue::HttpStatusErrorsTotal,
342        );
343
344        PrometheusClientLayer {
345            interceptor: PrometheusClientInterceptor {
346                operation_bytes,
347                operation_bytes_rate,
348                operation_entries,
349                operation_entries_rate,
350                operation_duration_seconds,
351                operation_errors_total,
352                operation_executing,
353                operation_ttfb_seconds,
354
355                http_executing,
356                http_request_bytes,
357                http_request_bytes_rate,
358                http_request_duration_seconds,
359                http_response_bytes,
360                http_response_bytes_rate,
361                http_response_duration_seconds,
362                http_connection_errors_total,
363                http_status_errors_total,
364
365                disable_label_root: self.disable_label_root,
366            },
367        }
368    }
369}
370
371#[derive(Clone)]
372struct HistogramConstructor {
373    buckets: Vec<f64>,
374}
375
376impl MetricConstructor<Histogram> for HistogramConstructor {
377    fn new_metric(&self) -> Histogram {
378        Histogram::new(self.buckets.iter().cloned())
379    }
380}
381
382#[derive(Clone, Debug)]
383pub struct PrometheusClientInterceptor {
384    operation_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
385    operation_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
386    operation_entries: Family<OperationLabels, Histogram, HistogramConstructor>,
387    operation_entries_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
388    operation_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
389    operation_errors_total: Family<OperationLabels, Counter>,
390    operation_executing: Family<OperationLabels, Gauge>,
391    operation_ttfb_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
392
393    http_executing: Family<OperationLabels, Gauge>,
394    http_request_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
395    http_request_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
396    http_request_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
397    http_response_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
398    http_response_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
399    http_response_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
400    http_connection_errors_total: Family<OperationLabels, Counter>,
401    http_status_errors_total: Family<OperationLabels, Counter>,
402
403    disable_label_root: bool,
404}
405
406impl observe::MetricsIntercept for PrometheusClientInterceptor {
407    fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
408        let labels = OperationLabels {
409            labels,
410            disable_label_root: self.disable_label_root,
411        };
412        match value {
413            observe::MetricValue::OperationBytes(v) => self
414                .operation_bytes
415                .get_or_create(&labels)
416                .observe(v as f64),
417            observe::MetricValue::OperationBytesRate(v) => {
418                self.operation_bytes_rate.get_or_create(&labels).observe(v)
419            }
420            observe::MetricValue::OperationEntries(v) => self
421                .operation_entries
422                .get_or_create(&labels)
423                .observe(v as f64),
424            observe::MetricValue::OperationEntriesRate(v) => self
425                .operation_entries_rate
426                .get_or_create(&labels)
427                .observe(v),
428            observe::MetricValue::OperationDurationSeconds(v) => self
429                .operation_duration_seconds
430                .get_or_create(&labels)
431                .observe(v.as_secs_f64()),
432            observe::MetricValue::OperationErrorsTotal => {
433                self.operation_errors_total.get_or_create(&labels).inc();
434            }
435            observe::MetricValue::OperationExecuting(v) => {
436                self.operation_executing
437                    .get_or_create(&labels)
438                    .inc_by(v as i64);
439            }
440            observe::MetricValue::OperationTtfbSeconds(v) => self
441                .operation_ttfb_seconds
442                .get_or_create(&labels)
443                .observe(v.as_secs_f64()),
444
445            observe::MetricValue::HttpExecuting(v) => {
446                self.http_executing.get_or_create(&labels).inc_by(v as i64);
447            }
448            observe::MetricValue::HttpRequestBytes(v) => self
449                .http_request_bytes
450                .get_or_create(&labels)
451                .observe(v as f64),
452            observe::MetricValue::HttpRequestBytesRate(v) => self
453                .http_request_bytes_rate
454                .get_or_create(&labels)
455                .observe(v),
456            observe::MetricValue::HttpRequestDurationSeconds(v) => self
457                .http_request_duration_seconds
458                .get_or_create(&labels)
459                .observe(v.as_secs_f64()),
460            observe::MetricValue::HttpResponseBytes(v) => self
461                .http_response_bytes
462                .get_or_create(&labels)
463                .observe(v as f64),
464            observe::MetricValue::HttpResponseBytesRate(v) => self
465                .http_response_bytes_rate
466                .get_or_create(&labels)
467                .observe(v),
468            observe::MetricValue::HttpResponseDurationSeconds(v) => self
469                .http_response_duration_seconds
470                .get_or_create(&labels)
471                .observe(v.as_secs_f64()),
472            observe::MetricValue::HttpConnectionErrorsTotal => {
473                self.http_connection_errors_total
474                    .get_or_create(&labels)
475                    .inc();
476            }
477            observe::MetricValue::HttpStatusErrorsTotal => {
478                self.http_status_errors_total.get_or_create(&labels).inc();
479            }
480        };
481    }
482}
483
484#[derive(Clone, Debug, PartialEq, Eq, Hash)]
485struct OperationLabels {
486    labels: observe::MetricLabels,
487    disable_label_root: bool,
488}
489
490impl EncodeLabelSet for OperationLabels {
491    fn encode(&self, encoder: &mut LabelSetEncoder<'_>) -> Result<(), fmt::Error> {
492        (observe::LABEL_SCHEME, self.labels.scheme).encode(encoder.encode_label())?;
493        (observe::LABEL_NAMESPACE, self.labels.namespace.as_ref())
494            .encode(encoder.encode_label())?;
495        if !self.disable_label_root {
496            (observe::LABEL_ROOT, self.labels.root.as_ref()).encode(encoder.encode_label())?;
497        }
498        (observe::LABEL_OPERATION, self.labels.operation).encode(encoder.encode_label())?;
499
500        if let Some(error) = &self.labels.error {
501            (observe::LABEL_ERROR, error.into_static()).encode(encoder.encode_label())?;
502        }
503        if let Some(code) = &self.labels.status_code {
504            (observe::LABEL_STATUS_CODE, code.as_str()).encode(encoder.encode_label())?;
505        }
506        Ok(())
507    }
508}
509
510fn register_metric(registry: &mut Registry, metric: impl Metric, value: observe::MetricValue) {
511    let ((name, unit), help) = (value.name_with_unit(), value.help());
512
513    if let Some(unit) = unit {
514        registry.register_with_unit(name, help, Unit::Other(unit.to_string()), metric);
515    } else {
516        registry.register(name, help, metric);
517    }
518}