opendal/layers/
fastmetrics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{fmt, time::Duration};
19
20use fastmetrics::encoder::EncodeLabelSet;
21use fastmetrics::encoder::LabelSetEncoder;
22use fastmetrics::metrics::counter::Counter;
23use fastmetrics::metrics::family::Family;
24use fastmetrics::metrics::family::MetricFactory;
25use fastmetrics::metrics::gauge::Gauge;
26use fastmetrics::metrics::histogram::Histogram;
27use fastmetrics::registry::with_global_registry_mut;
28use fastmetrics::registry::Register;
29use fastmetrics::registry::Registry;
30use fastmetrics::registry::RegistryError;
31
32use crate::layers::observe;
33use crate::raw::*;
34use crate::*;
35
/// Add [fastmetrics](https://docs.rs/fastmetrics/) for every operation.
///
/// A layer is obtained by configuring a [`FastmetricsLayerBuilder`] (see
/// [`FastmetricsLayer::builder`]) and then registering its metrics into a
/// `fastmetrics` registry.
///
/// # Examples
///
/// ```no_run
/// # use opendal::layers::FastmetricsLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
///
/// # fn main() -> Result<()> {
/// let mut registry = fastmetrics::registry::Registry::default();
/// let _ = Operator::new(services::Memory::default())?
///     .layer(FastmetricsLayer::builder().register(&mut registry)?)
///     .finish();
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct FastmetricsLayer {
    /// Holds the metric families; cloned into the metrics layer each time this layer is applied.
    interceptor: FastmetricsInterceptor,
}
58
59impl FastmetricsLayer {
60    /// Create a [`FastmetricsLayerBuilder`] to set the configuration of metrics.
61    pub fn builder() -> FastmetricsLayerBuilder {
62        FastmetricsLayerBuilder::default()
63    }
64}
65
66impl<A: Access> Layer<A> for FastmetricsLayer {
67    type LayeredAccess = observe::MetricsAccessor<A, FastmetricsInterceptor>;
68
69    fn layer(&self, inner: A) -> Self::LayeredAccess {
70        observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
71    }
72}
73
/// [`FastmetricsLayerBuilder`] is a config builder to build a [`FastmetricsLayer`].
///
/// Every bucket list is handed to the histograms created when the builder is registered.
/// The builder is `Clone` and `Debug` so a configuration can be reused and inspected.
#[derive(Clone, Debug)]
pub struct FastmetricsLayerBuilder {
    /// Buckets for histograms that observe byte counts.
    bytes_buckets: Vec<f64>,
    /// Buckets for histograms that observe byte rates.
    bytes_rate_buckets: Vec<f64>,
    /// Buckets for histograms that observe entry counts.
    entries_buckets: Vec<f64>,
    /// Buckets for histograms that observe entry rates.
    entries_rate_buckets: Vec<f64>,
    /// Buckets for histograms that observe durations in seconds.
    duration_seconds_buckets: Vec<f64>,
    /// Buckets for the time-to-first-byte histogram.
    ttfb_buckets: Vec<f64>,
    /// When `true`, the `root` label is left out of the encoded label set.
    disable_label_root: bool,
}
84
85impl Default for FastmetricsLayerBuilder {
86    fn default() -> Self {
87        Self {
88            bytes_buckets: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
89            bytes_rate_buckets: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
90            entries_buckets: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
91            entries_rate_buckets: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
92            duration_seconds_buckets: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
93            ttfb_buckets: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
94            disable_label_root: false,
95        }
96    }
97}
98
99impl FastmetricsLayerBuilder {
100    /// Set buckets for bytes related histogram like `operation_bytes`.
101    pub fn bytes_buckets(mut self, buckets: Vec<f64>) -> Self {
102        if !buckets.is_empty() {
103            self.bytes_buckets = buckets;
104        }
105        self
106    }
107
108    /// Set buckets for bytes rate related histogram like `operation_bytes_rate`.
109    pub fn bytes_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
110        if !buckets.is_empty() {
111            self.bytes_rate_buckets = buckets;
112        }
113        self
114    }
115
116    /// Set buckets for entries related histogram like `operation_entries`.
117    pub fn entries_buckets(mut self, buckets: Vec<f64>) -> Self {
118        if !buckets.is_empty() {
119            self.entries_buckets = buckets;
120        }
121        self
122    }
123
124    /// Set buckets for entries rate related histogram like `operation_entries_rate`.
125    pub fn entries_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
126        if !buckets.is_empty() {
127            self.entries_rate_buckets = buckets;
128        }
129        self
130    }
131
132    /// Set buckets for duration seconds related histogram like `operation_duration_seconds`.
133    pub fn duration_seconds_buckets(mut self, buckets: Vec<f64>) -> Self {
134        if !buckets.is_empty() {
135            self.duration_seconds_buckets = buckets;
136        }
137        self
138    }
139
140    /// Set buckets for ttfb related histogram like `operation_ttfb_seconds`.
141    pub fn ttfb_buckets(mut self, buckets: Vec<f64>) -> Self {
142        if !buckets.is_empty() {
143            self.ttfb_buckets = buckets;
144        }
145        self
146    }
147
148    /// The 'root' label might have risks of being high cardinality; users can choose to disable it
149    /// when they found it's not useful for their metrics.
150    pub fn disable_label_root(mut self, disable: bool) -> Self {
151        self.disable_label_root = disable;
152        self
153    }
154
155    /// Register the metrics into the registry and return a [`FastmetricsLayer`].
156    ///
157    /// # Examples
158    ///
159    /// ```no_run
160    /// # use log::debug;
161    /// # use opendal::layers::FastmetricsLayer;
162    /// # use opendal::services;
163    /// # use opendal::Operator;
164    /// # use opendal::Result;
165    ///
166    /// # #[tokio::main]
167    /// # async fn main() -> Result<()> {
168    /// let mut registry = fastmetrics::registry::Registry::default();
169    ///
170    /// // Pick a builder and configure it.
171    /// let builder = services::Memory::default();
172    /// let _ = Operator::new(builder)?
173    ///     .layer(FastmetricsLayer::builder().register(&mut registry)?)
174    ///     .finish();
175    /// # Ok(())
176    /// # }
177    /// ```
178    pub fn register(self, registry: &mut Registry) -> Result<FastmetricsLayer> {
179        let operation_bytes = Family::<OperationLabels, Histogram, _>::new(HistogramFactory {
180            buckets: self.bytes_buckets.clone(),
181        });
182        let operation_bytes_rate = Family::<OperationLabels, Histogram, _>::new(HistogramFactory {
183            buckets: self.bytes_rate_buckets.clone(),
184        });
185        let operation_entries = Family::<OperationLabels, Histogram, _>::new(HistogramFactory {
186            buckets: self.entries_buckets.clone(),
187        });
188        let operation_entries_rate =
189            Family::<OperationLabels, Histogram, _>::new(HistogramFactory {
190                buckets: self.entries_rate_buckets.clone(),
191            });
192        let operation_duration_seconds =
193            Family::<OperationLabels, Histogram, _>::new(HistogramFactory {
194                buckets: self.duration_seconds_buckets.clone(),
195            });
196        let operation_errors_total = Family::<OperationLabels, Counter>::default();
197        let operation_executing = Family::<OperationLabels, Gauge>::default();
198        let operation_ttfb_seconds =
199            Family::<OperationLabels, Histogram, _>::new(HistogramFactory {
200                buckets: self.ttfb_buckets.clone(),
201            });
202
203        let http_executing = Family::<OperationLabels, Gauge>::default();
204        let http_request_bytes = Family::<OperationLabels, Histogram, _>::new(HistogramFactory {
205            buckets: self.bytes_buckets.clone(),
206        });
207        let http_request_bytes_rate =
208            Family::<OperationLabels, Histogram, _>::new(HistogramFactory {
209                buckets: self.bytes_rate_buckets.clone(),
210            });
211        let http_request_duration_seconds =
212            Family::<OperationLabels, Histogram, _>::new(HistogramFactory {
213                buckets: self.duration_seconds_buckets.clone(),
214            });
215        let http_response_bytes = Family::<OperationLabels, Histogram, _>::new(HistogramFactory {
216            buckets: self.bytes_buckets.clone(),
217        });
218        let http_response_bytes_rate =
219            Family::<OperationLabels, Histogram, _>::new(HistogramFactory {
220                buckets: self.bytes_rate_buckets.clone(),
221            });
222        let http_response_duration_seconds =
223            Family::<OperationLabels, Histogram, _>::new(HistogramFactory {
224                buckets: self.duration_seconds_buckets.clone(),
225            });
226        let http_connection_errors_total = Family::<OperationLabels, Counter>::default();
227        let http_status_errors_total = Family::<OperationLabels, Counter>::default();
228
229        let interceptor = FastmetricsInterceptor {
230            operation_bytes,
231            operation_bytes_rate,
232            operation_entries,
233            operation_entries_rate,
234            operation_duration_seconds,
235            operation_errors_total,
236            operation_executing,
237            operation_ttfb_seconds,
238
239            http_executing,
240            http_request_bytes,
241            http_request_bytes_rate,
242            http_request_duration_seconds,
243            http_response_bytes,
244            http_response_bytes_rate,
245            http_response_duration_seconds,
246            http_connection_errors_total,
247            http_status_errors_total,
248
249            disable_label_root: self.disable_label_root,
250        };
251        interceptor
252            .register(registry)
253            .map_err(|err| Error::new(ErrorKind::Unexpected, err.to_string()).set_source(err))?;
254
255        Ok(FastmetricsLayer { interceptor })
256    }
257
258    /// Register the metrics into the global registry and return a [`FastmetricsLayer`].
259    ///
260    /// # Examples
261    ///
262    /// ```no_run
263    /// # use opendal::layers::FastmetricsLayer;
264    /// # use opendal::services;
265    /// # use opendal::Operator;
266    /// # use opendal::Result;
267    ///
268    /// # fn main() -> Result<()> {
269    /// // Pick a builder and configure it.
270    /// let builder = services::Memory::default();
271    /// let _ = Operator::new(builder)?
272    ///     .layer(FastmetricsLayer::builder().register_global()?)
273    ///     .finish();
274    /// # Ok(())
275    /// # }
276    /// ```
277    pub fn register_global(self) -> Result<FastmetricsLayer> {
278        with_global_registry_mut(|registry| self.register(registry))
279    }
280}
281
/// Factory that creates histograms sharing one configured bucket list.
#[derive(Clone)]
struct HistogramFactory {
    /// Bucket boundaries passed to every histogram this factory creates.
    buckets: Vec<f64>,
}
286
287impl MetricFactory<Histogram> for HistogramFactory {
288    fn new_metric(&self) -> Histogram {
289        Histogram::new(self.buckets.iter().cloned())
290    }
291}
292
/// Holds one `fastmetrics` family per observed metric and records incoming observations
/// into them.
///
/// Each family is keyed by `OperationLabels`.
#[derive(Clone, Debug)]
pub struct FastmetricsInterceptor {
    // Operation metrics.
    operation_bytes: Family<OperationLabels, Histogram, HistogramFactory>,
    operation_bytes_rate: Family<OperationLabels, Histogram, HistogramFactory>,
    operation_entries: Family<OperationLabels, Histogram, HistogramFactory>,
    operation_entries_rate: Family<OperationLabels, Histogram, HistogramFactory>,
    operation_duration_seconds: Family<OperationLabels, Histogram, HistogramFactory>,
    operation_errors_total: Family<OperationLabels, Counter>,
    operation_executing: Family<OperationLabels, Gauge>,
    operation_ttfb_seconds: Family<OperationLabels, Histogram, HistogramFactory>,

