opendal/layers/
prometheus_client.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt;
19use std::time::Duration;
20
21use prometheus_client::encoding::EncodeLabel;
22use prometheus_client::encoding::EncodeLabelSet;
23use prometheus_client::encoding::LabelSetEncoder;
24use prometheus_client::metrics::counter::Counter;
25use prometheus_client::metrics::family::Family;
26use prometheus_client::metrics::family::MetricConstructor;
27use prometheus_client::metrics::gauge::Gauge;
28use prometheus_client::metrics::histogram::Histogram;
29use prometheus_client::registry::Metric;
30use prometheus_client::registry::Registry;
31use prometheus_client::registry::Unit;
32
33use crate::layers::observe;
34use crate::raw::*;
35use crate::*;
36
/// Add [prometheus-client](https://docs.rs/prometheus-client) metrics for every operation.
///
/// A layer is obtained by configuring a [`PrometheusClientLayerBuilder`] (see
/// [`PrometheusClientLayer::builder`]) and calling its `register` method with the
/// registry that should expose the metrics.
///
/// # Prometheus Metrics
///
/// We provide several metrics; please see the documentation of the [`observe`] module.
/// For a more detailed explanation of these metrics and how they are used, please refer to the [Prometheus documentation](https://prometheus.io/docs/introduction/overview/).
///
/// # Examples
///
/// ```no_run
/// # use log::debug;
/// # use log::info;
/// # use opendal::layers::PrometheusClientLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
///
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut registry = prometheus_client::registry::Registry::default();
///
/// let op = Operator::new(services::Memory::default())?
///     .layer(PrometheusClientLayer::builder().register(&mut registry))
///     .finish();
/// debug!("operator: {op:?}");
///
/// // Write data into object test.
/// op.write("test", "Hello, World!").await?;
/// // Read data from object.
/// let bs = op.read("test").await?;
/// info!("content: {}", String::from_utf8_lossy(&bs.to_bytes()));
///
/// // Get object metadata.
/// let meta = op.stat("test").await?;
/// info!("meta: {:?}", meta);
///
/// // Export prometheus metrics.
/// let mut buf = String::new();
/// prometheus_client::encoding::text::encode(&mut buf, &registry).unwrap();
/// println!("## Prometheus Metrics");
/// println!("{}", buf);
///
/// Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct PrometheusClientLayer {
    // Holds the metric families; it is cloned into the metrics layer each time
    // this layer is applied to an accessor.
    interceptor: PrometheusClientInterceptor,
}
86
87impl PrometheusClientLayer {
88    /// Create a [`PrometheusClientLayerBuilder`] to set the configuration of metrics.
89    pub fn builder() -> PrometheusClientLayerBuilder {
90        PrometheusClientLayerBuilder::default()
91    }
92}
93
94impl<A: Access> Layer<A> for PrometheusClientLayer {
95    type LayeredAccess = observe::MetricsAccessor<A, PrometheusClientInterceptor>;
96
97    fn layer(&self, inner: A) -> Self::LayeredAccess {
98        observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
99    }
100}
101
/// [`PrometheusClientLayerBuilder`] is a config builder to build a [`PrometheusClientLayer`].
///
/// Every field stores the bucket boundaries handed to the histograms of one
/// metric category. The builder implements `Clone` and `Debug` so that a
/// configuration can be reused and inspected.
#[derive(Clone, Debug)]
pub struct PrometheusClientLayerBuilder {
    /// Buckets for bytes related histograms.
    bytes_buckets: Vec<f64>,
    /// Buckets for bytes rate related histograms.
    bytes_rate_buckets: Vec<f64>,
    /// Buckets for entries related histograms.
    entries_buckets: Vec<f64>,
    /// Buckets for entries rate related histograms.
    entries_rate_buckets: Vec<f64>,
    /// Buckets for duration (in seconds) related histograms.
    duration_seconds_buckets: Vec<f64>,
    /// Buckets for ttfb related histograms.
    ttfb_buckets: Vec<f64>,
}
111
112impl Default for PrometheusClientLayerBuilder {
113    fn default() -> Self {
114        Self {
115            bytes_buckets: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
116            bytes_rate_buckets: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
117            entries_buckets: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
118            entries_rate_buckets: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
119            duration_seconds_buckets: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
120            ttfb_buckets: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
121        }
122    }
123}
124
125impl PrometheusClientLayerBuilder {
126    /// Set buckets for bytes related histogram like `operation_bytes`.
127    pub fn bytes_buckets(mut self, buckets: Vec<f64>) -> Self {
128        if !buckets.is_empty() {
129            self.bytes_buckets = buckets;
130        }
131        self
132    }
133
134    /// Set buckets for bytes rate related histogram like `operation_bytes_rate`.
135    pub fn bytes_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
136        if !buckets.is_empty() {
137            self.bytes_rate_buckets = buckets;
138        }
139        self
140    }
141
142    /// Set buckets for entries related histogram like `operation_entries`.
143    pub fn entries_buckets(mut self, buckets: Vec<f64>) -> Self {
144        if !buckets.is_empty() {
145            self.entries_buckets = buckets;
146        }
147        self
148    }
149
150    /// Set buckets for entries rate related histogram like `operation_entries_rate`.
151    pub fn entries_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
152        if !buckets.is_empty() {
153            self.entries_rate_buckets = buckets;
154        }
155        self
156    }
157
158    /// Set buckets for duration seconds related histogram like `operation_duration_seconds`.
159    pub fn duration_seconds_buckets(mut self, buckets: Vec<f64>) -> Self {
160        if !buckets.is_empty() {
161            self.duration_seconds_buckets = buckets;
162        }
163        self
164    }
165
166    /// Set buckets for ttfb related histogram like `operation_ttfb_seconds`.
167    pub fn ttfb_buckets(mut self, buckets: Vec<f64>) -> Self {
168        if !buckets.is_empty() {
169            self.ttfb_buckets = buckets;
170        }
171        self
172    }
173
174    /// Register the metrics into the registry and return a [`PrometheusClientLayer`].
175    ///
176    /// # Examples
177    ///
178    /// ```no_run
179    /// # use log::debug;
180    /// # use opendal::layers::PrometheusClientLayer;
181    /// # use opendal::services;
182    /// # use opendal::Operator;
183    /// # use opendal::Result;
184    ///
185    /// # #[tokio::main]
186    /// # async fn main() -> Result<()> {
187    /// // Pick a builder and configure it.
188    /// let builder = services::Memory::default();
189    /// let mut registry = prometheus_client::registry::Registry::default();
190    ///
191    /// let op = Operator::new(builder)?
192    ///     .layer(PrometheusClientLayer::builder().register(&mut registry))
193    ///     .finish();
194    /// debug!("operator: {op:?}");
195    ///
196    /// Ok(())
197    /// # }
198    /// ```
199    pub fn register(self, registry: &mut Registry) -> PrometheusClientLayer {
200        let operation_bytes =
201            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
202                buckets: self.bytes_buckets.clone(),
203            });
204        let operation_bytes_rate =
205            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
206                buckets: self.bytes_rate_buckets.clone(),
207            });
208        let operation_entries =
209            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
210                buckets: self.entries_buckets.clone(),
211            });
212        let operation_entries_rate =
213            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
214                buckets: self.entries_rate_buckets.clone(),
215            });
216        let operation_duration_seconds =
217            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
218                buckets: self.duration_seconds_buckets.clone(),
219            });
220        let operation_errors_total = Family::<OperationLabels, Counter>::default();
221        let operation_executing = Family::<OperationLabels, Gauge>::default();
222        let operation_ttfb_seconds =
223            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
224                buckets: self.ttfb_buckets.clone(),
225            });
226
227        let http_executing = Family::<OperationLabels, Gauge>::default();
228        let http_request_bytes =
229            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
230                buckets: self.bytes_buckets.clone(),
231            });
232        let http_request_bytes_rate =
233            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
234                buckets: self.bytes_rate_buckets.clone(),
235            });
236        let http_request_duration_seconds =
237            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
238                buckets: self.duration_seconds_buckets.clone(),
239            });
240        let http_response_bytes =
241            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
242                buckets: self.bytes_buckets.clone(),
243            });
244        let http_response_bytes_rate =
245            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
246                buckets: self.bytes_rate_buckets.clone(),
247            });
248        let http_response_duration_seconds =
249            Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
250                buckets: self.duration_seconds_buckets.clone(),
251            });
252        let http_connection_errors_total = Family::<OperationLabels, Counter>::default();
253        let http_status_errors_total = Family::<OperationLabels, Counter>::default();
254
255        register_metric(
256            registry,
257            operation_bytes.clone(),
258            observe::MetricValue::OperationBytes(0),
259        );
260        register_metric(
261            registry,
262            operation_bytes_rate.clone(),
263            observe::MetricValue::OperationBytesRate(0.0),
264        );
265        register_metric(
266            registry,
267            operation_entries.clone(),
268            observe::MetricValue::OperationEntries(0),
269        );
270        register_metric(
271            registry,
272            operation_entries_rate.clone(),
273            observe::MetricValue::OperationEntriesRate(0.0),
274        );
275        register_metric(
276            registry,
277            operation_duration_seconds.clone(),
278            observe::MetricValue::OperationDurationSeconds(Duration::default()),
279        );
280        register_metric(
281            registry,
282            operation_errors_total.clone(),
283            observe::MetricValue::OperationErrorsTotal,
284        );
285        register_metric(
286            registry,
287            operation_executing.clone(),
288            observe::MetricValue::OperationExecuting(0),
289        );
290        register_metric(
291            registry,
292            operation_ttfb_seconds.clone(),
293            observe::MetricValue::OperationTtfbSeconds(Duration::default()),
294        );
295
296        register_metric(
297            registry,
298            http_executing.clone(),
299            observe::MetricValue::HttpExecuting(0),
300        );
301        register_metric(
302            registry,
303            http_request_bytes.clone(),
304            observe::MetricValue::HttpRequestBytes(0),
305        );
306        register_metric(
307            registry,
308            http_request_bytes_rate.clone(),
309            observe::MetricValue::HttpRequestBytesRate(0.0),
310        );
311        register_metric(
312            registry,
313            http_request_duration_seconds.clone(),
314            observe::MetricValue::HttpRequestDurationSeconds(Duration::default()),
315        );
316        register_metric(
317            registry,
318            http_response_bytes.clone(),
319            observe::MetricValue::HttpResponseBytes(0),
320        );
321        register_metric(
322            registry,
323            http_response_bytes_rate.clone(),
324            observe::MetricValue::HttpResponseBytesRate(0.0),
325        );
326        register_metric(
327            registry,
328            http_response_duration_seconds.clone(),
329            observe::MetricValue::HttpResponseDurationSeconds(Duration::default()),
330        );
331        register_metric(
332            registry,
333            http_connection_errors_total.clone(),
334            observe::MetricValue::HttpConnectionErrorsTotal,
335        );
336        register_metric(
337            registry,
338            http_status_errors_total.clone(),
339            observe::MetricValue::HttpStatusErrorsTotal,
340        );
341
342        PrometheusClientLayer {
343            interceptor: PrometheusClientInterceptor {
344                operation_bytes,
345                operation_bytes_rate,
346                operation_entries,
347                operation_entries_rate,
348                operation_duration_seconds,
349                operation_errors_total,
350                operation_executing,
351                operation_ttfb_seconds,
352
353                http_executing,
354                http_request_bytes,
355                http_request_bytes_rate,
356                http_request_duration_seconds,
357                http_response_bytes,
358                http_response_bytes_rate,
359                http_response_duration_seconds,
360                http_connection_errors_total,
361                http_status_errors_total,
362            },
363        }
364    }
365}
366
/// Metric constructor that carries the buckets used to create histograms.
#[derive(Clone)]
struct HistogramConstructor {
    /// Bucket boundaries handed to every histogram this constructor creates.
    buckets: Vec<f64>,
}
371
372impl MetricConstructor<Histogram> for HistogramConstructor {
373    fn new_metric(&self) -> Histogram {
374        Histogram::new(self.buckets.iter().cloned())
375    }
376}
377
/// Interceptor that records observed metric values into prometheus-client
/// metric families.
///
/// Every family is keyed by [`OperationLabels`]. Histograms are created through
/// [`HistogramConstructor`], so each family carries its own bucket configuration.
#[derive(Clone, Debug)]
pub struct PrometheusClientInterceptor {
    // Families updated for the `Operation*` metric values.
    operation_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
    operation_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
    operation_entries: Family<OperationLabels, Histogram, HistogramConstructor>,
    operation_entries_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
    operation_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
    operation_errors_total: Family<OperationLabels, Counter>,
    operation_executing: Family<OperationLabels, Gauge>,
    operation_ttfb_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,

