1use prometheus::HistogramVec;
19use prometheus::Registry;
20use prometheus::core::AtomicI64;
21use prometheus::core::AtomicU64;
22use prometheus::core::GenericCounterVec;
23use prometheus::core::GenericGaugeVec;
24use prometheus::register_histogram_vec_with_registry;
25use prometheus::register_int_counter_vec_with_registry;
26use prometheus::register_int_gauge_vec_with_registry;
27
28use crate::layers::observe;
29use crate::raw::Access;
30use crate::raw::*;
31use crate::*;
32
33#[derive(Clone, Debug)]
139pub struct PrometheusLayer {
140 interceptor: PrometheusInterceptor,
141}
142
143impl PrometheusLayer {
144 pub fn builder() -> PrometheusLayerBuilder {
175 PrometheusLayerBuilder::default()
176 }
177}
178
179impl<A: Access> Layer<A> for PrometheusLayer {
180 type LayeredAccess = observe::MetricsAccessor<A, PrometheusInterceptor>;
181
182 fn layer(&self, inner: A) -> Self::LayeredAccess {
183 observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
184 }
185}
186
/// Configuration builder for [`PrometheusLayer`].
///
/// Each field stores the bucket boundaries used when the corresponding
/// histogram families are registered.
pub struct PrometheusLayerBuilder {
    /// Buckets for the byte-size histograms (operation, HTTP request and HTTP
    /// response bytes).
    bytes_buckets: Vec<f64>,
    /// Buckets for the byte-rate histograms (operation, HTTP request and HTTP
    /// response byte rates).
    bytes_rate_buckets: Vec<f64>,
    /// Buckets for the operation entries histogram.
    entries_buckets: Vec<f64>,
    /// Buckets for the operation entries-rate histogram.
    entries_rate_buckets: Vec<f64>,
    /// Buckets for the duration histograms (operation, HTTP request and HTTP
    /// response durations).
    duration_seconds_buckets: Vec<f64>,
    /// Buckets for the operation TTFB histogram.
    ttfb_buckets: Vec<f64>,
}
196
197impl Default for PrometheusLayerBuilder {
198 fn default() -> Self {
199 Self {
200 bytes_buckets: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
201 bytes_rate_buckets: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
202 entries_buckets: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
203 entries_rate_buckets: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
204 duration_seconds_buckets: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
205 ttfb_buckets: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
206 }
207 }
208}
209
210impl PrometheusLayerBuilder {
211 pub fn bytes_buckets(mut self, buckets: Vec<f64>) -> Self {
213 if !buckets.is_empty() {
214 self.bytes_buckets = buckets;
215 }
216 self
217 }
218
219 pub fn bytes_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
221 if !buckets.is_empty() {
222 self.bytes_rate_buckets = buckets;
223 }
224 self
225 }
226
227 pub fn entries_buckets(mut self, buckets: Vec<f64>) -> Self {
229 if !buckets.is_empty() {
230 self.entries_buckets = buckets;
231 }
232 self
233 }
234
235 pub fn entries_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
237 if !buckets.is_empty() {
238 self.entries_rate_buckets = buckets;
239 }
240 self
241 }
242
243 pub fn duration_seconds_buckets(mut self, buckets: Vec<f64>) -> Self {
245 if !buckets.is_empty() {
246 self.duration_seconds_buckets = buckets;
247 }
248 self
249 }
250
251 pub fn ttfb_buckets(mut self, buckets: Vec<f64>) -> Self {
253 if !buckets.is_empty() {
254 self.ttfb_buckets = buckets;
255 }
256 self
257 }
258
259 pub fn register(self, registry: &Registry) -> Result<PrometheusLayer> {
284 let labels = OperationLabels::names();
285 let operation_bytes = {
286 let metric = observe::MetricValue::OperationBytes(0);
287 register_histogram_vec_with_registry!(
288 metric.name(),
289 metric.help(),
290 labels.as_ref(),
291 self.bytes_buckets.clone(),
292 registry
293 )
294 .map_err(parse_prometheus_error)?
295 };
296 let operation_bytes_rate = {
297 let metric = observe::MetricValue::OperationBytesRate(0.0);
298 register_histogram_vec_with_registry!(
299 metric.name(),
300 metric.help(),
301 labels.as_ref(),
302 self.bytes_rate_buckets.clone(),
303 registry
304 )
305 .map_err(parse_prometheus_error)?
306 };
307 let operation_entries = {
308 let metric = observe::MetricValue::OperationEntries(0);
309 register_histogram_vec_with_registry!(
310 metric.name(),
311 metric.help(),
312 labels.as_ref(),
313 self.entries_buckets,
314 registry
315 )
316 .map_err(parse_prometheus_error)?
317 };
318 let operation_entries_rate = {
319 let metric = observe::MetricValue::OperationEntriesRate(0.0);
320 register_histogram_vec_with_registry!(
321 metric.name(),
322 metric.help(),
323 labels.as_ref(),
324 self.entries_rate_buckets,
325 registry
326 )
327 .map_err(parse_prometheus_error)?
328 };
329 let operation_duration_seconds = {
330 let metric = observe::MetricValue::OperationDurationSeconds(Duration::default());
331 register_histogram_vec_with_registry!(
332 metric.name(),
333 metric.help(),
334 labels.as_ref(),
335 self.duration_seconds_buckets.clone(),
336 registry
337 )
338 .map_err(parse_prometheus_error)?
339 };
340 let operation_executing = {
341 let metric = observe::MetricValue::OperationExecuting(0);
342 register_int_gauge_vec_with_registry!(
343 metric.name(),
344 metric.help(),
345 labels.as_ref(),
346 registry
347 )
348 .map_err(parse_prometheus_error)?
349 };
350 let operation_ttfb_seconds = {
351 let metric = observe::MetricValue::OperationTtfbSeconds(Duration::default());
352 register_histogram_vec_with_registry!(
353 metric.name(),
354 metric.help(),
355 labels.as_ref(),
356 self.ttfb_buckets.clone(),
357 registry
358 )
359 .map_err(parse_prometheus_error)?
360 };
361
362 let labels_with_error = OperationLabels::names().with_error();
363 let operation_errors_total = {
364 let metric = observe::MetricValue::OperationErrorsTotal;
365 register_int_counter_vec_with_registry!(
366 metric.name(),
367 metric.help(),
368 labels_with_error.as_ref(),
369 registry
370 )
371 .map_err(parse_prometheus_error)?
372 };
373
374 let http_executing = {
375 let metric = observe::MetricValue::HttpExecuting(0);
376 register_int_gauge_vec_with_registry!(
377 metric.name(),
378 metric.help(),
379 labels.as_ref(),
380 registry
381 )
382 .map_err(parse_prometheus_error)?
383 };
384 let http_request_bytes = {
385 let metric = observe::MetricValue::HttpRequestBytes(0);
386 register_histogram_vec_with_registry!(
387 metric.name(),
388 metric.help(),
389 labels.as_ref(),
390 self.bytes_buckets.clone(),
391 registry
392 )
393 .map_err(parse_prometheus_error)?
394 };
395 let http_request_bytes_rate = {
396 let metric = observe::MetricValue::HttpRequestBytesRate(0.0);
397 register_histogram_vec_with_registry!(
398 metric.name(),
399 metric.help(),
400 labels.as_ref(),
401 self.bytes_rate_buckets.clone(),
402 registry
403 )
404 .map_err(parse_prometheus_error)?
405 };
406 let http_request_duration_seconds = {
407 let metric = observe::MetricValue::HttpRequestDurationSeconds(Duration::default());
408 register_histogram_vec_with_registry!(
409 metric.name(),
410 metric.help(),
411 labels.as_ref(),
412 self.duration_seconds_buckets.clone(),
413 registry
414 )
415 .map_err(parse_prometheus_error)?
416 };
417 let http_response_bytes = {
418 let metric = observe::MetricValue::HttpResponseBytes(0);
419 register_histogram_vec_with_registry!(
420 metric.name(),
421 metric.help(),
422 labels.as_ref(),
423 self.bytes_buckets,
424 registry
425 )
426 .map_err(parse_prometheus_error)?
427 };
428 let http_response_bytes_rate = {
429 let metric = observe::MetricValue::HttpResponseBytesRate(0.0);
430 register_histogram_vec_with_registry!(
431 metric.name(),
432 metric.help(),
433 labels.as_ref(),
434 self.bytes_rate_buckets,
435 registry
436 )
437 .map_err(parse_prometheus_error)?
438 };
439 let http_response_duration_seconds = {
440 let metric = observe::MetricValue::HttpResponseDurationSeconds(Duration::default());
441 register_histogram_vec_with_registry!(
442 metric.name(),
443 metric.help(),
444 labels.as_ref(),
445 self.duration_seconds_buckets,
446 registry
447 )
448 .map_err(parse_prometheus_error)?
449 };
450 let http_connection_errors_total = {
451 let metric = observe::MetricValue::HttpConnectionErrorsTotal;
452 register_int_counter_vec_with_registry!(
453 metric.name(),
454 metric.help(),
455 labels.as_ref(),
456 registry
457 )
458 .map_err(parse_prometheus_error)?
459 };
460
461 let labels_with_status_code = OperationLabels::names().with_status_code();
462 let http_status_errors_total = {
463 let metric = observe::MetricValue::HttpStatusErrorsTotal;
464 register_int_counter_vec_with_registry!(
465 metric.name(),
466 metric.help(),
467 labels_with_status_code.as_ref(),
468 registry
469 )
470 .map_err(parse_prometheus_error)?
471 };
472
473 Ok(PrometheusLayer {
474 interceptor: PrometheusInterceptor {
475 operation_bytes,
476 operation_bytes_rate,
477 operation_entries,
478 operation_entries_rate,
479 operation_duration_seconds,
480 operation_errors_total,
481 operation_executing,
482 operation_ttfb_seconds,
483
484 http_executing,
485 http_request_bytes,
486 http_request_bytes_rate,
487 http_request_duration_seconds,
488 http_response_bytes,
489 http_response_bytes_rate,
490 http_response_duration_seconds,
491 http_connection_errors_total,
492 http_status_errors_total,
493 },
494 })
495 }
496
497 pub fn register_default(self) -> Result<PrometheusLayer> {
522 let registry = prometheus::default_registry();
523 self.register(registry)
524 }
525}
526
527fn parse_prometheus_error(err: prometheus::Error) -> Error {
529 Error::new(ErrorKind::Unexpected, err.to_string()).set_source(err)
530}
531
532#[derive(Clone, Debug)]
533pub struct PrometheusInterceptor {
534 operation_bytes: HistogramVec,
535 operation_bytes_rate: HistogramVec,
536 operation_entries: HistogramVec,
537 operation_entries_rate: HistogramVec,
538 operation_duration_seconds: HistogramVec,
539 operation_errors_total: GenericCounterVec<AtomicU64>,
540 operation_executing: GenericGaugeVec<AtomicI64>,
541 operation_ttfb_seconds: HistogramVec,
542
543 http_executing: GenericGaugeVec<AtomicI64>,
544 http_request_bytes: HistogramVec,
545 http_request_bytes_rate: HistogramVec,
546 http_request_duration_seconds: HistogramVec,
547 http_response_bytes: HistogramVec,
548 http_response_bytes_rate: HistogramVec,
549 http_response_duration_seconds: HistogramVec,
550 http_connection_errors_total: GenericCounterVec<AtomicU64>,
551 http_status_errors_total: GenericCounterVec<AtomicU64>,
552}
553
554impl observe::MetricsIntercept for PrometheusInterceptor {
555 fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
556 let labels = OperationLabels(labels);
557 match value {
558 observe::MetricValue::OperationBytes(v) => self
559 .operation_bytes
560 .with_label_values(&labels.values())
561 .observe(v as f64),
562 observe::MetricValue::OperationBytesRate(v) => self
563 .operation_bytes_rate
564 .with_label_values(&labels.values())
565 .observe(v),
566 observe::MetricValue::OperationEntries(v) => self
567 .operation_entries
568 .with_label_values(&labels.values())
569 .observe(v as f64),
570 observe::MetricValue::OperationEntriesRate(v) => self
571 .operation_entries_rate
572 .with_label_values(&labels.values())
573 .observe(v),
574 observe::MetricValue::OperationDurationSeconds(v) => self
575 .operation_duration_seconds
576 .with_label_values(&labels.values())
577 .observe(v.as_secs_f64()),
578 observe::MetricValue::OperationErrorsTotal => self
579 .operation_errors_total
580 .with_label_values(&labels.values())
581 .inc(),
582 observe::MetricValue::OperationExecuting(v) => self
583 .operation_executing
584 .with_label_values(&labels.values())
585 .add(v as i64),
586 observe::MetricValue::OperationTtfbSeconds(v) => self
587 .operation_ttfb_seconds
588 .with_label_values(&labels.values())
589 .observe(v.as_secs_f64()),
590
591 observe::MetricValue::HttpExecuting(v) => self
592 .http_executing
593 .with_label_values(&labels.values())
594 .add(v as i64),
595 observe::MetricValue::HttpRequestBytes(v) => self
596 .http_request_bytes
597 .with_label_values(&labels.values())
598 .observe(v as f64),
599 observe::MetricValue::HttpRequestBytesRate(v) => self
600 .http_request_bytes_rate
601 .with_label_values(&labels.values())
602 .observe(v),
603 observe::MetricValue::HttpRequestDurationSeconds(v) => self
604 .http_request_duration_seconds
605 .with_label_values(&labels.values())
606 .observe(v.as_secs_f64()),
607 observe::MetricValue::HttpResponseBytes(v) => self
608 .http_response_bytes
609 .with_label_values(&labels.values())
610 .observe(v as f64),
611 observe::MetricValue::HttpResponseBytesRate(v) => self
612 .http_response_bytes_rate
613 .with_label_values(&labels.values())
614 .observe(v),
615 observe::MetricValue::HttpResponseDurationSeconds(v) => self
616 .http_response_duration_seconds
617 .with_label_values(&labels.values())
618 .observe(v.as_secs_f64()),
619 observe::MetricValue::HttpConnectionErrorsTotal => self
620 .http_connection_errors_total
621 .with_label_values(&labels.values())
622 .inc(),
623 observe::MetricValue::HttpStatusErrorsTotal => self
624 .http_status_errors_total
625 .with_label_values(&labels.values())
626 .inc(),
627 }
628 }
629}
630
/// Ordered list of label names used when registering a metric family.
struct OperationLabelNames(Vec<&'static str>);
632
633impl AsRef<[&'static str]> for OperationLabelNames {
634 fn as_ref(&self) -> &[&'static str] {
635 &self.0
636 }
637}
638
639impl OperationLabelNames {
640 fn with_error(mut self) -> Self {
641 self.0.push(observe::LABEL_ERROR);
642 self
643 }
644
645 fn with_status_code(mut self) -> Self {
646 self.0.push(observe::LABEL_STATUS_CODE);
647 self
648 }
649}
650
651#[derive(Clone, Debug, PartialEq, Eq, Hash)]
652struct OperationLabels(observe::MetricLabels);
653
654impl OperationLabels {
655 fn names() -> OperationLabelNames {
656 OperationLabelNames(vec![
657 observe::LABEL_SCHEME,
658 observe::LABEL_NAMESPACE,
659 observe::LABEL_ROOT,
660 observe::LABEL_OPERATION,
661 ])
662 }
663
664 fn values(&self) -> Vec<&str> {
665 let mut labels = Vec::with_capacity(6);
666
667 labels.extend([
668 self.0.scheme,
669 self.0.namespace.as_ref(),
670 self.0.root.as_ref(),
671 self.0.operation,
672 ]);
673
674 if let Some(error) = self.0.error {
675 labels.push(error.into_static());
676 }
677
678 if let Some(status_code) = &self.0.status_code {
679 labels.push(status_code.as_str());
680 }
681
682 labels
683 }
684}