1use opentelemetry::KeyValue;
19use opentelemetry::metrics::Counter;
20use opentelemetry::metrics::Histogram;
21use opentelemetry::metrics::Meter;
22use opentelemetry::metrics::UpDownCounter;
23
24use crate::layers::observe;
25use crate::raw::*;
26
27#[derive(Clone, Debug)]
46pub struct OtelMetricsLayer {
47 interceptor: OtelMetricsInterceptor,
48}
49
50impl OtelMetricsLayer {
51 pub fn builder() -> OtelMetricsLayerBuilder {
72 OtelMetricsLayerBuilder::default()
73 }
74}
75
76pub struct OtelMetricsLayerBuilder {
78 bytes_boundaries: Vec<f64>,
79 bytes_rate_boundaries: Vec<f64>,
80 entries_boundaries: Vec<f64>,
81 entries_rate_boundaries: Vec<f64>,
82 duration_seconds_boundaries: Vec<f64>,
83 ttfb_boundaries: Vec<f64>,
84}
85
86impl Default for OtelMetricsLayerBuilder {
87 fn default() -> Self {
88 Self {
89 bytes_boundaries: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
90 bytes_rate_boundaries: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
91 entries_boundaries: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
92 entries_rate_boundaries: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
93 duration_seconds_boundaries: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
94 ttfb_boundaries: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
95 }
96 }
97}
98
99impl OtelMetricsLayerBuilder {
100 pub fn bytes_boundaries(mut self, boundaries: Vec<f64>) -> Self {
102 if !boundaries.is_empty() {
103 self.bytes_boundaries = boundaries;
104 }
105 self
106 }
107
108 pub fn bytes_rate_boundaries(mut self, boundaries: Vec<f64>) -> Self {
110 if !boundaries.is_empty() {
111 self.bytes_rate_boundaries = boundaries;
112 }
113 self
114 }
115
116 pub fn entries_boundaries(mut self, boundaries: Vec<f64>) -> Self {
118 if !boundaries.is_empty() {
119 self.entries_boundaries = boundaries;
120 }
121 self
122 }
123
124 pub fn entries_rate_boundaries(mut self, boundaries: Vec<f64>) -> Self {
126 if !boundaries.is_empty() {
127 self.entries_rate_boundaries = boundaries;
128 }
129 self
130 }
131
132 pub fn duration_seconds_boundaries(mut self, boundaries: Vec<f64>) -> Self {
134 if !boundaries.is_empty() {
135 self.duration_seconds_boundaries = boundaries;
136 }
137 self
138 }
139
140 pub fn ttfb_boundaries(mut self, boundaries: Vec<f64>) -> Self {
142 if !boundaries.is_empty() {
143 self.ttfb_boundaries = boundaries;
144 }
145 self
146 }
147
148 pub fn register(self, meter: &Meter) -> OtelMetricsLayer {
169 let operation_bytes = {
170 let metric = observe::MetricValue::OperationBytes(0);
171 register_u64_histogram_meter(
172 meter,
173 "opendal.operation.bytes",
174 metric,
175 self.bytes_boundaries.clone(),
176 )
177 };
178 let operation_bytes_rate = {
179 let metric = observe::MetricValue::OperationBytesRate(0.0);
180 register_f64_histogram_meter(
181 meter,
182 "opendal.operation.bytes_rate",
183 metric,
184 self.bytes_rate_boundaries.clone(),
185 )
186 };
187 let operation_entries = {
188 let metric = observe::MetricValue::OperationEntries(0);
189 register_u64_histogram_meter(
190 meter,
191 "opendal.operation.entries",
192 metric,
193 self.entries_boundaries.clone(),
194 )
195 };
196 let operation_entries_rate = {
197 let metric = observe::MetricValue::OperationEntriesRate(0.0);
198 register_f64_histogram_meter(
199 meter,
200 "opendal.operation.entries_rate",
201 metric,
202 self.entries_rate_boundaries.clone(),
203 )
204 };
205 let operation_duration_seconds = {
206 let metric = observe::MetricValue::OperationDurationSeconds(Duration::default());
207 register_f64_histogram_meter(
208 meter,
209 "opendal.operation.duration",
210 metric,
211 self.duration_seconds_boundaries.clone(),
212 )
213 };
214 let operation_errors_total = {
215 let metric = observe::MetricValue::OperationErrorsTotal;
216 meter
217 .u64_counter("opendal.operation.errors")
218 .with_description(metric.help())
219 .build()
220 };
221 let operation_executing = {
222 let metric = observe::MetricValue::OperationExecuting(0);
223 meter
224 .i64_up_down_counter("opendal.operation.executing")
225 .with_description(metric.help())
226 .build()
227 };
228 let operation_ttfb_seconds = {
229 let metric = observe::MetricValue::OperationTtfbSeconds(Duration::default());
230 register_f64_histogram_meter(
231 meter,
232 "opendal.operation.ttfb",
233 metric,
234 self.duration_seconds_boundaries.clone(),
235 )
236 };
237
238 let http_executing = {
239 let metric = observe::MetricValue::HttpExecuting(0);
240 meter
241 .i64_up_down_counter("opendal.http.executing")
242 .with_description(metric.help())
243 .build()
244 };
245 let http_request_bytes = {
246 let metric = observe::MetricValue::HttpRequestBytes(0);
247 register_u64_histogram_meter(
248 meter,
249 "opendal.http.request.bytes",
250 metric,
251 self.bytes_boundaries.clone(),
252 )
253 };
254 let http_request_bytes_rate = {
255 let metric = observe::MetricValue::HttpRequestBytesRate(0.0);
256 register_f64_histogram_meter(
257 meter,
258 "opendal.http.request.bytes_rate",
259 metric,
260 self.bytes_rate_boundaries.clone(),
261 )
262 };
263 let http_request_duration_seconds = {
264 let metric = observe::MetricValue::HttpRequestDurationSeconds(Duration::default());
265 register_f64_histogram_meter(
266 meter,
267 "opendal.http.request.duration",
268 metric,
269 self.duration_seconds_boundaries.clone(),
270 )
271 };
272 let http_response_bytes = {
273 let metric = observe::MetricValue::HttpResponseBytes(0);
274 register_u64_histogram_meter(
275 meter,
276 "opendal.http.response.bytes",
277 metric,
278 self.bytes_boundaries.clone(),
279 )
280 };
281 let http_response_bytes_rate = {
282 let metric = observe::MetricValue::HttpResponseBytesRate(0.0);
283 register_f64_histogram_meter(
284 meter,
285 "opendal.http.response.bytes_rate",
286 metric,
287 self.bytes_rate_boundaries.clone(),
288 )
289 };
290 let http_response_duration_seconds = {
291 let metric = observe::MetricValue::HttpResponseDurationSeconds(Duration::default());
292 register_f64_histogram_meter(
293 meter,
294 "opendal.http.response.duration",
295 metric,
296 self.duration_seconds_boundaries.clone(),
297 )
298 };
299 let http_connection_errors_total = {
300 let metric = observe::MetricValue::HttpConnectionErrorsTotal;
301 meter
302 .u64_counter("opendal.http.connection_errors")
303 .with_description(metric.help())
304 .build()
305 };
306 let http_status_errors_total = {
307 let metric = observe::MetricValue::HttpStatusErrorsTotal;
308 meter
309 .u64_counter("opendal.http.status_errors")
310 .with_description(metric.help())
311 .build()
312 };
313
314 OtelMetricsLayer {
315 interceptor: OtelMetricsInterceptor {
316 operation_bytes,
317 operation_bytes_rate,
318 operation_entries,
319 operation_entries_rate,
320 operation_duration_seconds,
321 operation_errors_total,
322 operation_executing,
323 operation_ttfb_seconds,
324
325 http_executing,
326 http_request_bytes,
327 http_request_bytes_rate,
328 http_request_duration_seconds,
329 http_response_bytes,
330 http_response_bytes_rate,
331 http_response_duration_seconds,
332 http_connection_errors_total,
333 http_status_errors_total,
334 },
335 }
336 }
337}
338
339impl<A: Access> Layer<A> for OtelMetricsLayer {
340 type LayeredAccess = observe::MetricsAccessor<A, OtelMetricsInterceptor>;
341
342 fn layer(&self, inner: A) -> Self::LayeredAccess {
343 observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
344 }
345}
346
347#[derive(Clone, Debug)]
348pub struct OtelMetricsInterceptor {
349 operation_bytes: Histogram<u64>,
350 operation_bytes_rate: Histogram<f64>,
351 operation_entries: Histogram<u64>,
352 operation_entries_rate: Histogram<f64>,
353 operation_duration_seconds: Histogram<f64>,
354 operation_errors_total: Counter<u64>,
355 operation_executing: UpDownCounter<i64>,
356 operation_ttfb_seconds: Histogram<f64>,
357
358 http_executing: UpDownCounter<i64>,
359 http_request_bytes: Histogram<u64>,
360 http_request_bytes_rate: Histogram<f64>,
361 http_request_duration_seconds: Histogram<f64>,
362 http_response_bytes: Histogram<u64>,
363 http_response_bytes_rate: Histogram<f64>,
364 http_response_duration_seconds: Histogram<f64>,
365 http_connection_errors_total: Counter<u64>,
366 http_status_errors_total: Counter<u64>,
367}
368
369impl observe::MetricsIntercept for OtelMetricsInterceptor {
370 fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
371 let attributes = self.create_attributes(labels);
372
373 match value {
374 observe::MetricValue::OperationBytes(v) => self.operation_bytes.record(v, &attributes),
375 observe::MetricValue::OperationBytesRate(v) => {
376 self.operation_bytes_rate.record(v, &attributes)
377 }
378 observe::MetricValue::OperationEntries(v) => {
379 self.operation_entries.record(v, &attributes)
380 }
381 observe::MetricValue::OperationEntriesRate(v) => {
382 self.operation_entries_rate.record(v, &attributes)
383 }
384 observe::MetricValue::OperationDurationSeconds(v) => self
385 .operation_duration_seconds
386 .record(v.as_secs_f64(), &attributes),
387 observe::MetricValue::OperationErrorsTotal => {
388 self.operation_errors_total.add(1, &attributes)
389 }
390 observe::MetricValue::OperationExecuting(v) => {
391 self.operation_executing.add(v as i64, &attributes)
392 }
393 observe::MetricValue::OperationTtfbSeconds(v) => self
394 .operation_ttfb_seconds
395 .record(v.as_secs_f64(), &attributes),
396
397 observe::MetricValue::HttpExecuting(v) => {
398 self.http_executing.add(v as i64, &attributes)
399 }
400 observe::MetricValue::HttpRequestBytes(v) => {
401 self.http_request_bytes.record(v, &attributes)
402 }
403 observe::MetricValue::HttpRequestBytesRate(v) => {
404 self.http_request_bytes_rate.record(v, &attributes)
405 }
406 observe::MetricValue::HttpRequestDurationSeconds(v) => self
407 .http_request_duration_seconds
408 .record(v.as_secs_f64(), &attributes),
409 observe::MetricValue::HttpResponseBytes(v) => {
410 self.http_response_bytes.record(v, &attributes)
411 }
412 observe::MetricValue::HttpResponseBytesRate(v) => {
413 self.http_response_bytes_rate.record(v, &attributes)
414 }
415 observe::MetricValue::HttpResponseDurationSeconds(v) => self
416 .http_response_duration_seconds
417 .record(v.as_secs_f64(), &attributes),
418 observe::MetricValue::HttpConnectionErrorsTotal => {
419 self.http_connection_errors_total.add(1, &attributes)
420 }
421 observe::MetricValue::HttpStatusErrorsTotal => {
422 self.http_status_errors_total.add(1, &attributes)
423 }
424 }
425 }
426}
427
428impl OtelMetricsInterceptor {
429 fn create_attributes(&self, attrs: observe::MetricLabels) -> Vec<KeyValue> {
430 let mut attributes = Vec::with_capacity(6);
431
432 attributes.extend([
433 KeyValue::new(observe::LABEL_SCHEME, attrs.scheme),
434 KeyValue::new(observe::LABEL_NAMESPACE, attrs.namespace),
435 KeyValue::new(observe::LABEL_ROOT, attrs.root),
436 KeyValue::new(observe::LABEL_OPERATION, attrs.operation),
437 ]);
438
439 if let Some(error) = attrs.error {
440 attributes.push(KeyValue::new(observe::LABEL_ERROR, error.into_static()));
441 }
442
443 if let Some(status_code) = attrs.status_code {
444 attributes.push(KeyValue::new(
445 observe::LABEL_STATUS_CODE,
446 status_code.as_u16() as i64,
447 ));
448 }
449
450 attributes
451 }
452}
453
454fn register_u64_histogram_meter(
455 meter: &Meter,
456 name: &'static str,
457 metric: observe::MetricValue,
458 boundaries: Vec<f64>,
459) -> Histogram<u64> {
460 let (_name, unit) = metric.name_with_unit();
461 let description = metric.help();
462
463 let builder = meter
464 .u64_histogram(name)
465 .with_description(description)
466 .with_boundaries(boundaries);
467
468 if let Some(unit) = unit {
469 builder.with_unit(unit).build()
470 } else {
471 builder.build()
472 }
473}
474
475fn register_f64_histogram_meter(
476 meter: &Meter,
477 name: &'static str,
478 metric: observe::MetricValue,
479 boundaries: Vec<f64>,
480) -> Histogram<f64> {
481 let (_name, unit) = metric.name_with_unit();
482 let description = metric.help();
483
484 let builder = meter
485 .f64_histogram(name)
486 .with_description(description)
487 .with_boundaries(boundaries);
488
489 if let Some(unit) = unit {
490 builder.with_unit(unit).build()
491 } else {
492 builder.build()
493 }
494}