1use std::time::Duration;
19
20use opentelemetry::metrics::Counter;
21use opentelemetry::metrics::Histogram;
22use opentelemetry::metrics::Meter;
23use opentelemetry::metrics::UpDownCounter;
24use opentelemetry::KeyValue;
25
26use crate::layers::observe;
27use crate::raw::*;
28
29#[derive(Clone, Debug)]
48pub struct OtelMetricsLayer {
49 interceptor: OtelMetricsInterceptor,
50}
51
52impl OtelMetricsLayer {
53 pub fn builder() -> OtelMetricsLayerBuilder {
74 OtelMetricsLayerBuilder::default()
75 }
76}
77
78pub struct OtelMetricsLayerBuilder {
80 bytes_boundaries: Vec<f64>,
81 bytes_rate_boundaries: Vec<f64>,
82 entries_boundaries: Vec<f64>,
83 entries_rate_boundaries: Vec<f64>,
84 duration_seconds_boundaries: Vec<f64>,
85 ttfb_boundaries: Vec<f64>,
86}
87
88impl Default for OtelMetricsLayerBuilder {
89 fn default() -> Self {
90 Self {
91 bytes_boundaries: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
92 bytes_rate_boundaries: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
93 entries_boundaries: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
94 entries_rate_boundaries: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
95 duration_seconds_boundaries: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
96 ttfb_boundaries: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
97 }
98 }
99}
100
101impl OtelMetricsLayerBuilder {
102 pub fn bytes_boundaries(mut self, boundaries: Vec<f64>) -> Self {
104 if !boundaries.is_empty() {
105 self.bytes_boundaries = boundaries;
106 }
107 self
108 }
109
110 pub fn bytes_rate_boundaries(mut self, boundaries: Vec<f64>) -> Self {
112 if !boundaries.is_empty() {
113 self.bytes_rate_boundaries = boundaries;
114 }
115 self
116 }
117
118 pub fn entries_boundaries(mut self, boundaries: Vec<f64>) -> Self {
120 if !boundaries.is_empty() {
121 self.entries_boundaries = boundaries;
122 }
123 self
124 }
125
126 pub fn entries_rate_boundaries(mut self, boundaries: Vec<f64>) -> Self {
128 if !boundaries.is_empty() {
129 self.entries_rate_boundaries = boundaries;
130 }
131 self
132 }
133
134 pub fn duration_seconds_boundaries(mut self, boundaries: Vec<f64>) -> Self {
136 if !boundaries.is_empty() {
137 self.duration_seconds_boundaries = boundaries;
138 }
139 self
140 }
141
142 pub fn ttfb_boundaries(mut self, boundaries: Vec<f64>) -> Self {
144 if !boundaries.is_empty() {
145 self.ttfb_boundaries = boundaries;
146 }
147 self
148 }
149
150 pub fn register(self, meter: &Meter) -> OtelMetricsLayer {
171 let operation_bytes = {
172 let metric = observe::MetricValue::OperationBytes(0);
173 register_u64_histogram_meter(
174 meter,
175 "opendal.operation.bytes",
176 metric,
177 self.bytes_boundaries.clone(),
178 )
179 };
180 let operation_bytes_rate = {
181 let metric = observe::MetricValue::OperationBytesRate(0.0);
182 register_f64_histogram_meter(
183 meter,
184 "opendal.operation.bytes_rate",
185 metric,
186 self.bytes_rate_boundaries.clone(),
187 )
188 };
189 let operation_entries = {
190 let metric = observe::MetricValue::OperationEntries(0);
191 register_u64_histogram_meter(
192 meter,
193 "opendal.operation.entries",
194 metric,
195 self.entries_boundaries.clone(),
196 )
197 };
198 let operation_entries_rate = {
199 let metric = observe::MetricValue::OperationEntriesRate(0.0);
200 register_f64_histogram_meter(
201 meter,
202 "opendal.operation.entries_rate",
203 metric,
204 self.entries_rate_boundaries.clone(),
205 )
206 };
207 let operation_duration_seconds = {
208 let metric = observe::MetricValue::OperationDurationSeconds(Duration::default());
209 register_f64_histogram_meter(
210 meter,
211 "opendal.operation.duration",
212 metric,
213 self.duration_seconds_boundaries.clone(),
214 )
215 };
216 let operation_errors_total = {
217 let metric = observe::MetricValue::OperationErrorsTotal;
218 meter
219 .u64_counter("opendal.operation.errors")
220 .with_description(metric.help())
221 .build()
222 };
223 let operation_executing = {
224 let metric = observe::MetricValue::OperationExecuting(0);
225 meter
226 .i64_up_down_counter("opendal.operation.executing")
227 .with_description(metric.help())
228 .build()
229 };
230 let operation_ttfb_seconds = {
231 let metric = observe::MetricValue::OperationTtfbSeconds(Duration::default());
232 register_f64_histogram_meter(
233 meter,
234 "opendal.operation.ttfb",
235 metric,
236 self.duration_seconds_boundaries.clone(),
237 )
238 };
239
240 let http_executing = {
241 let metric = observe::MetricValue::HttpExecuting(0);
242 meter
243 .i64_up_down_counter("opendal.http.executing")
244 .with_description(metric.help())
245 .build()
246 };
247 let http_request_bytes = {
248 let metric = observe::MetricValue::HttpRequestBytes(0);
249 register_u64_histogram_meter(
250 meter,
251 "opendal.http.request.bytes",
252 metric,
253 self.bytes_boundaries.clone(),
254 )
255 };
256 let http_request_bytes_rate = {
257 let metric = observe::MetricValue::HttpRequestBytesRate(0.0);
258 register_f64_histogram_meter(
259 meter,
260 "opendal.http.request.bytes_rate",
261 metric,
262 self.bytes_rate_boundaries.clone(),
263 )
264 };
265 let http_request_duration_seconds = {
266 let metric = observe::MetricValue::HttpRequestDurationSeconds(Duration::default());
267 register_f64_histogram_meter(
268 meter,
269 "opendal.http.request.duration",
270 metric,
271 self.duration_seconds_boundaries.clone(),
272 )
273 };
274 let http_response_bytes = {
275 let metric = observe::MetricValue::HttpResponseBytes(0);
276 register_u64_histogram_meter(
277 meter,
278 "opendal.http.response.bytes",
279 metric,
280 self.bytes_boundaries.clone(),
281 )
282 };
283 let http_response_bytes_rate = {
284 let metric = observe::MetricValue::HttpResponseBytesRate(0.0);
285 register_f64_histogram_meter(
286 meter,
287 "opendal.http.response.bytes_rate",
288 metric,
289 self.bytes_rate_boundaries.clone(),
290 )
291 };
292 let http_response_duration_seconds = {
293 let metric = observe::MetricValue::HttpResponseDurationSeconds(Duration::default());
294 register_f64_histogram_meter(
295 meter,
296 "opendal.http.response.duration",
297 metric,
298 self.duration_seconds_boundaries.clone(),
299 )
300 };
301 let http_connection_errors_total = {
302 let metric = observe::MetricValue::HttpConnectionErrorsTotal;
303 meter
304 .u64_counter("opendal.http.connection_errors")
305 .with_description(metric.help())
306 .build()
307 };
308 let http_status_errors_total = {
309 let metric = observe::MetricValue::HttpStatusErrorsTotal;
310 meter
311 .u64_counter("opendal.http.status_errors")
312 .with_description(metric.help())
313 .build()
314 };
315
316 OtelMetricsLayer {
317 interceptor: OtelMetricsInterceptor {
318 operation_bytes,
319 operation_bytes_rate,
320 operation_entries,
321 operation_entries_rate,
322 operation_duration_seconds,
323 operation_errors_total,
324 operation_executing,
325 operation_ttfb_seconds,
326
327 http_executing,
328 http_request_bytes,
329 http_request_bytes_rate,
330 http_request_duration_seconds,
331 http_response_bytes,
332 http_response_bytes_rate,
333 http_response_duration_seconds,
334 http_connection_errors_total,
335 http_status_errors_total,
336 },
337 }
338 }
339}
340
341impl<A: Access> Layer<A> for OtelMetricsLayer {
342 type LayeredAccess = observe::MetricsAccessor<A, OtelMetricsInterceptor>;
343
344 fn layer(&self, inner: A) -> Self::LayeredAccess {
345 observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
346 }
347}
348
349#[derive(Clone, Debug)]
350pub struct OtelMetricsInterceptor {
351 operation_bytes: Histogram<u64>,
352 operation_bytes_rate: Histogram<f64>,
353 operation_entries: Histogram<u64>,
354 operation_entries_rate: Histogram<f64>,
355 operation_duration_seconds: Histogram<f64>,
356 operation_errors_total: Counter<u64>,
357 operation_executing: UpDownCounter<i64>,
358 operation_ttfb_seconds: Histogram<f64>,
359
360 http_executing: UpDownCounter<i64>,
361 http_request_bytes: Histogram<u64>,
362 http_request_bytes_rate: Histogram<f64>,
363 http_request_duration_seconds: Histogram<f64>,
364 http_response_bytes: Histogram<u64>,
365 http_response_bytes_rate: Histogram<f64>,
366 http_response_duration_seconds: Histogram<f64>,
367 http_connection_errors_total: Counter<u64>,
368 http_status_errors_total: Counter<u64>,
369}
370
371impl observe::MetricsIntercept for OtelMetricsInterceptor {
372 fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
373 let attributes = self.create_attributes(labels);
374
375 match value {
376 observe::MetricValue::OperationBytes(v) => self.operation_bytes.record(v, &attributes),
377 observe::MetricValue::OperationBytesRate(v) => {
378 self.operation_bytes_rate.record(v, &attributes)
379 }
380 observe::MetricValue::OperationEntries(v) => {
381 self.operation_entries.record(v, &attributes)
382 }
383 observe::MetricValue::OperationEntriesRate(v) => {
384 self.operation_entries_rate.record(v, &attributes)
385 }
386 observe::MetricValue::OperationDurationSeconds(v) => self
387 .operation_duration_seconds
388 .record(v.as_secs_f64(), &attributes),
389 observe::MetricValue::OperationErrorsTotal => {
390 self.operation_errors_total.add(1, &attributes)
391 }
392 observe::MetricValue::OperationExecuting(v) => {
393 self.operation_executing.add(v as i64, &attributes)
394 }
395 observe::MetricValue::OperationTtfbSeconds(v) => self
396 .operation_ttfb_seconds
397 .record(v.as_secs_f64(), &attributes),
398
399 observe::MetricValue::HttpExecuting(v) => {
400 self.http_executing.add(v as i64, &attributes)
401 }
402 observe::MetricValue::HttpRequestBytes(v) => {
403 self.http_request_bytes.record(v, &attributes)
404 }
405 observe::MetricValue::HttpRequestBytesRate(v) => {
406 self.http_request_bytes_rate.record(v, &attributes)
407 }
408 observe::MetricValue::HttpRequestDurationSeconds(v) => self
409 .http_request_duration_seconds
410 .record(v.as_secs_f64(), &attributes),
411 observe::MetricValue::HttpResponseBytes(v) => {
412 self.http_response_bytes.record(v, &attributes)
413 }
414 observe::MetricValue::HttpResponseBytesRate(v) => {
415 self.http_response_bytes_rate.record(v, &attributes)
416 }
417 observe::MetricValue::HttpResponseDurationSeconds(v) => self
418 .http_response_duration_seconds
419 .record(v.as_secs_f64(), &attributes),
420 observe::MetricValue::HttpConnectionErrorsTotal => {
421 self.http_connection_errors_total.add(1, &attributes)
422 }
423 observe::MetricValue::HttpStatusErrorsTotal => {
424 self.http_status_errors_total.add(1, &attributes)
425 }
426 }
427 }
428}
429
430impl OtelMetricsInterceptor {
431 fn create_attributes(&self, attrs: observe::MetricLabels) -> Vec<KeyValue> {
432 let mut attributes = Vec::with_capacity(6);
433
434 attributes.extend([
435 KeyValue::new(observe::LABEL_SCHEME, attrs.scheme.into_static()),
436 KeyValue::new(observe::LABEL_NAMESPACE, attrs.namespace),
437 KeyValue::new(observe::LABEL_ROOT, attrs.root),
438 KeyValue::new(observe::LABEL_OPERATION, attrs.operation),
439 ]);
440
441 if let Some(error) = attrs.error {
442 attributes.push(KeyValue::new(observe::LABEL_ERROR, error.into_static()));
443 }
444
445 if let Some(status_code) = attrs.status_code {
446 attributes.push(KeyValue::new(
447 observe::LABEL_STATUS_CODE,
448 status_code.as_u16() as i64,
449 ));
450 }
451
452 attributes
453 }
454}
455
456fn register_u64_histogram_meter(
457 meter: &Meter,
458 name: &'static str,
459 metric: observe::MetricValue,
460 boundaries: Vec<f64>,
461) -> Histogram<u64> {
462 let (_name, unit) = metric.name_with_unit();
463 let description = metric.help();
464
465 let builder = meter
466 .u64_histogram(name)
467 .with_description(description)
468 .with_boundaries(boundaries);
469
470 if let Some(unit) = unit {
471 builder.with_unit(unit).build()
472 } else {
473 builder.build()
474 }
475}
476
477fn register_f64_histogram_meter(
478 meter: &Meter,
479 name: &'static str,
480 metric: observe::MetricValue,
481 boundaries: Vec<f64>,
482) -> Histogram<f64> {
483 let (_name, unit) = metric.name_with_unit();
484 let description = metric.help();
485
486 let builder = meter
487 .f64_histogram(name)
488 .with_description(description)
489 .with_boundaries(boundaries);
490
491 if let Some(unit) = unit {
492 builder.with_unit(unit).build()
493 } else {
494 builder.build()
495 }
496}