opendal/layers/
otelmetrics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::time::Duration;
19
20use opentelemetry::metrics::Counter;
21use opentelemetry::metrics::Histogram;
22use opentelemetry::metrics::Meter;
23use opentelemetry::metrics::UpDownCounter;
24use opentelemetry::KeyValue;
25
26use crate::layers::observe;
27use crate::raw::*;
28
29/// Add [opentelemetry::metrics](https://docs.rs/opentelemetry/latest/opentelemetry/metrics/index.html) for every operation.
30///
31/// # Examples
32///
33/// ```no_run
34/// # use opendal::layers::OtelMetricsLayer;
35/// # use opendal::services;
36/// # use opendal::Operator;
37/// # use opendal::Result;
38///
39/// # fn main() -> Result<()> {
40/// let meter = opentelemetry::global::meter("opendal");
41/// let _ = Operator::new(services::Memory::default())?
42///     .layer(OtelMetricsLayer::builder().register(&meter))
43///     .finish();
44/// Ok(())
45/// # }
46/// ```
47#[derive(Clone, Debug)]
48pub struct OtelMetricsLayer {
49    interceptor: OtelMetricsInterceptor,
50}
51
52impl OtelMetricsLayer {
53    /// Create a [`OtelMetricsLayerBuilder`] to set the configuration of metrics.
54    ///
55    /// # Examples
56    ///
57    /// ```no_run
58    /// # use opendal::layers::OtelMetricsLayer;
59    /// # use opendal::services;
60    /// # use opendal::Operator;
61    /// # use opendal::Result;
62    ///
63    /// # #[tokio::main]
64    /// # async fn main() -> Result<()> {
65    /// let meter = opentelemetry::global::meter("opendal");
66    /// let op = Operator::new(services::Memory::default())?
67    ///     .layer(OtelMetricsLayer::builder().register(&meter))
68    ///     .finish();
69    ///
70    /// Ok(())
71    /// # }
72    /// ```
73    pub fn builder() -> OtelMetricsLayerBuilder {
74        OtelMetricsLayerBuilder::default()
75    }
76}
77
/// Configuration builder that produces an [`OtelMetricsLayer`].
///
/// Each field holds the bucket boundaries for one family of histograms.
pub struct OtelMetricsLayerBuilder {
    /// Boundaries for bytes related histograms.
    bytes_boundaries: Vec<f64>,
    /// Boundaries for bytes rate related histograms.
    bytes_rate_boundaries: Vec<f64>,
    /// Boundaries for entries related histograms.
    entries_boundaries: Vec<f64>,
    /// Boundaries for entries rate related histograms.
    entries_rate_boundaries: Vec<f64>,
    /// Boundaries for duration (seconds) related histograms.
    duration_seconds_boundaries: Vec<f64>,
    /// Boundaries configured for the ttfb related histogram.
    ttfb_boundaries: Vec<f64>,
}
87
88impl Default for OtelMetricsLayerBuilder {
89    fn default() -> Self {
90        Self {
91            bytes_boundaries: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
92            bytes_rate_boundaries: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
93            entries_boundaries: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
94            entries_rate_boundaries: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
95            duration_seconds_boundaries: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
96            ttfb_boundaries: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
97        }
98    }
99}
100
101impl OtelMetricsLayerBuilder {
102    /// Set boundaries for bytes related histogram like `operation_bytes`.
103    pub fn bytes_boundaries(mut self, boundaries: Vec<f64>) -> Self {
104        if !boundaries.is_empty() {
105            self.bytes_boundaries = boundaries;
106        }
107        self
108    }
109
110    /// Set boundaries for bytes rate related histogram like `operation_bytes_rate`.
111    pub fn bytes_rate_boundaries(mut self, boundaries: Vec<f64>) -> Self {
112        if !boundaries.is_empty() {
113            self.bytes_rate_boundaries = boundaries;
114        }
115        self
116    }
117
118    /// Set boundaries for entries related histogram like `operation_entries`.
119    pub fn entries_boundaries(mut self, boundaries: Vec<f64>) -> Self {
120        if !boundaries.is_empty() {
121            self.entries_boundaries = boundaries;
122        }
123        self
124    }
125
126    /// Set boundaries for entries rate related histogram like `operation_entries_rate`.
127    pub fn entries_rate_boundaries(mut self, boundaries: Vec<f64>) -> Self {
128        if !boundaries.is_empty() {
129            self.entries_rate_boundaries = boundaries;
130        }
131        self
132    }
133
134    /// Set boundaries for duration seconds related histogram like `operation_duration_seconds`.
135    pub fn duration_seconds_boundaries(mut self, boundaries: Vec<f64>) -> Self {
136        if !boundaries.is_empty() {
137            self.duration_seconds_boundaries = boundaries;
138        }
139        self
140    }
141
142    /// Set boundaries for ttfb related histogram like `operation_ttfb_seconds`.
143    pub fn ttfb_boundaries(mut self, boundaries: Vec<f64>) -> Self {
144        if !boundaries.is_empty() {
145            self.ttfb_boundaries = boundaries;
146        }
147        self
148    }
149
150    /// Register the metrics and return a [`OtelMetricsLayer`].
151    ///
152    /// # Examples
153    ///
154    /// ```no_run
155    /// # use opendal::layers::OtelMetricsLayer;
156    /// # use opendal::services;
157    /// # use opendal::Operator;
158    /// # use opendal::Result;
159    ///
160    /// # #[tokio::main]
161    /// # async fn main() -> Result<()> {
162    /// let meter = opentelemetry::global::meter("opendal");
163    /// let op = Operator::new(services::Memory::default())?
164    ///     .layer(OtelMetricsLayer::builder()
165    ///     .register(&meter))
166    ///     .finish();
167    ///
168    /// Ok(())
169    /// # }
170    /// ```
171    pub fn register(self, meter: &Meter) -> OtelMetricsLayer {
172        let operation_bytes = {
173            let metric = observe::MetricValue::OperationBytes(0);
174            register_u64_histogram_meter(
175                meter,
176                "opendal.operation.bytes",
177                metric,
178                self.bytes_boundaries.clone(),
179            )
180        };
181        let operation_bytes_rate = {
182            let metric = observe::MetricValue::OperationBytesRate(0.0);
183            register_f64_histogram_meter(
184                meter,
185                "opendal.operation.bytes_rate",
186                metric,
187                self.bytes_rate_boundaries.clone(),
188            )
189        };
190        let operation_entries = {
191            let metric = observe::MetricValue::OperationEntries(0);
192            register_u64_histogram_meter(
193                meter,
194                "opendal.operation.entries",
195                metric,
196                self.entries_boundaries.clone(),
197            )
198        };
199        let operation_entries_rate = {
200            let metric = observe::MetricValue::OperationEntriesRate(0.0);
201            register_f64_histogram_meter(
202                meter,
203                "opendal.operation.entries_rate",
204                metric,
205                self.entries_rate_boundaries.clone(),
206            )
207        };
208        let operation_duration_seconds = {
209            let metric = observe::MetricValue::OperationDurationSeconds(Duration::default());
210            register_f64_histogram_meter(
211                meter,
212                "opendal.operation.duration",
213                metric,
214                self.duration_seconds_boundaries.clone(),
215            )
216        };
217        let operation_errors_total = {
218            let metric = observe::MetricValue::OperationErrorsTotal;
219            meter
220                .u64_counter("opendal.operation.errors")
221                .with_description(metric.help())
222                .build()
223        };
224        let operation_executing = {
225            let metric = observe::MetricValue::OperationExecuting(0);
226            meter
227                .i64_up_down_counter("opendal.operation.executing")
228                .with_description(metric.help())
229                .build()
230        };
231        let operation_ttfb_seconds = {
232            let metric = observe::MetricValue::OperationTtfbSeconds(Duration::default());
233            register_f64_histogram_meter(
234                meter,
235                "opendal.operation.ttfb",
236                metric,
237                self.duration_seconds_boundaries.clone(),
238            )
239        };
240
241        let http_executing = {
242            let metric = observe::MetricValue::HttpExecuting(0);
243            meter
244                .i64_up_down_counter("opendal.http.executing")
245                .with_description(metric.help())
246                .build()
247        };
248        let http_request_bytes = {
249            let metric = observe::MetricValue::HttpRequestBytes(0);
250            register_u64_histogram_meter(
251                meter,
252                "opendal.http.request.bytes",
253                metric,
254                self.bytes_boundaries.clone(),
255            )
256        };
257        let http_request_bytes_rate = {
258            let metric = observe::MetricValue::HttpRequestBytesRate(0.0);
259            register_f64_histogram_meter(
260                meter,
261                "opendal.http.request.bytes_rate",
262                metric,
263                self.bytes_rate_boundaries.clone(),
264            )
265        };
266        let http_request_duration_seconds = {
267            let metric = observe::MetricValue::HttpRequestDurationSeconds(Duration::default());
268            register_f64_histogram_meter(
269                meter,
270                "opendal.http.request.duration",
271                metric,
272                self.duration_seconds_boundaries.clone(),
273            )
274        };
275        let http_response_bytes = {
276            let metric = observe::MetricValue::HttpResponseBytes(0);
277            register_u64_histogram_meter(
278                meter,
279                "opendal.http.response.bytes",
280                metric,
281                self.bytes_boundaries.clone(),
282            )
283        };
284        let http_response_bytes_rate = {
285            let metric = observe::MetricValue::HttpResponseBytesRate(0.0);
286            register_f64_histogram_meter(
287                meter,
288                "opendal.http.response.bytes_rate",
289                metric,
290                self.bytes_rate_boundaries.clone(),
291            )
292        };
293        let http_response_duration_seconds = {
294            let metric = observe::MetricValue::HttpResponseDurationSeconds(Duration::default());
295            register_f64_histogram_meter(
296                meter,
297                "opendal.http.response.duration",
298                metric,
299                self.duration_seconds_boundaries.clone(),
300            )
301        };
302        let http_connection_errors_total = {
303            let metric = observe::MetricValue::HttpConnectionErrorsTotal;
304            meter
305                .u64_counter("opendal.http.connection_errors")
306                .with_description(metric.help())
307                .build()
308        };
309        let http_status_errors_total = {
310            let metric = observe::MetricValue::HttpStatusErrorsTotal;
311            meter
312                .u64_counter("opendal.http.status_errors")
313                .with_description(metric.help())
314                .build()
315        };
316
317        OtelMetricsLayer {
318            interceptor: OtelMetricsInterceptor {
319                operation_bytes,
320                operation_bytes_rate,
321                operation_entries,
322                operation_entries_rate,
323                operation_duration_seconds,
324                operation_errors_total,
325                operation_executing,
326                operation_ttfb_seconds,
327
328                http_executing,
329                http_request_bytes,
330                http_request_bytes_rate,
331                http_request_duration_seconds,
332                http_response_bytes,
333                http_response_bytes_rate,
334                http_response_duration_seconds,
335                http_connection_errors_total,
336                http_status_errors_total,
337            },
338        }
339    }
340}
341
342impl<A: Access> Layer<A> for OtelMetricsLayer {
343    type LayeredAccess = observe::MetricsAccessor<A, OtelMetricsInterceptor>;
344
345    fn layer(&self, inner: A) -> Self::LayeredAccess {
346        observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
347    }
348}
349
350#[derive(Clone, Debug)]
351pub struct OtelMetricsInterceptor {
352    operation_bytes: Histogram<u64>,
353    operation_bytes_rate: Histogram<f64>,
354    operation_entries: Histogram<u64>,
355    operation_entries_rate: Histogram<f64>,
356    operation_duration_seconds: Histogram<f64>,
357    operation_errors_total: Counter<u64>,
358    operation_executing: UpDownCounter<i64>,
359    operation_ttfb_seconds: Histogram<f64>,
360
361    http_executing: UpDownCounter<i64>,
362    http_request_bytes: Histogram<u64>,
363    http_request_bytes_rate: Histogram<f64>,
364    http_request_duration_seconds: Histogram<f64>,
365    http_response_bytes: Histogram<u64>,
366    http_response_bytes_rate: Histogram<f64>,
367    http_response_duration_seconds: Histogram<f64>,
368    http_connection_errors_total: Counter<u64>,
369    http_status_errors_total: Counter<u64>,
370}
371
372impl observe::MetricsIntercept for OtelMetricsInterceptor {
373    fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
374        let attributes = self.create_attributes(labels);
375
376        match value {
377            observe::MetricValue::OperationBytes(v) => self.operation_bytes.record(v, &attributes),
378            observe::MetricValue::OperationBytesRate(v) => {
379                self.operation_bytes_rate.record(v, &attributes)
380            }
381            observe::MetricValue::OperationEntries(v) => {
382                self.operation_entries.record(v, &attributes)
383            }
384            observe::MetricValue::OperationEntriesRate(v) => {
385                self.operation_entries_rate.record(v, &attributes)
386            }
387            observe::MetricValue::OperationDurationSeconds(v) => self
388                .operation_duration_seconds
389                .record(v.as_secs_f64(), &attributes),
390            observe::MetricValue::OperationErrorsTotal => {
391                self.operation_errors_total.add(1, &attributes)
392            }
393            observe::MetricValue::OperationExecuting(v) => {
394                self.operation_executing.add(v as i64, &attributes)
395            }
396            observe::MetricValue::OperationTtfbSeconds(v) => self
397                .operation_ttfb_seconds
398                .record(v.as_secs_f64(), &attributes),
399
400            observe::MetricValue::HttpExecuting(v) => {
401                self.http_executing.add(v as i64, &attributes)
402            }
403            observe::MetricValue::HttpRequestBytes(v) => {
404                self.http_request_bytes.record(v, &attributes)
405            }
406            observe::MetricValue::HttpRequestBytesRate(v) => {
407                self.http_request_bytes_rate.record(v, &attributes)
408            }
409            observe::MetricValue::HttpRequestDurationSeconds(v) => self
410                .http_request_duration_seconds
411                .record(v.as_secs_f64(), &attributes),
412            observe::MetricValue::HttpResponseBytes(v) => {
413                self.http_response_bytes.record(v, &attributes)
414            }
415            observe::MetricValue::HttpResponseBytesRate(v) => {
416                self.http_response_bytes_rate.record(v, &attributes)
417            }
418            observe::MetricValue::HttpResponseDurationSeconds(v) => self
419                .http_response_duration_seconds
420                .record(v.as_secs_f64(), &attributes),
421            observe::MetricValue::HttpConnectionErrorsTotal => {
422                self.http_connection_errors_total.add(1, &attributes)
423            }
424            observe::MetricValue::HttpStatusErrorsTotal => {
425                self.http_status_errors_total.add(1, &attributes)
426            }
427        }
428    }
429}
430
431impl OtelMetricsInterceptor {
432    fn create_attributes(&self, attrs: observe::MetricLabels) -> Vec<KeyValue> {
433        let mut attributes = Vec::with_capacity(6);
434
435        attributes.extend([
436            KeyValue::new(observe::LABEL_SCHEME, attrs.scheme.into_static()),
437            KeyValue::new(observe::LABEL_NAMESPACE, attrs.namespace),
438            KeyValue::new(observe::LABEL_ROOT, attrs.root),
439            KeyValue::new(observe::LABEL_OPERATION, attrs.operation),
440        ]);
441
442        if let Some(error) = attrs.error {
443            attributes.push(KeyValue::new(observe::LABEL_ERROR, error.into_static()));
444        }
445
446        if let Some(status_code) = attrs.status_code {
447            attributes.push(KeyValue::new(
448                observe::LABEL_STATUS_CODE,
449                status_code.as_u16() as i64,
450            ));
451        }
452
453        attributes
454    }
455}
456
457fn register_u64_histogram_meter(
458    meter: &Meter,
459    name: &'static str,
460    metric: observe::MetricValue,
461    boundaries: Vec<f64>,
462) -> Histogram<u64> {
463    let (_name, unit) = metric.name_with_unit();
464    let description = metric.help();
465
466    let builder = meter
467        .u64_histogram(name)
468        .with_description(description)
469        .with_boundaries(boundaries);
470
471    if let Some(unit) = unit {
472        builder.with_unit(unit).build()
473    } else {
474        builder.build()
475    }
476}
477
478fn register_f64_histogram_meter(
479    meter: &Meter,
480    name: &'static str,
481    metric: observe::MetricValue,
482    boundaries: Vec<f64>,
483) -> Histogram<f64> {
484    let (_name, unit) = metric.name_with_unit();
485    let description = metric.help();
486
487    let builder = meter
488        .f64_histogram(name)
489        .with_description(description)
490        .with_boundaries(boundaries);
491
492    if let Some(unit) = unit {
493        builder.with_unit(unit).build()
494    } else {
495        builder.build()
496    }
497}