    // HTTP metrics.
    http_executing: Family<OperationLabels, Gauge>,
    http_request_bytes: Family<OperationLabels, Histogram, HistogramFactory>,
    http_request_bytes_rate: Family<OperationLabels, Histogram, HistogramFactory>,
    http_request_duration_seconds: Family<OperationLabels, Histogram, HistogramFactory>,
    http_response_bytes: Family<OperationLabels, Histogram, HistogramFactory>,
    http_response_bytes_rate: Family<OperationLabels, Histogram, HistogramFactory>,
    http_response_duration_seconds: Family<OperationLabels, Histogram, HistogramFactory>,
    http_connection_errors_total: Family<OperationLabels, Counter>,
    http_status_errors_total: Family<OperationLabels, Counter>,

    // Copied into every `OperationLabels` so the encoder can skip the `root` label.
    disable_label_root: bool,
}
316
317impl Register for FastmetricsInterceptor {
318    fn register(&self, registry: &mut Registry) -> Result<(), RegistryError> {
319        macro_rules! register_metrics {
320            ($($field:ident => $value:expr),* $(,)?) => {
321                $(
322                    {
323                        let ((name, unit), help) = ($value.name_with_unit(), $value.help());
324                        registry.register_metric(name, help, unit, self.$field.clone())?;
325                    }
326                )*
327            };
328        }
329
330        register_metrics! {
331            // Operation metrics
332            operation_bytes => observe::MetricValue::OperationBytes(0),
333            operation_bytes_rate => observe::MetricValue::OperationBytesRate(0.0),
334            operation_entries => observe::MetricValue::OperationEntries(0),
335            operation_entries_rate => observe::MetricValue::OperationEntriesRate(0.0),
336            operation_duration_seconds => observe::MetricValue::OperationDurationSeconds(Duration::default()),
337            operation_errors_total => observe::MetricValue::OperationErrorsTotal,
338            operation_executing => observe::MetricValue::OperationExecuting(0),
339            operation_ttfb_seconds => observe::MetricValue::OperationTtfbSeconds(Duration::default()),
340
341            // HTTP metrics
342            http_executing => observe::MetricValue::HttpExecuting(0),
343            http_request_bytes => observe::MetricValue::HttpRequestBytes(0),
344            http_request_bytes_rate => observe::MetricValue::HttpRequestBytesRate(0.0),
345            http_request_duration_seconds => observe::MetricValue::HttpRequestDurationSeconds(Duration::default()),
346            http_response_bytes => observe::MetricValue::HttpResponseBytes(0),
347            http_response_bytes_rate => observe::MetricValue::HttpResponseBytesRate(0.0),
348            http_response_duration_seconds => observe::MetricValue::HttpResponseDurationSeconds(Duration::default()),
349            http_connection_errors_total => observe::MetricValue::HttpConnectionErrorsTotal,
350            http_status_errors_total => observe::MetricValue::HttpStatusErrorsTotal,
351        }
352
353        Ok(())
354    }
355}
356
357impl observe::MetricsIntercept for FastmetricsInterceptor {
358    fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
359        let labels = OperationLabels {
360            labels,
361            disable_label_root: self.disable_label_root,
362        };
363        match value {
364            observe::MetricValue::OperationBytes(v) => {
365                self.operation_bytes
366                    .with_or_new(&labels, |hist| hist.observe(v as f64));
367            }
368            observe::MetricValue::OperationBytesRate(v) => {
369                self.operation_bytes_rate
370                    .with_or_new(&labels, |hist| hist.observe(v));
371            }
372            observe::MetricValue::OperationEntries(v) => {
373                self.operation_entries
374                    .with_or_new(&labels, |hist| hist.observe(v as f64));
375            }
376            observe::MetricValue::OperationEntriesRate(v) => {
377                self.operation_entries_rate
378                    .with_or_new(&labels, |hist| hist.observe(v));
379            }
380            observe::MetricValue::OperationDurationSeconds(v) => {
381                self.operation_duration_seconds
382                    .with_or_new(&labels, |hist| hist.observe(v.as_secs_f64()));
383            }
384            observe::MetricValue::OperationErrorsTotal => {
385                self.operation_errors_total
386                    .with_or_new(&labels, |counter| counter.inc());
387            }
388            observe::MetricValue::OperationExecuting(v) => {
389                self.operation_executing
390                    .with_or_new(&labels, |gauge| gauge.inc_by(v as i64));
391            }
392            observe::MetricValue::OperationTtfbSeconds(v) => {
393                self.operation_ttfb_seconds
394                    .with_or_new(&labels, |hist| hist.observe(v.as_secs_f64()));
395            }
396
397            observe::MetricValue::HttpExecuting(v) => {
398                self.http_executing
399                    .with_or_new(&labels, |gauge| gauge.inc_by(v as i64));
400            }
401            observe::MetricValue::HttpRequestBytes(v) => {
402                self.http_request_bytes
403                    .with_or_new(&labels, |hist| hist.observe(v as f64));
404            }
405            observe::MetricValue::HttpRequestBytesRate(v) => {
406                self.http_request_bytes_rate
407                    .with_or_new(&labels, |hist| hist.observe(v));
408            }
409            observe::MetricValue::HttpRequestDurationSeconds(v) => {
410                self.http_request_duration_seconds
411                    .with_or_new(&labels, |hist| hist.observe(v.as_secs_f64()));
412            }
413            observe::MetricValue::HttpResponseBytes(v) => {
414                self.http_response_bytes
415                    .with_or_new(&labels, |hist| hist.observe(v as f64));
416            }
417            observe::MetricValue::HttpResponseBytesRate(v) => {
418                self.http_response_bytes_rate
419                    .with_or_new(&labels, |hist| hist.observe(v));
420            }
421            observe::MetricValue::HttpResponseDurationSeconds(v) => {
422                self.http_response_duration_seconds
423                    .with_or_new(&labels, |hist| hist.observe(v.as_secs_f64()));
424            }
425            observe::MetricValue::HttpConnectionErrorsTotal => {
426                self.http_connection_errors_total
427                    .with_or_new(&labels, |counter| counter.inc());
428            }
429            observe::MetricValue::HttpStatusErrorsTotal => {
430                self.http_status_errors_total
431                    .with_or_new(&labels, |counter| counter.inc());
432            }
433        };
434    }
435}
436
/// Label set used as the key of every metric family in this layer.
///
/// NOTE(review): `disable_label_root` only changes how the labels are encoded; the derived
/// `PartialEq`/`Hash` still cover the whole `labels` value. If `MetricLabels` compares its
/// root, label sets differing only by root stay separate series that encode identically when
/// the root label is disabled — confirm this is intended.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct OperationLabels {
    /// Labels reported together with the observation.
    labels: observe::MetricLabels,
    /// When `true`, the `root` label is skipped during encoding.
    disable_label_root: bool,
}
442
443impl EncodeLabelSet for OperationLabels {
444    fn encode(&self, encoder: &mut dyn LabelSetEncoder) -> fmt::Result {
445        encoder.encode(&(observe::LABEL_SCHEME, self.labels.scheme.into_static()))?;
446        encoder.encode(&(observe::LABEL_NAMESPACE, self.labels.namespace.as_ref()))?;
447        if !self.disable_label_root {
448            encoder.encode(&(observe::LABEL_ROOT, self.labels.root.as_ref()))?;
449        }
450        encoder.encode(&(observe::LABEL_OPERATION, self.labels.operation))?;
451        if let Some(error) = &self.labels.error {
452            encoder.encode(&(observe::LABEL_ERROR, error.into_static()))?;
453        }
454        if let Some(code) = &self.labels.status_code {
455            encoder.encode(&(observe::LABEL_STATUS_CODE, code.as_str()))?;
456        }
457        Ok(())
458    }
459}