1use std::time::Duration;
19
20use prometheus::core::AtomicI64;
21use prometheus::core::AtomicU64;
22use prometheus::core::GenericCounterVec;
23use prometheus::core::GenericGaugeVec;
24use prometheus::register_histogram_vec_with_registry;
25use prometheus::register_int_counter_vec_with_registry;
26use prometheus::register_int_gauge_vec_with_registry;
27use prometheus::HistogramVec;
28use prometheus::Registry;
29
30use crate::layers::observe;
31use crate::raw::Access;
32use crate::raw::*;
33use crate::*;
34
35#[derive(Clone, Debug)]
87pub struct PrometheusLayer {
88 interceptor: PrometheusInterceptor,
89}
90
91impl PrometheusLayer {
92 pub fn builder() -> PrometheusLayerBuilder {
126 PrometheusLayerBuilder::default()
127 }
128}
129
130impl<A: Access> Layer<A> for PrometheusLayer {
131 type LayeredAccess = observe::MetricsAccessor<A, PrometheusInterceptor>;
132
133 fn layer(&self, inner: A) -> Self::LayeredAccess {
134 observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
135 }
136}
137
/// Builder for [`PrometheusLayer`] that lets callers override the histogram
/// buckets before the metrics are registered.
pub struct PrometheusLayerBuilder {
    /// Buckets for the byte-size histograms (operation, HTTP request, HTTP response).
    bytes_buckets: Vec<f64>,
    /// Buckets for the byte-rate histograms (operation, HTTP request, HTTP response).
    bytes_rate_buckets: Vec<f64>,
    /// Buckets for the operation entries histogram.
    entries_buckets: Vec<f64>,
    /// Buckets for the operation entries-rate histogram.
    entries_rate_buckets: Vec<f64>,
    /// Buckets for the duration histograms (operation, HTTP request, HTTP response).
    duration_seconds_buckets: Vec<f64>,
    /// Buckets for the operation time-to-first-byte histogram.
    ttfb_buckets: Vec<f64>,
}
147
148impl Default for PrometheusLayerBuilder {
149 fn default() -> Self {
150 Self {
151 bytes_buckets: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
152 bytes_rate_buckets: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
153 entries_buckets: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
154 entries_rate_buckets: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
155 duration_seconds_buckets: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
156 ttfb_buckets: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
157 }
158 }
159}
160
161impl PrometheusLayerBuilder {
162 pub fn bytes_buckets(mut self, buckets: Vec<f64>) -> Self {
164 if !buckets.is_empty() {
165 self.bytes_buckets = buckets;
166 }
167 self
168 }
169
170 pub fn bytes_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
172 if !buckets.is_empty() {
173 self.bytes_rate_buckets = buckets;
174 }
175 self
176 }
177
178 pub fn entries_buckets(mut self, buckets: Vec<f64>) -> Self {
180 if !buckets.is_empty() {
181 self.entries_buckets = buckets;
182 }
183 self
184 }
185
186 pub fn entries_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
188 if !buckets.is_empty() {
189 self.entries_rate_buckets = buckets;
190 }
191 self
192 }
193
194 pub fn duration_seconds_buckets(mut self, buckets: Vec<f64>) -> Self {
196 if !buckets.is_empty() {
197 self.duration_seconds_buckets = buckets;
198 }
199 self
200 }
201
202 pub fn ttfb_buckets(mut self, buckets: Vec<f64>) -> Self {
204 if !buckets.is_empty() {
205 self.ttfb_buckets = buckets;
206 }
207 self
208 }
209
210 pub fn register(self, registry: &Registry) -> Result<PrometheusLayer> {
240 let labels = OperationLabels::names();
241 let operation_bytes = {
242 let metric = observe::MetricValue::OperationBytes(0);
243 register_histogram_vec_with_registry!(
244 metric.name(),
245 metric.help(),
246 labels.as_ref(),
247 self.bytes_buckets.clone(),
248 registry
249 )
250 .map_err(parse_prometheus_error)?
251 };
252 let operation_bytes_rate = {
253 let metric = observe::MetricValue::OperationBytesRate(0.0);
254 register_histogram_vec_with_registry!(
255 metric.name(),
256 metric.help(),
257 labels.as_ref(),
258 self.bytes_rate_buckets.clone(),
259 registry
260 )
261 .map_err(parse_prometheus_error)?
262 };
263 let operation_entries = {
264 let metric = observe::MetricValue::OperationEntries(0);
265 register_histogram_vec_with_registry!(
266 metric.name(),
267 metric.help(),
268 labels.as_ref(),
269 self.entries_buckets,
270 registry
271 )
272 .map_err(parse_prometheus_error)?
273 };
274 let operation_entries_rate = {
275 let metric = observe::MetricValue::OperationEntriesRate(0.0);
276 register_histogram_vec_with_registry!(
277 metric.name(),
278 metric.help(),
279 labels.as_ref(),
280 self.entries_rate_buckets,
281 registry
282 )
283 .map_err(parse_prometheus_error)?
284 };
285 let operation_duration_seconds = {
286 let metric = observe::MetricValue::OperationDurationSeconds(Duration::default());
287 register_histogram_vec_with_registry!(
288 metric.name(),
289 metric.help(),
290 labels.as_ref(),
291 self.duration_seconds_buckets.clone(),
292 registry
293 )
294 .map_err(parse_prometheus_error)?
295 };
296 let operation_executing = {
297 let metric = observe::MetricValue::OperationExecuting(0);
298 register_int_gauge_vec_with_registry!(
299 metric.name(),
300 metric.help(),
301 labels.as_ref(),
302 registry
303 )
304 .map_err(parse_prometheus_error)?
305 };
306 let operation_ttfb_seconds = {
307 let metric = observe::MetricValue::OperationTtfbSeconds(Duration::default());
308 register_histogram_vec_with_registry!(
309 metric.name(),
310 metric.help(),
311 labels.as_ref(),
312 self.ttfb_buckets.clone(),
313 registry
314 )
315 .map_err(parse_prometheus_error)?
316 };
317
318 let labels_with_error = OperationLabels::names().with_error();
319 let operation_errors_total = {
320 let metric = observe::MetricValue::OperationErrorsTotal;
321 register_int_counter_vec_with_registry!(
322 metric.name(),
323 metric.help(),
324 labels_with_error.as_ref(),
325 registry
326 )
327 .map_err(parse_prometheus_error)?
328 };
329
330 let http_executing = {
331 let metric = observe::MetricValue::HttpExecuting(0);
332 register_int_gauge_vec_with_registry!(
333 metric.name(),
334 metric.help(),
335 labels.as_ref(),
336 registry
337 )
338 .map_err(parse_prometheus_error)?
339 };
340 let http_request_bytes = {
341 let metric = observe::MetricValue::HttpRequestBytes(0);
342 register_histogram_vec_with_registry!(
343 metric.name(),
344 metric.help(),
345 labels.as_ref(),
346 self.bytes_buckets.clone(),
347 registry
348 )
349 .map_err(parse_prometheus_error)?
350 };
351 let http_request_bytes_rate = {
352 let metric = observe::MetricValue::HttpRequestBytesRate(0.0);
353 register_histogram_vec_with_registry!(
354 metric.name(),
355 metric.help(),
356 labels.as_ref(),
357 self.bytes_rate_buckets.clone(),
358 registry
359 )
360 .map_err(parse_prometheus_error)?
361 };
362 let http_request_duration_seconds = {
363 let metric = observe::MetricValue::HttpRequestDurationSeconds(Duration::default());
364 register_histogram_vec_with_registry!(
365 metric.name(),
366 metric.help(),
367 labels.as_ref(),
368 self.duration_seconds_buckets.clone(),
369 registry
370 )
371 .map_err(parse_prometheus_error)?
372 };
373 let http_response_bytes = {
374 let metric = observe::MetricValue::HttpResponseBytes(0);
375 register_histogram_vec_with_registry!(
376 metric.name(),
377 metric.help(),
378 labels.as_ref(),
379 self.bytes_buckets,
380 registry
381 )
382 .map_err(parse_prometheus_error)?
383 };
384 let http_response_bytes_rate = {
385 let metric = observe::MetricValue::HttpResponseBytesRate(0.0);
386 register_histogram_vec_with_registry!(
387 metric.name(),
388 metric.help(),
389 labels.as_ref(),
390 self.bytes_rate_buckets,
391 registry
392 )
393 .map_err(parse_prometheus_error)?
394 };
395 let http_response_duration_seconds = {
396 let metric = observe::MetricValue::HttpResponseDurationSeconds(Duration::default());
397 register_histogram_vec_with_registry!(
398 metric.name(),
399 metric.help(),
400 labels.as_ref(),
401 self.duration_seconds_buckets,
402 registry
403 )
404 .map_err(parse_prometheus_error)?
405 };
406 let http_connection_errors_total = {
407 let metric = observe::MetricValue::HttpConnectionErrorsTotal;
408 register_int_counter_vec_with_registry!(
409 metric.name(),
410 metric.help(),
411 labels.as_ref(),
412 registry
413 )
414 .map_err(parse_prometheus_error)?
415 };
416
417 let labels_with_status_code = OperationLabels::names().with_status_code();
418 let http_status_errors_total = {
419 let metric = observe::MetricValue::HttpStatusErrorsTotal;
420 register_int_counter_vec_with_registry!(
421 metric.name(),
422 metric.help(),
423 labels_with_status_code.as_ref(),
424 registry
425 )
426 .map_err(parse_prometheus_error)?
427 };
428
429 Ok(PrometheusLayer {
430 interceptor: PrometheusInterceptor {
431 operation_bytes,
432 operation_bytes_rate,
433 operation_entries,
434 operation_entries_rate,
435 operation_duration_seconds,
436 operation_errors_total,
437 operation_executing,
438 operation_ttfb_seconds,
439
440 http_executing,
441 http_request_bytes,
442 http_request_bytes_rate,
443 http_request_duration_seconds,
444 http_response_bytes,
445 http_response_bytes_rate,
446 http_response_duration_seconds,
447 http_connection_errors_total,
448 http_status_errors_total,
449 },
450 })
451 }
452
453 pub fn register_default(self) -> Result<PrometheusLayer> {
482 let registry = prometheus::default_registry();
483 self.register(registry)
484 }
485}
486
487fn parse_prometheus_error(err: prometheus::Error) -> Error {
489 Error::new(ErrorKind::Unexpected, err.to_string()).set_source(err)
490}
491
492#[derive(Clone, Debug)]
493pub struct PrometheusInterceptor {
494 operation_bytes: HistogramVec,
495 operation_bytes_rate: HistogramVec,
496 operation_entries: HistogramVec,
497 operation_entries_rate: HistogramVec,
498 operation_duration_seconds: HistogramVec,
499 operation_errors_total: GenericCounterVec<AtomicU64>,
500 operation_executing: GenericGaugeVec<AtomicI64>,
501 operation_ttfb_seconds: HistogramVec,
502
503 http_executing: GenericGaugeVec<AtomicI64>,
504 http_request_bytes: HistogramVec,
505 http_request_bytes_rate: HistogramVec,
506 http_request_duration_seconds: HistogramVec,
507 http_response_bytes: HistogramVec,
508 http_response_bytes_rate: HistogramVec,
509 http_response_duration_seconds: HistogramVec,
510 http_connection_errors_total: GenericCounterVec<AtomicU64>,
511 http_status_errors_total: GenericCounterVec<AtomicU64>,
512}
513
514impl observe::MetricsIntercept for PrometheusInterceptor {
515 fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
516 let labels = OperationLabels(labels);
517 match value {
518 observe::MetricValue::OperationBytes(v) => self
519 .operation_bytes
520 .with_label_values(&labels.values())
521 .observe(v as f64),
522 observe::MetricValue::OperationBytesRate(v) => self
523 .operation_bytes_rate
524 .with_label_values(&labels.values())
525 .observe(v),
526 observe::MetricValue::OperationEntries(v) => self
527 .operation_entries
528 .with_label_values(&labels.values())
529 .observe(v as f64),
530 observe::MetricValue::OperationEntriesRate(v) => self
531 .operation_entries_rate
532 .with_label_values(&labels.values())
533 .observe(v),
534 observe::MetricValue::OperationDurationSeconds(v) => self
535 .operation_duration_seconds
536 .with_label_values(&labels.values())
537 .observe(v.as_secs_f64()),
538 observe::MetricValue::OperationErrorsTotal => self
539 .operation_errors_total
540 .with_label_values(&labels.values())
541 .inc(),
542 observe::MetricValue::OperationExecuting(v) => self
543 .operation_executing
544 .with_label_values(&labels.values())
545 .add(v as i64),
546 observe::MetricValue::OperationTtfbSeconds(v) => self
547 .operation_ttfb_seconds
548 .with_label_values(&labels.values())
549 .observe(v.as_secs_f64()),
550
551 observe::MetricValue::HttpExecuting(v) => self
552 .http_executing
553 .with_label_values(&labels.values())
554 .add(v as i64),
555 observe::MetricValue::HttpRequestBytes(v) => self
556 .http_request_bytes
557 .with_label_values(&labels.values())
558 .observe(v as f64),
559 observe::MetricValue::HttpRequestBytesRate(v) => self
560 .http_request_bytes_rate
561 .with_label_values(&labels.values())
562 .observe(v),
563 observe::MetricValue::HttpRequestDurationSeconds(v) => self
564 .http_request_duration_seconds
565 .with_label_values(&labels.values())
566 .observe(v.as_secs_f64()),
567 observe::MetricValue::HttpResponseBytes(v) => self
568 .http_response_bytes
569 .with_label_values(&labels.values())
570 .observe(v as f64),
571 observe::MetricValue::HttpResponseBytesRate(v) => self
572 .http_response_bytes_rate
573 .with_label_values(&labels.values())
574 .observe(v),
575 observe::MetricValue::HttpResponseDurationSeconds(v) => self
576 .http_response_duration_seconds
577 .with_label_values(&labels.values())
578 .observe(v.as_secs_f64()),
579 observe::MetricValue::HttpConnectionErrorsTotal => self
580 .http_connection_errors_total
581 .with_label_values(&labels.values())
582 .inc(),
583 observe::MetricValue::HttpStatusErrorsTotal => self
584 .http_status_errors_total
585 .with_label_values(&labels.values())
586 .inc(),
587 }
588 }
589}
590
/// Ordered list of label names used when registering a metric vector.
struct OperationLabelNames(Vec<&'static str>);
592
593impl AsRef<[&'static str]> for OperationLabelNames {
594 fn as_ref(&self) -> &[&'static str] {
595 &self.0
596 }
597}
598
599impl OperationLabelNames {
600 fn with_error(mut self) -> Self {
601 self.0.push(observe::LABEL_ERROR);
602 self
603 }
604
605 fn with_status_code(mut self) -> Self {
606 self.0.push(observe::LABEL_STATUS_CODE);
607 self
608 }
609}
610
611#[derive(Clone, Debug, PartialEq, Eq, Hash)]
612struct OperationLabels(observe::MetricLabels);
613
614impl OperationLabels {
615 fn names() -> OperationLabelNames {
616 OperationLabelNames(vec![
617 observe::LABEL_SCHEME,
618 observe::LABEL_NAMESPACE,
619 observe::LABEL_ROOT,
620 observe::LABEL_OPERATION,
621 ])
622 }
623
624 fn values(&self) -> Vec<&str> {
625 let mut labels = Vec::with_capacity(6);
626
627 labels.extend([
628 self.0.scheme.into_static(),
629 self.0.namespace.as_ref(),
630 self.0.root.as_ref(),
631 self.0.operation,
632 ]);
633
634 if let Some(error) = self.0.error {
635 labels.push(error.into_static());
636 }
637
638 if let Some(status_code) = &self.0.status_code {
639 labels.push(status_code.as_str());
640 }
641
642 labels
643 }
644}