1use std::time::Duration;
19
20use opentelemetry::metrics::Counter;
21use opentelemetry::metrics::Histogram;
22use opentelemetry::metrics::Meter;
23use opentelemetry::metrics::UpDownCounter;
24use opentelemetry::KeyValue;
25
26use crate::layers::observe;
27use crate::raw::*;
28
29#[derive(Clone, Debug)]
48pub struct OtelMetricsLayer {
49 interceptor: OtelMetricsInterceptor,
50}
51
52impl OtelMetricsLayer {
53 pub fn builder() -> OtelMetricsLayerBuilder {
74 OtelMetricsLayerBuilder::default()
75 }
76}
77
78pub struct OtelMetricsLayerBuilder {
80 bytes_boundaries: Vec<f64>,
81 bytes_rate_boundaries: Vec<f64>,
82 entries_boundaries: Vec<f64>,
83 entries_rate_boundaries: Vec<f64>,
84 duration_seconds_boundaries: Vec<f64>,
85 ttfb_boundaries: Vec<f64>,
86}
87
88impl Default for OtelMetricsLayerBuilder {
89 fn default() -> Self {
90 Self {
91 bytes_boundaries: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
92 bytes_rate_boundaries: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
93 entries_boundaries: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
94 entries_rate_boundaries: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
95 duration_seconds_boundaries: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
96 ttfb_boundaries: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
97 }
98 }
99}
100
101impl OtelMetricsLayerBuilder {
102 pub fn bytes_boundaries(mut self, boundaries: Vec<f64>) -> Self {
104 if !boundaries.is_empty() {
105 self.bytes_boundaries = boundaries;
106 }
107 self
108 }
109
110 pub fn bytes_rate_boundaries(mut self, boundaries: Vec<f64>) -> Self {
112 if !boundaries.is_empty() {
113 self.bytes_rate_boundaries = boundaries;
114 }
115 self
116 }
117
118 pub fn entries_boundaries(mut self, boundaries: Vec<f64>) -> Self {
120 if !boundaries.is_empty() {
121 self.entries_boundaries = boundaries;
122 }
123 self
124 }
125
126 pub fn entries_rate_boundaries(mut self, boundaries: Vec<f64>) -> Self {
128 if !boundaries.is_empty() {
129 self.entries_rate_boundaries = boundaries;
130 }
131 self
132 }
133
134 pub fn duration_seconds_boundaries(mut self, boundaries: Vec<f64>) -> Self {
136 if !boundaries.is_empty() {
137 self.duration_seconds_boundaries = boundaries;
138 }
139 self
140 }
141
142 pub fn ttfb_boundaries(mut self, boundaries: Vec<f64>) -> Self {
144 if !boundaries.is_empty() {
145 self.ttfb_boundaries = boundaries;
146 }
147 self
148 }
149
150 pub fn register(self, meter: &Meter) -> OtelMetricsLayer {
172 let operation_bytes = {
173 let metric = observe::MetricValue::OperationBytes(0);
174 register_u64_histogram_meter(
175 meter,
176 "opendal.operation.bytes",
177 metric,
178 self.bytes_boundaries.clone(),
179 )
180 };
181 let operation_bytes_rate = {
182 let metric = observe::MetricValue::OperationBytesRate(0.0);
183 register_f64_histogram_meter(
184 meter,
185 "opendal.operation.bytes_rate",
186 metric,
187 self.bytes_rate_boundaries.clone(),
188 )
189 };
190 let operation_entries = {
191 let metric = observe::MetricValue::OperationEntries(0);
192 register_u64_histogram_meter(
193 meter,
194 "opendal.operation.entries",
195 metric,
196 self.entries_boundaries.clone(),
197 )
198 };
199 let operation_entries_rate = {
200 let metric = observe::MetricValue::OperationEntriesRate(0.0);
201 register_f64_histogram_meter(
202 meter,
203 "opendal.operation.entries_rate",
204 metric,
205 self.entries_rate_boundaries.clone(),
206 )
207 };
208 let operation_duration_seconds = {
209 let metric = observe::MetricValue::OperationDurationSeconds(Duration::default());
210 register_f64_histogram_meter(
211 meter,
212 "opendal.operation.duration",
213 metric,
214 self.duration_seconds_boundaries.clone(),
215 )
216 };
217 let operation_errors_total = {
218 let metric = observe::MetricValue::OperationErrorsTotal;
219 meter
220 .u64_counter("opendal.operation.errors")
221 .with_description(metric.help())
222 .build()
223 };
224 let operation_executing = {
225 let metric = observe::MetricValue::OperationExecuting(0);
226 meter
227 .i64_up_down_counter("opendal.operation.executing")
228 .with_description(metric.help())
229 .build()
230 };
231 let operation_ttfb_seconds = {
232 let metric = observe::MetricValue::OperationTtfbSeconds(Duration::default());
233 register_f64_histogram_meter(
234 meter,
235 "opendal.operation.ttfb",
236 metric,
237 self.duration_seconds_boundaries.clone(),
238 )
239 };
240
241 let http_executing = {
242 let metric = observe::MetricValue::HttpExecuting(0);
243 meter
244 .i64_up_down_counter("opendal.http.executing")
245 .with_description(metric.help())
246 .build()
247 };
248 let http_request_bytes = {
249 let metric = observe::MetricValue::HttpRequestBytes(0);
250 register_u64_histogram_meter(
251 meter,
252 "opendal.http.request.bytes",
253 metric,
254 self.bytes_boundaries.clone(),
255 )
256 };
257 let http_request_bytes_rate = {
258 let metric = observe::MetricValue::HttpRequestBytesRate(0.0);
259 register_f64_histogram_meter(
260 meter,
261 "opendal.http.request.bytes_rate",
262 metric,
263 self.bytes_rate_boundaries.clone(),
264 )
265 };
266 let http_request_duration_seconds = {
267 let metric = observe::MetricValue::HttpRequestDurationSeconds(Duration::default());
268 register_f64_histogram_meter(
269 meter,
270 "opendal.http.request.duration",
271 metric,
272 self.duration_seconds_boundaries.clone(),
273 )
274 };
275 let http_response_bytes = {
276 let metric = observe::MetricValue::HttpResponseBytes(0);
277 register_u64_histogram_meter(
278 meter,
279 "opendal.http.response.bytes",
280 metric,
281 self.bytes_boundaries.clone(),
282 )
283 };
284 let http_response_bytes_rate = {
285 let metric = observe::MetricValue::HttpResponseBytesRate(0.0);
286 register_f64_histogram_meter(
287 meter,
288 "opendal.http.response.bytes_rate",
289 metric,
290 self.bytes_rate_boundaries.clone(),
291 )
292 };
293 let http_response_duration_seconds = {
294 let metric = observe::MetricValue::HttpResponseDurationSeconds(Duration::default());
295 register_f64_histogram_meter(
296 meter,
297 "opendal.http.response.duration",
298 metric,
299 self.duration_seconds_boundaries.clone(),
300 )
301 };
302 let http_connection_errors_total = {
303 let metric = observe::MetricValue::HttpConnectionErrorsTotal;
304 meter
305 .u64_counter("opendal.http.connection_errors")
306 .with_description(metric.help())
307 .build()
308 };
309 let http_status_errors_total = {
310 let metric = observe::MetricValue::HttpStatusErrorsTotal;
311 meter
312 .u64_counter("opendal.http.status_errors")
313 .with_description(metric.help())
314 .build()
315 };
316
317 OtelMetricsLayer {
318 interceptor: OtelMetricsInterceptor {
319 operation_bytes,
320 operation_bytes_rate,
321 operation_entries,
322 operation_entries_rate,
323 operation_duration_seconds,
324 operation_errors_total,
325 operation_executing,
326 operation_ttfb_seconds,
327
328 http_executing,
329 http_request_bytes,
330 http_request_bytes_rate,
331 http_request_duration_seconds,
332 http_response_bytes,
333 http_response_bytes_rate,
334 http_response_duration_seconds,
335 http_connection_errors_total,
336 http_status_errors_total,
337 },
338 }
339 }
340}
341
342impl<A: Access> Layer<A> for OtelMetricsLayer {
343 type LayeredAccess = observe::MetricsAccessor<A, OtelMetricsInterceptor>;
344
345 fn layer(&self, inner: A) -> Self::LayeredAccess {
346 observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
347 }
348}
349
350#[derive(Clone, Debug)]
351pub struct OtelMetricsInterceptor {
352 operation_bytes: Histogram<u64>,
353 operation_bytes_rate: Histogram<f64>,
354 operation_entries: Histogram<u64>,
355 operation_entries_rate: Histogram<f64>,
356 operation_duration_seconds: Histogram<f64>,
357 operation_errors_total: Counter<u64>,
358 operation_executing: UpDownCounter<i64>,
359 operation_ttfb_seconds: Histogram<f64>,
360
361 http_executing: UpDownCounter<i64>,
362 http_request_bytes: Histogram<u64>,
363 http_request_bytes_rate: Histogram<f64>,
364 http_request_duration_seconds: Histogram<f64>,
365 http_response_bytes: Histogram<u64>,
366 http_response_bytes_rate: Histogram<f64>,
367 http_response_duration_seconds: Histogram<f64>,
368 http_connection_errors_total: Counter<u64>,
369 http_status_errors_total: Counter<u64>,
370}
371
372impl observe::MetricsIntercept for OtelMetricsInterceptor {
373 fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
374 let attributes = self.create_attributes(labels);
375
376 match value {
377 observe::MetricValue::OperationBytes(v) => self.operation_bytes.record(v, &attributes),
378 observe::MetricValue::OperationBytesRate(v) => {
379 self.operation_bytes_rate.record(v, &attributes)
380 }
381 observe::MetricValue::OperationEntries(v) => {
382 self.operation_entries.record(v, &attributes)
383 }
384 observe::MetricValue::OperationEntriesRate(v) => {
385 self.operation_entries_rate.record(v, &attributes)
386 }
387 observe::MetricValue::OperationDurationSeconds(v) => self
388 .operation_duration_seconds
389 .record(v.as_secs_f64(), &attributes),
390 observe::MetricValue::OperationErrorsTotal => {
391 self.operation_errors_total.add(1, &attributes)
392 }
393 observe::MetricValue::OperationExecuting(v) => {
394 self.operation_executing.add(v as i64, &attributes)
395 }
396 observe::MetricValue::OperationTtfbSeconds(v) => self
397 .operation_ttfb_seconds
398 .record(v.as_secs_f64(), &attributes),
399
400 observe::MetricValue::HttpExecuting(v) => {
401 self.http_executing.add(v as i64, &attributes)
402 }
403 observe::MetricValue::HttpRequestBytes(v) => {
404 self.http_request_bytes.record(v, &attributes)
405 }
406 observe::MetricValue::HttpRequestBytesRate(v) => {
407 self.http_request_bytes_rate.record(v, &attributes)
408 }
409 observe::MetricValue::HttpRequestDurationSeconds(v) => self
410 .http_request_duration_seconds
411 .record(v.as_secs_f64(), &attributes),
412 observe::MetricValue::HttpResponseBytes(v) => {
413 self.http_response_bytes.record(v, &attributes)
414 }
415 observe::MetricValue::HttpResponseBytesRate(v) => {
416 self.http_response_bytes_rate.record(v, &attributes)
417 }
418 observe::MetricValue::HttpResponseDurationSeconds(v) => self
419 .http_response_duration_seconds
420 .record(v.as_secs_f64(), &attributes),
421 observe::MetricValue::HttpConnectionErrorsTotal => {
422 self.http_connection_errors_total.add(1, &attributes)
423 }
424 observe::MetricValue::HttpStatusErrorsTotal => {
425 self.http_status_errors_total.add(1, &attributes)
426 }
427 }
428 }
429}
430
431impl OtelMetricsInterceptor {
432 fn create_attributes(&self, attrs: observe::MetricLabels) -> Vec<KeyValue> {
433 let mut attributes = Vec::with_capacity(6);
434
435 attributes.extend([
436 KeyValue::new(observe::LABEL_SCHEME, attrs.scheme.into_static()),
437 KeyValue::new(observe::LABEL_NAMESPACE, attrs.namespace),
438 KeyValue::new(observe::LABEL_ROOT, attrs.root),
439 KeyValue::new(observe::LABEL_OPERATION, attrs.operation),
440 ]);
441
442 if let Some(error) = attrs.error {
443 attributes.push(KeyValue::new(observe::LABEL_ERROR, error.into_static()));
444 }
445
446 if let Some(status_code) = attrs.status_code {
447 attributes.push(KeyValue::new(
448 observe::LABEL_STATUS_CODE,
449 status_code.as_u16() as i64,
450 ));
451 }
452
453 attributes
454 }
455}
456
457fn register_u64_histogram_meter(
458 meter: &Meter,
459 name: &'static str,
460 metric: observe::MetricValue,
461 boundaries: Vec<f64>,
462) -> Histogram<u64> {
463 let (_name, unit) = metric.name_with_unit();
464 let description = metric.help();
465
466 let builder = meter
467 .u64_histogram(name)
468 .with_description(description)
469 .with_boundaries(boundaries);
470
471 if let Some(unit) = unit {
472 builder.with_unit(unit).build()
473 } else {
474 builder.build()
475 }
476}
477
478fn register_f64_histogram_meter(
479 meter: &Meter,
480 name: &'static str,
481 metric: observe::MetricValue,
482 boundaries: Vec<f64>,
483) -> Histogram<f64> {
484 let (_name, unit) = metric.name_with_unit();
485 let description = metric.help();
486
487 let builder = meter
488 .f64_histogram(name)
489 .with_description(description)
490 .with_boundaries(boundaries);
491
492 if let Some(unit) = unit {
493 builder.with_unit(unit).build()
494 } else {
495 builder.build()
496 }
497}