opendal_core/layers/
otelmetrics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use opentelemetry::KeyValue;
19use opentelemetry::metrics::Counter;
20use opentelemetry::metrics::Histogram;
21use opentelemetry::metrics::Meter;
22use opentelemetry::metrics::UpDownCounter;
23
24use crate::layers::observe;
25use crate::raw::*;
26
27/// Add [opentelemetry::metrics](https://docs.rs/opentelemetry/latest/opentelemetry/metrics/index.html) for every operation.
28///
29/// # Examples
30///
31/// ```no_run
32/// # use opendal_core::layers::OtelMetricsLayer;
33/// # use opendal_core::services;
34/// # use opendal_core::Operator;
35/// # use opendal_core::Result;
36///
37/// # fn main() -> Result<()> {
38/// let meter = opentelemetry::global::meter("opendal");
39/// let _ = Operator::new(services::Memory::default())?
40///     .layer(OtelMetricsLayer::builder().register(&meter))
41///     .finish();
42/// Ok(())
43/// # }
44/// ```
45#[derive(Clone, Debug)]
46pub struct OtelMetricsLayer {
47    interceptor: OtelMetricsInterceptor,
48}
49
50impl OtelMetricsLayer {
51    /// Create a [`OtelMetricsLayerBuilder`] to set the configuration of metrics.
52    ///
53    /// # Examples
54    ///
55    /// ```no_run
56    /// # use opendal_core::layers::OtelMetricsLayer;
57    /// # use opendal_core::services;
58    /// # use opendal_core::Operator;
59    /// # use opendal_core::Result;
60    ///
61    /// # #[tokio::main]
62    /// # async fn main() -> Result<()> {
63    /// let meter = opentelemetry::global::meter("opendal");
64    /// let op = Operator::new(services::Memory::default())?
65    ///     .layer(OtelMetricsLayer::builder().register(&meter))
66    ///     .finish();
67    ///
68    /// Ok(())
69    /// # }
70    /// ```
71    pub fn builder() -> OtelMetricsLayerBuilder {
72        OtelMetricsLayerBuilder::default()
73    }
74}
75
/// [`OtelMetricsLayerBuilder`] is a config builder to build a [`OtelMetricsLayer`].
///
/// Each field stores the explicit bucket boundaries handed to the matching
/// family of histograms when the metrics are registered.
#[derive(Clone, Debug)]
pub struct OtelMetricsLayerBuilder {
    /// Boundaries for byte-size histograms.
    bytes_boundaries: Vec<f64>,
    /// Boundaries for byte-rate histograms.
    bytes_rate_boundaries: Vec<f64>,
    /// Boundaries for entry-count histograms.
    entries_boundaries: Vec<f64>,
    /// Boundaries for entry-rate histograms.
    entries_rate_boundaries: Vec<f64>,
    /// Boundaries for duration histograms.
    duration_seconds_boundaries: Vec<f64>,
    /// Boundaries for time-to-first-byte histograms.
    ttfb_boundaries: Vec<f64>,
}
85
86impl Default for OtelMetricsLayerBuilder {
87    fn default() -> Self {
88        Self {
89            bytes_boundaries: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
90            bytes_rate_boundaries: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
91            entries_boundaries: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
92            entries_rate_boundaries: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
93            duration_seconds_boundaries: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
94            ttfb_boundaries: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
95        }
96    }
97}
98
99impl OtelMetricsLayerBuilder {
100    /// Set boundaries for bytes related histogram like `operation_bytes`.
101    pub fn bytes_boundaries(mut self, boundaries: Vec<f64>) -> Self {
102        if !boundaries.is_empty() {
103            self.bytes_boundaries = boundaries;
104        }
105        self
106    }
107
108    /// Set boundaries for bytes rate related histogram like `operation_bytes_rate`.
109    pub fn bytes_rate_boundaries(mut self, boundaries: Vec<f64>) -> Self {
110        if !boundaries.is_empty() {
111            self.bytes_rate_boundaries = boundaries;
112        }
113        self
114    }
115
116    /// Set boundaries for entries related histogram like `operation_entries`.
117    pub fn entries_boundaries(mut self, boundaries: Vec<f64>) -> Self {
118        if !boundaries.is_empty() {
119            self.entries_boundaries = boundaries;
120        }
121        self
122    }
123
124    /// Set boundaries for entries rate related histogram like `operation_entries_rate`.
125    pub fn entries_rate_boundaries(mut self, boundaries: Vec<f64>) -> Self {
126        if !boundaries.is_empty() {
127            self.entries_rate_boundaries = boundaries;
128        }
129        self
130    }
131
132    /// Set boundaries for duration seconds related histogram like `operation_duration_seconds`.
133    pub fn duration_seconds_boundaries(mut self, boundaries: Vec<f64>) -> Self {
134        if !boundaries.is_empty() {
135            self.duration_seconds_boundaries = boundaries;
136        }
137        self
138    }
139
140    /// Set boundaries for ttfb related histogram like `operation_ttfb_seconds`.
141    pub fn ttfb_boundaries(mut self, boundaries: Vec<f64>) -> Self {
142        if !boundaries.is_empty() {
143            self.ttfb_boundaries = boundaries;
144        }
145        self
146    }
147
148    /// Register the metrics and return a [`OtelMetricsLayer`].
149    ///
150    /// # Examples
151    ///
152    /// ```no_run
153    /// # use opendal_core::layers::OtelMetricsLayer;
154    /// # use opendal_core::services;
155    /// # use opendal_core::Operator;
156    /// # use opendal_core::Result;
157    ///
158    /// # #[tokio::main]
159    /// # async fn main() -> Result<()> {
160    /// let meter = opentelemetry::global::meter("opendal");
161    /// let op = Operator::new(services::Memory::default())?
162    ///     .layer(OtelMetricsLayer::builder().register(&meter))
163    ///     .finish();
164    ///
165    /// Ok(())
166    /// # }
167    /// ```
168    pub fn register(self, meter: &Meter) -> OtelMetricsLayer {
169        let operation_bytes = {
170            let metric = observe::MetricValue::OperationBytes(0);
171            register_u64_histogram_meter(
172                meter,
173                "opendal.operation.bytes",
174                metric,
175                self.bytes_boundaries.clone(),
176            )
177        };
178        let operation_bytes_rate = {
179            let metric = observe::MetricValue::OperationBytesRate(0.0);
180            register_f64_histogram_meter(
181                meter,
182                "opendal.operation.bytes_rate",
183                metric,
184                self.bytes_rate_boundaries.clone(),
185            )
186        };
187        let operation_entries = {
188            let metric = observe::MetricValue::OperationEntries(0);
189            register_u64_histogram_meter(
190                meter,
191                "opendal.operation.entries",
192                metric,
193                self.entries_boundaries.clone(),
194            )
195        };
196        let operation_entries_rate = {
197            let metric = observe::MetricValue::OperationEntriesRate(0.0);
198            register_f64_histogram_meter(
199                meter,
200                "opendal.operation.entries_rate",
201                metric,
202                self.entries_rate_boundaries.clone(),
203            )
204        };
205        let operation_duration_seconds = {
206            let metric = observe::MetricValue::OperationDurationSeconds(Duration::default());
207            register_f64_histogram_meter(
208                meter,
209                "opendal.operation.duration",
210                metric,
211                self.duration_seconds_boundaries.clone(),
212            )
213        };
214        let operation_errors_total = {
215            let metric = observe::MetricValue::OperationErrorsTotal;
216            meter
217                .u64_counter("opendal.operation.errors")
218                .with_description(metric.help())
219                .build()
220        };
221        let operation_executing = {
222            let metric = observe::MetricValue::OperationExecuting(0);
223            meter
224                .i64_up_down_counter("opendal.operation.executing")
225                .with_description(metric.help())
226                .build()
227        };
228        let operation_ttfb_seconds = {
229            let metric = observe::MetricValue::OperationTtfbSeconds(Duration::default());
230            register_f64_histogram_meter(
231                meter,
232                "opendal.operation.ttfb",
233                metric,
234                self.duration_seconds_boundaries.clone(),
235            )
236        };
237
238        let http_executing = {
239            let metric = observe::MetricValue::HttpExecuting(0);
240            meter
241                .i64_up_down_counter("opendal.http.executing")
242                .with_description(metric.help())
243                .build()
244        };
245        let http_request_bytes = {
246            let metric = observe::MetricValue::HttpRequestBytes(0);
247            register_u64_histogram_meter(
248                meter,
249                "opendal.http.request.bytes",
250                metric,
251                self.bytes_boundaries.clone(),
252            )
253        };
254        let http_request_bytes_rate = {
255            let metric = observe::MetricValue::HttpRequestBytesRate(0.0);
256            register_f64_histogram_meter(
257                meter,
258                "opendal.http.request.bytes_rate",
259                metric,
260                self.bytes_rate_boundaries.clone(),
261            )
262        };
263        let http_request_duration_seconds = {
264            let metric = observe::MetricValue::HttpRequestDurationSeconds(Duration::default());
265            register_f64_histogram_meter(
266                meter,
267                "opendal.http.request.duration",
268                metric,
269                self.duration_seconds_boundaries.clone(),
270            )
271        };
272        let http_response_bytes = {
273            let metric = observe::MetricValue::HttpResponseBytes(0);
274            register_u64_histogram_meter(
275                meter,
276                "opendal.http.response.bytes",
277                metric,
278                self.bytes_boundaries.clone(),
279            )
280        };
281        let http_response_bytes_rate = {
282            let metric = observe::MetricValue::HttpResponseBytesRate(0.0);
283            register_f64_histogram_meter(
284                meter,
285                "opendal.http.response.bytes_rate",
286                metric,
287                self.bytes_rate_boundaries.clone(),
288            )
289        };
290        let http_response_duration_seconds = {
291            let metric = observe::MetricValue::HttpResponseDurationSeconds(Duration::default());
292            register_f64_histogram_meter(
293                meter,
294                "opendal.http.response.duration",
295                metric,
296                self.duration_seconds_boundaries.clone(),
297            )
298        };
299        let http_connection_errors_total = {
300            let metric = observe::MetricValue::HttpConnectionErrorsTotal;
301            meter
302                .u64_counter("opendal.http.connection_errors")
303                .with_description(metric.help())
304                .build()
305        };
306        let http_status_errors_total = {
307            let metric = observe::MetricValue::HttpStatusErrorsTotal;
308            meter
309                .u64_counter("opendal.http.status_errors")
310                .with_description(metric.help())
311                .build()
312        };
313
314        OtelMetricsLayer {
315            interceptor: OtelMetricsInterceptor {
316                operation_bytes,
317                operation_bytes_rate,
318                operation_entries,
319                operation_entries_rate,
320                operation_duration_seconds,
321                operation_errors_total,
322                operation_executing,
323                operation_ttfb_seconds,
324
325                http_executing,
326                http_request_bytes,
327                http_request_bytes_rate,
328                http_request_duration_seconds,
329                http_response_bytes,
330                http_response_bytes_rate,
331                http_response_duration_seconds,
332                http_connection_errors_total,
333                http_status_errors_total,
334            },
335        }
336    }
337}
338
339impl<A: Access> Layer<A> for OtelMetricsLayer {
340    type LayeredAccess = observe::MetricsAccessor<A, OtelMetricsInterceptor>;
341
342    fn layer(&self, inner: A) -> Self::LayeredAccess {
343        observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
344    }
345}
346
347#[derive(Clone, Debug)]
348pub struct OtelMetricsInterceptor {
349    operation_bytes: Histogram<u64>,
350    operation_bytes_rate: Histogram<f64>,
351    operation_entries: Histogram<u64>,
352    operation_entries_rate: Histogram<f64>,
353    operation_duration_seconds: Histogram<f64>,
354    operation_errors_total: Counter<u64>,
355    operation_executing: UpDownCounter<i64>,
356    operation_ttfb_seconds: Histogram<f64>,
357
358    http_executing: UpDownCounter<i64>,
359    http_request_bytes: Histogram<u64>,
360    http_request_bytes_rate: Histogram<f64>,
361    http_request_duration_seconds: Histogram<f64>,
362    http_response_bytes: Histogram<u64>,
363    http_response_bytes_rate: Histogram<f64>,
364    http_response_duration_seconds: Histogram<f64>,
365    http_connection_errors_total: Counter<u64>,
366    http_status_errors_total: Counter<u64>,
367}
368
369impl observe::MetricsIntercept for OtelMetricsInterceptor {
370    fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
371        let attributes = self.create_attributes(labels);
372
373        match value {
374            observe::MetricValue::OperationBytes(v) => self.operation_bytes.record(v, &attributes),
375            observe::MetricValue::OperationBytesRate(v) => {
376                self.operation_bytes_rate.record(v, &attributes)
377            }
378            observe::MetricValue::OperationEntries(v) => {
379                self.operation_entries.record(v, &attributes)
380            }
381            observe::MetricValue::OperationEntriesRate(v) => {
382                self.operation_entries_rate.record(v, &attributes)
383            }
384            observe::MetricValue::OperationDurationSeconds(v) => self
385                .operation_duration_seconds
386                .record(v.as_secs_f64(), &attributes),
387            observe::MetricValue::OperationErrorsTotal => {
388                self.operation_errors_total.add(1, &attributes)
389            }
390            observe::MetricValue::OperationExecuting(v) => {
391                self.operation_executing.add(v as i64, &attributes)
392            }
393            observe::MetricValue::OperationTtfbSeconds(v) => self
394                .operation_ttfb_seconds
395                .record(v.as_secs_f64(), &attributes),
396
397            observe::MetricValue::HttpExecuting(v) => {
398                self.http_executing.add(v as i64, &attributes)
399            }
400            observe::MetricValue::HttpRequestBytes(v) => {
401                self.http_request_bytes.record(v, &attributes)
402            }
403            observe::MetricValue::HttpRequestBytesRate(v) => {
404                self.http_request_bytes_rate.record(v, &attributes)
405            }
406            observe::MetricValue::HttpRequestDurationSeconds(v) => self
407                .http_request_duration_seconds
408                .record(v.as_secs_f64(), &attributes),
409            observe::MetricValue::HttpResponseBytes(v) => {
410                self.http_response_bytes.record(v, &attributes)
411            }
412            observe::MetricValue::HttpResponseBytesRate(v) => {
413                self.http_response_bytes_rate.record(v, &attributes)
414            }
415            observe::MetricValue::HttpResponseDurationSeconds(v) => self
416                .http_response_duration_seconds
417                .record(v.as_secs_f64(), &attributes),
418            observe::MetricValue::HttpConnectionErrorsTotal => {
419                self.http_connection_errors_total.add(1, &attributes)
420            }
421            observe::MetricValue::HttpStatusErrorsTotal => {
422                self.http_status_errors_total.add(1, &attributes)
423            }
424        }
425    }
426}
427
428impl OtelMetricsInterceptor {
429    fn create_attributes(&self, attrs: observe::MetricLabels) -> Vec<KeyValue> {
430        let mut attributes = Vec::with_capacity(6);
431
432        attributes.extend([
433            KeyValue::new(observe::LABEL_SCHEME, attrs.scheme),
434            KeyValue::new(observe::LABEL_NAMESPACE, attrs.namespace),
435            KeyValue::new(observe::LABEL_ROOT, attrs.root),
436            KeyValue::new(observe::LABEL_OPERATION, attrs.operation),
437        ]);
438
439        if let Some(error) = attrs.error {
440            attributes.push(KeyValue::new(observe::LABEL_ERROR, error.into_static()));
441        }
442
443        if let Some(status_code) = attrs.status_code {
444            attributes.push(KeyValue::new(
445                observe::LABEL_STATUS_CODE,
446                status_code.as_u16() as i64,
447            ));
448        }
449
450        attributes
451    }
452}
453
454fn register_u64_histogram_meter(
455    meter: &Meter,
456    name: &'static str,
457    metric: observe::MetricValue,
458    boundaries: Vec<f64>,
459) -> Histogram<u64> {
460    let (_name, unit) = metric.name_with_unit();
461    let description = metric.help();
462
463    let builder = meter
464        .u64_histogram(name)
465        .with_description(description)
466        .with_boundaries(boundaries);
467
468    if let Some(unit) = unit {
469        builder.with_unit(unit).build()
470    } else {
471        builder.build()
472    }
473}
474
475fn register_f64_histogram_meter(
476    meter: &Meter,
477    name: &'static str,
478    metric: observe::MetricValue,
479    boundaries: Vec<f64>,
480) -> Histogram<f64> {
481    let (_name, unit) = metric.name_with_unit();
482    let description = metric.help();
483
484    let builder = meter
485        .f64_histogram(name)
486        .with_description(description)
487        .with_boundaries(boundaries);
488
489    if let Some(unit) = unit {
490        builder.with_unit(unit).build()
491    } else {
492        builder.build()
493    }
494}