opendal_core/layers/
fastmetrics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt;
19
20use fastmetrics::encoder::EncodeLabelSet;
21use fastmetrics::encoder::LabelSetEncoder;
22use fastmetrics::metrics::counter::Counter;
23use fastmetrics::metrics::family::Family;
24use fastmetrics::metrics::family::MetricFactory;
25use fastmetrics::metrics::gauge::Gauge;
26use fastmetrics::metrics::histogram::Histogram;
27use fastmetrics::registry::Register;
28use fastmetrics::registry::Registry;
29use fastmetrics::registry::RegistryError;
30use fastmetrics::registry::with_global_registry_mut;
31
32use crate::layers::observe;
33use crate::raw::*;
34use crate::*;
35
36/// Add [fastmetrics](https://docs.rs/fastmetrics/) for every operation.
37///
38/// # Examples
39///
40/// ## Basic Usage
41///
42/// ```no_run
43/// # use fastmetrics::format::text;
44/// # use log::info;
45/// # use opendal_core::layers::FastmetricsLayer;
46/// # use opendal_core::services;
47/// # use opendal_core::Operator;
48/// # use opendal_core::Result;
49///
50/// # #[tokio::main]
51/// # async fn main() -> Result<()> {
52/// let mut registry = fastmetrics::registry::Registry::default();
53/// let op = Operator::new(services::Memory::default())?
54///     .layer(FastmetricsLayer::builder().register(&mut registry)?)
55///     .finish();
56///
57/// // Write data into object test.
58/// op.write("test", "Hello, World!").await?;
59///
60/// // Read data from the object.
61/// let bs = op.read("test").await?;
62/// info!("content: {}", String::from_utf8_lossy(&bs.to_bytes()));
63///
64/// // Get object metadata.
65/// let meta = op.stat("test").await?;
66/// info!("meta: {:?}", meta);
67///
68/// // Export prometheus metrics.
69/// let mut output = String::new();
70/// text::encode(&mut output, &registry).unwrap();
71/// println!("{}", output);
72/// # Ok(())
73/// # }
74/// ```
75/// ## Global Instance
76///
77/// `FastmetricsLayer` needs to be registered before instantiation.
78///
79/// If there are multiple operators in an application that need the `FastmetricsLayer`, we could
80/// instantiate it once and pass it to multiple operators. But we cannot directly call
81/// `.layer(FastmetricsLayer::builder().register(&mut registry)?)` for different services, because
82/// registering the same metrics multiple times to the same registry will cause register errors.
83/// Therefore, we can provide a global instance for the `FastmetricsLayer`.
84///
85/// ```no_run
86/// # use std::sync::OnceLock;
87/// # use fastmetrics::format::text;
88/// # use fastmetrics::registry::with_global_registry;
89/// # use log::info;
90/// # use opendal_core::layers::FastmetricsLayer;
91/// # use opendal_core::services;
92/// # use opendal_core::Operator;
93/// # use opendal_core::Result;
94///
95/// fn global_fastmetrics_layer() -> &'static FastmetricsLayer {
96///     static GLOBAL: OnceLock<FastmetricsLayer> = OnceLock::new();
97///     GLOBAL.get_or_init(|| {
98///         FastmetricsLayer::builder()
99///             .register_global()
100///             .expect("Failed to register with the global registry")
101///     })
102/// }
103///
104/// # #[tokio::main]
105/// # async fn main() -> Result<()> {
106/// let op = Operator::new(services::Memory::default())?
107///     .layer(global_fastmetrics_layer().clone())
108///     .finish();
109///
110/// // Write data into object test.
111/// op.write("test", "Hello, World!").await?;
112///
113/// // Read data from the object.
114/// let bs = op.read("test").await?;
115/// info!("content: {}", String::from_utf8_lossy(&bs.to_bytes()));
116///
117/// // Get object metadata.
118/// let meta = op.stat("test").await?;
119/// info!("meta: {:?}", meta);
120///
121/// // Export prometheus metrics.
122/// let mut output = String::new();
123/// with_global_registry(|registry| text::encode(&mut output, &registry).unwrap());
124/// println!("{}", output);
125/// # Ok(())
126/// # }
127#[derive(Clone, Debug)]
128pub struct FastmetricsLayer {
129    interceptor: FastmetricsInterceptor,
130}
131
132impl FastmetricsLayer {
133    /// Create a [`FastmetricsLayerBuilder`] to set the configuration of metrics.
134    pub fn builder() -> FastmetricsLayerBuilder {
135        FastmetricsLayerBuilder::default()
136    }
137}
138
139impl<A: Access> Layer<A> for FastmetricsLayer {
140    type LayeredAccess = observe::MetricsAccessor<A, FastmetricsInterceptor>;
141
142    fn layer(&self, inner: A) -> Self::LayeredAccess {
143        observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
144    }
145}
146
147/// [`FastmetricsLayerBuilder`] is a config builder to build a [`FastmetricsLayer`].
148pub struct FastmetricsLayerBuilder {
149    bytes_buckets: Vec<f64>,
150    bytes_rate_buckets: Vec<f64>,
151    entries_buckets: Vec<f64>,
152    entries_rate_buckets: Vec<f64>,
153    duration_seconds_buckets: Vec<f64>,
154    ttfb_buckets: Vec<f64>,
155    disable_label_root: bool,
156}
157
158impl Default for FastmetricsLayerBuilder {
159    fn default() -> Self {
160        Self {
161            bytes_buckets: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
162            bytes_rate_buckets: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
163            entries_buckets: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
164            entries_rate_buckets: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
165            duration_seconds_buckets: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
166            ttfb_buckets: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
167            disable_label_root: false,
168        }
169    }
170}
171
172impl FastmetricsLayerBuilder {
173    /// Set buckets for bytes related histogram like `operation_bytes`.
174    pub fn bytes_buckets(mut self, buckets: Vec<f64>) -> Self {
175        if !buckets.is_empty() {
176            self.bytes_buckets = buckets;
177        }
178        self
179    }
180
181    /// Set buckets for bytes rate related histogram like `operation_bytes_rate`.
182    pub fn bytes_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
183        if !buckets.is_empty() {
184            self.bytes_rate_buckets = buckets;
185        }
186        self
187    }
188
189    /// Set buckets for entries related histogram like `operation_entries`.
190    pub fn entries_buckets(mut self, buckets: Vec<f64>) -> Self {
191        if !buckets.is_empty() {
192            self.entries_buckets = buckets;
193        }
194        self
195    }
196
197    /// Set buckets for entries rate related histogram like `operation_entries_rate`.
198    pub fn entries_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
199        if !buckets.is_empty() {
200            self.entries_rate_buckets = buckets;
201        }
202        self
203    }
204
205    /// Set buckets for duration seconds related histogram like `operation_duration_seconds`.
206    pub fn duration_seconds_buckets(mut self, buckets: Vec<f64>) -> Self {
207        if !buckets.is_empty() {
208            self.duration_seconds_buckets = buckets;
209        }
210        self
211    }
212
213    /// Set buckets for ttfb related histogram like `operation_ttfb_seconds`.
214    pub fn ttfb_buckets(mut self, buckets: Vec<f64>) -> Self {
215        if !buckets.is_empty() {
216            self.ttfb_buckets = buckets;
217        }
218        self
219    }
220
221    /// The 'root' label might have risks of being high cardinality; users can choose to disable it
222    /// when they found it's not useful for their metrics.
223    pub fn disable_label_root(mut self, disable: bool) -> Self {
224        self.disable_label_root = disable;
225        self
226    }
227
228    /// Register the metrics into the registry and return a [`FastmetricsLayer`].
229    ///
230    /// # Example
231    ///
232    /// ```no_run
233    /// # use opendal_core::layers::FastmetricsLayer;
234    /// # use opendal_core::services;
235    /// # use opendal_core::Operator;
236    /// # use opendal_core::Result;
237    ///
238    /// # #[tokio::main]
239    /// # async fn main() -> Result<()> {
240    /// let mut registry = fastmetrics::registry::Registry::default();
241    ///
242    /// // Pick a builder and configure it.
243    /// let builder = services::Memory::default();
244    /// let _ = Operator::new(builder)?
245    ///     .layer(FastmetricsLayer::builder().register(&mut registry)?)
246    ///     .finish();
247    /// # Ok(())
248    /// # }
249    /// ```
250    pub fn register(self, registry: &mut Registry) -> Result<FastmetricsLayer> {
251        let operation_bytes = Family::new(HistogramFactory {
252            buckets: self.bytes_buckets.clone(),
253        });
254        let operation_bytes_rate = Family::new(HistogramFactory {
255            buckets: self.bytes_rate_buckets.clone(),
256        });
257        let operation_entries = Family::new(HistogramFactory {
258            buckets: self.entries_buckets.clone(),
259        });
260        let operation_entries_rate = Family::new(HistogramFactory {
261            buckets: self.entries_rate_buckets.clone(),
262        });
263        let operation_duration_seconds = Family::new(HistogramFactory {
264            buckets: self.duration_seconds_buckets.clone(),
265        });
266        let operation_errors_total = Family::default();
267        let operation_executing = Family::default();
268        let operation_ttfb_seconds = Family::new(HistogramFactory {
269            buckets: self.ttfb_buckets.clone(),
270        });
271
272        let http_executing = Family::default();
273        let http_request_bytes = Family::new(HistogramFactory {
274            buckets: self.bytes_buckets.clone(),
275        });
276        let http_request_bytes_rate = Family::new(HistogramFactory {
277            buckets: self.bytes_rate_buckets.clone(),
278        });
279        let http_request_duration_seconds = Family::new(HistogramFactory {
280            buckets: self.duration_seconds_buckets.clone(),
281        });
282        let http_response_bytes = Family::new(HistogramFactory {
283            buckets: self.bytes_buckets.clone(),
284        });
285        let http_response_bytes_rate = Family::new(HistogramFactory {
286            buckets: self.bytes_rate_buckets.clone(),
287        });
288        let http_response_duration_seconds = Family::new(HistogramFactory {
289            buckets: self.duration_seconds_buckets.clone(),
290        });
291        let http_connection_errors_total = Family::default();
292        let http_status_errors_total = Family::default();
293
294        let interceptor = FastmetricsInterceptor {
295            operation_bytes,
296            operation_bytes_rate,
297            operation_entries,
298            operation_entries_rate,
299            operation_duration_seconds,
300            operation_errors_total,
301            operation_executing,
302            operation_ttfb_seconds,
303
304            http_executing,
305            http_request_bytes,
306            http_request_bytes_rate,
307            http_request_duration_seconds,
308            http_response_bytes,
309            http_response_bytes_rate,
310            http_response_duration_seconds,
311            http_connection_errors_total,
312            http_status_errors_total,
313
314            disable_label_root: self.disable_label_root,
315        };
316        interceptor
317            .register(registry)
318            .map_err(|err| Error::new(ErrorKind::Unexpected, err.to_string()).set_source(err))?;
319
320        Ok(FastmetricsLayer { interceptor })
321    }
322
323    /// Register the metrics into the global registry and return a [`FastmetricsLayer`].
324    ///
325    /// # Example
326    ///
327    /// ```no_run
328    /// # use opendal_core::layers::FastmetricsLayer;
329    /// # use opendal_core::services;
330    /// # use opendal_core::Operator;
331    /// # use opendal_core::Result;
332    ///
333    /// # fn main() -> Result<()> {
334    /// // Pick a builder and configure it.
335    /// let builder = services::Memory::default();
336    /// let _ = Operator::new(builder)?
337    ///     .layer(FastmetricsLayer::builder().register_global()?)
338    ///     .finish();
339    /// # Ok(())
340    /// # }
341    /// ```
342    pub fn register_global(self) -> Result<FastmetricsLayer> {
343        with_global_registry_mut(|registry| self.register(registry))
344    }
345}
346
347#[derive(Clone)]
348struct HistogramFactory {
349    buckets: Vec<f64>,
350}
351
352impl MetricFactory<Histogram> for HistogramFactory {
353    fn new_metric(&self) -> Histogram {
354        Histogram::new(self.buckets.iter().cloned())
355    }
356}
357
358#[derive(Clone, Debug)]
359pub struct FastmetricsInterceptor {
360    operation_bytes: Family<OperationLabels, Histogram, HistogramFactory>,
361    operation_bytes_rate: Family<OperationLabels, Histogram, HistogramFactory>,
362    operation_entries: Family<OperationLabels, Histogram, HistogramFactory>,
363    operation_entries_rate: Family<OperationLabels, Histogram, HistogramFactory>,
364    operation_duration_seconds: Family<OperationLabels, Histogram, HistogramFactory>,
365    operation_errors_total: Family<OperationLabels, Counter>,
366    operation_executing: Family<OperationLabels, Gauge>,
367    operation_ttfb_seconds: Family<OperationLabels, Histogram, HistogramFactory>,
368
369    http_executing: Family<OperationLabels, Gauge>,
370    http_request_bytes: Family<OperationLabels, Histogram, HistogramFactory>,
371    http_request_bytes_rate: Family<OperationLabels, Histogram, HistogramFactory>,
372    http_request_duration_seconds: Family<OperationLabels, Histogram, HistogramFactory>,
373    http_response_bytes: Family<OperationLabels, Histogram, HistogramFactory>,
374    http_response_bytes_rate: Family<OperationLabels, Histogram, HistogramFactory>,
375    http_response_duration_seconds: Family<OperationLabels, Histogram, HistogramFactory>,
376    http_connection_errors_total: Family<OperationLabels, Counter>,
377    http_status_errors_total: Family<OperationLabels, Counter>,
378
379    disable_label_root: bool,
380}
381
382impl Register for FastmetricsInterceptor {
383    fn register(&self, registry: &mut Registry) -> Result<(), RegistryError> {
384        macro_rules! register_metrics {
385            ($($field:ident => $value:expr),* $(,)?) => {
386                $(
387                    {
388                        let ((name, unit), help) = ($value.name_with_unit(), $value.help());
389                        registry.register_metric(name, help, unit, self.$field.clone())?;
390                    }
391                )*
392            };
393        }
394
395        register_metrics! {
396            // Operation metrics
397            operation_bytes => observe::MetricValue::OperationBytes(0),
398            operation_bytes_rate => observe::MetricValue::OperationBytesRate(0.0),
399            operation_entries => observe::MetricValue::OperationEntries(0),
400            operation_entries_rate => observe::MetricValue::OperationEntriesRate(0.0),
401            operation_duration_seconds => observe::MetricValue::OperationDurationSeconds(Duration::default()),
402            operation_errors_total => observe::MetricValue::OperationErrorsTotal,
403            operation_executing => observe::MetricValue::OperationExecuting(0),
404            operation_ttfb_seconds => observe::MetricValue::OperationTtfbSeconds(Duration::default()),
405
406            // HTTP metrics
407            http_executing => observe::MetricValue::HttpExecuting(0),
408            http_request_bytes => observe::MetricValue::HttpRequestBytes(0),
409            http_request_bytes_rate => observe::MetricValue::HttpRequestBytesRate(0.0),
410            http_request_duration_seconds => observe::MetricValue::HttpRequestDurationSeconds(Duration::default()),
411            http_response_bytes => observe::MetricValue::HttpResponseBytes(0),
412            http_response_bytes_rate => observe::MetricValue::HttpResponseBytesRate(0.0),
413            http_response_duration_seconds => observe::MetricValue::HttpResponseDurationSeconds(Duration::default()),
414            http_connection_errors_total => observe::MetricValue::HttpConnectionErrorsTotal,
415            http_status_errors_total => observe::MetricValue::HttpStatusErrorsTotal,
416        }
417
418        Ok(())
419    }
420}
421
422impl observe::MetricsIntercept for FastmetricsInterceptor {
423    fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
424        let labels = OperationLabels {
425            labels,
426            disable_label_root: self.disable_label_root,
427        };
428        match value {
429            observe::MetricValue::OperationBytes(v) => {
430                self.operation_bytes
431                    .with_or_new(&labels, |hist| hist.observe(v as f64));
432            }
433            observe::MetricValue::OperationBytesRate(v) => {
434                self.operation_bytes_rate
435                    .with_or_new(&labels, |hist| hist.observe(v));
436            }
437            observe::MetricValue::OperationEntries(v) => {
438                self.operation_entries
439                    .with_or_new(&labels, |hist| hist.observe(v as f64));
440            }
441            observe::MetricValue::OperationEntriesRate(v) => {
442                self.operation_entries_rate
443                    .with_or_new(&labels, |hist| hist.observe(v));
444            }
445            observe::MetricValue::OperationDurationSeconds(v) => {
446                self.operation_duration_seconds
447                    .with_or_new(&labels, |hist| hist.observe(v.as_secs_f64()));
448            }
449            observe::MetricValue::OperationErrorsTotal => {
450                self.operation_errors_total
451                    .with_or_new(&labels, |counter| counter.inc());
452            }
453            observe::MetricValue::OperationExecuting(v) => {
454                self.operation_executing
455                    .with_or_new(&labels, |gauge| gauge.inc_by(v as i64));
456            }
457            observe::MetricValue::OperationTtfbSeconds(v) => {
458                self.operation_ttfb_seconds
459                    .with_or_new(&labels, |hist| hist.observe(v.as_secs_f64()));
460            }
461
462            observe::MetricValue::HttpExecuting(v) => {
463                self.http_executing
464                    .with_or_new(&labels, |gauge| gauge.inc_by(v as i64));
465            }
466            observe::MetricValue::HttpRequestBytes(v) => {
467                self.http_request_bytes
468                    .with_or_new(&labels, |hist| hist.observe(v as f64));
469            }
470            observe::MetricValue::HttpRequestBytesRate(v) => {
471                self.http_request_bytes_rate
472                    .with_or_new(&labels, |hist| hist.observe(v));
473            }
474            observe::MetricValue::HttpRequestDurationSeconds(v) => {
475                self.http_request_duration_seconds
476                    .with_or_new(&labels, |hist| hist.observe(v.as_secs_f64()));
477            }
478            observe::MetricValue::HttpResponseBytes(v) => {
479                self.http_response_bytes
480                    .with_or_new(&labels, |hist| hist.observe(v as f64));
481            }
482            observe::MetricValue::HttpResponseBytesRate(v) => {
483                self.http_response_bytes_rate
484                    .with_or_new(&labels, |hist| hist.observe(v));
485            }
486            observe::MetricValue::HttpResponseDurationSeconds(v) => {
487                self.http_response_duration_seconds
488                    .with_or_new(&labels, |hist| hist.observe(v.as_secs_f64()));
489            }
490            observe::MetricValue::HttpConnectionErrorsTotal => {
491                self.http_connection_errors_total
492                    .with_or_new(&labels, |counter| counter.inc());
493            }
494            observe::MetricValue::HttpStatusErrorsTotal => {
495                self.http_status_errors_total
496                    .with_or_new(&labels, |counter| counter.inc());
497            }
498        };
499    }
500}
501
502#[derive(Clone, Debug, PartialEq, Eq, Hash)]
503struct OperationLabels {
504    labels: observe::MetricLabels,
505    disable_label_root: bool,
506}
507
508impl EncodeLabelSet for OperationLabels {
509    fn encode(&self, encoder: &mut dyn LabelSetEncoder) -> fmt::Result {
510        encoder.encode(&(observe::LABEL_SCHEME, self.labels.scheme))?;
511        encoder.encode(&(observe::LABEL_NAMESPACE, self.labels.namespace.as_ref()))?;
512        if !self.disable_label_root {
513            encoder.encode(&(observe::LABEL_ROOT, self.labels.root.as_ref()))?;
514        }
515        encoder.encode(&(observe::LABEL_OPERATION, self.labels.operation))?;
516        if let Some(error) = &self.labels.error {
517            encoder.encode(&(observe::LABEL_ERROR, error.into_static()))?;
518        }
519        if let Some(code) = &self.labels.status_code {
520            encoder.encode(&(observe::LABEL_STATUS_CODE, code.as_str()))?;
521        }
522        Ok(())
523    }
524}