    // Families updated for the `Http*` metric values.
    http_executing: Family<OperationLabels, Gauge>,
    http_request_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
    http_request_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
    http_request_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
    http_response_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
    http_response_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
    http_response_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
    http_connection_errors_total: Family<OperationLabels, Counter>,
    http_status_errors_total: Family<OperationLabels, Counter>,
}
399
400impl observe::MetricsIntercept for PrometheusClientInterceptor {
401    fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
402        let labels = OperationLabels(labels);
403        match value {
404            observe::MetricValue::OperationBytes(v) => self
405                .operation_bytes
406                .get_or_create(&labels)
407                .observe(v as f64),
408            observe::MetricValue::OperationBytesRate(v) => {
409                self.operation_bytes_rate.get_or_create(&labels).observe(v)
410            }
411            observe::MetricValue::OperationEntries(v) => self
412                .operation_entries
413                .get_or_create(&labels)
414                .observe(v as f64),
415            observe::MetricValue::OperationEntriesRate(v) => self
416                .operation_entries_rate
417                .get_or_create(&labels)
418                .observe(v),
419            observe::MetricValue::OperationDurationSeconds(v) => self
420                .operation_duration_seconds
421                .get_or_create(&labels)
422                .observe(v.as_secs_f64()),
423            observe::MetricValue::OperationErrorsTotal => {
424                self.operation_errors_total.get_or_create(&labels).inc();
425            }
426            observe::MetricValue::OperationExecuting(v) => {
427                self.operation_executing
428                    .get_or_create(&labels)
429                    .inc_by(v as i64);
430            }
431            observe::MetricValue::OperationTtfbSeconds(v) => self
432                .operation_ttfb_seconds
433                .get_or_create(&labels)
434                .observe(v.as_secs_f64()),
435
436            observe::MetricValue::HttpExecuting(v) => {
437                self.http_executing.get_or_create(&labels).inc_by(v as i64);
438            }
439            observe::MetricValue::HttpRequestBytes(v) => self
440                .http_request_bytes
441                .get_or_create(&labels)
442                .observe(v as f64),
443            observe::MetricValue::HttpRequestBytesRate(v) => self
444                .http_request_bytes_rate
445                .get_or_create(&labels)
446                .observe(v),
447            observe::MetricValue::HttpRequestDurationSeconds(v) => self
448                .http_request_duration_seconds
449                .get_or_create(&labels)
450                .observe(v.as_secs_f64()),
451            observe::MetricValue::HttpResponseBytes(v) => self
452                .http_response_bytes
453                .get_or_create(&labels)
454                .observe(v as f64),
455            observe::MetricValue::HttpResponseBytesRate(v) => self
456                .http_response_bytes_rate
457                .get_or_create(&labels)
458                .observe(v),
459            observe::MetricValue::HttpResponseDurationSeconds(v) => self
460                .http_response_duration_seconds
461                .get_or_create(&labels)
462                .observe(v.as_secs_f64()),
463            observe::MetricValue::HttpConnectionErrorsTotal => {
464                self.http_connection_errors_total
465                    .get_or_create(&labels)
466                    .inc();
467            }
468            observe::MetricValue::HttpStatusErrorsTotal => {
469                self.http_status_errors_total.get_or_create(&labels).inc();
470            }
471        };
472    }
473}
474
/// Wrapper around [`observe::MetricLabels`] that implements [`EncodeLabelSet`].
///
/// It is the label-set (key) type of every metric family in this file.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct OperationLabels(observe::MetricLabels);
477
478impl EncodeLabelSet for OperationLabels {
479    fn encode(&self, mut encoder: LabelSetEncoder) -> Result<(), fmt::Error> {
480        (observe::LABEL_SCHEME, self.0.scheme.into_static()).encode(encoder.encode_label())?;
481        (observe::LABEL_NAMESPACE, self.0.namespace.as_ref()).encode(encoder.encode_label())?;
482        (observe::LABEL_ROOT, self.0.root.as_ref()).encode(encoder.encode_label())?;
483        (observe::LABEL_OPERATION, self.0.operation).encode(encoder.encode_label())?;
484
485        if let Some(error) = &self.0.error {
486            (observe::LABEL_ERROR, error.into_static()).encode(encoder.encode_label())?;
487        }
488        if let Some(code) = &self.0.status_code {
489            (observe::LABEL_STATUS_CODE, code.as_str()).encode(encoder.encode_label())?;
490        }
491        Ok(())
492    }
493}
494
495fn register_metric(registry: &mut Registry, metric: impl Metric, value: observe::MetricValue) {
496    let ((name, unit), help) = (value.name_with_unit(), value.help());
497
498    if let Some(unit) = unit {
499        registry.register_with_unit(name, help, Unit::Other(unit.to_string()), metric);
500    } else {
501        registry.register(name, help, metric);
502    }
503}