1use std::fmt;
19use std::time::Duration;
20
21use prometheus_client::encoding::EncodeLabel;
22use prometheus_client::encoding::EncodeLabelSet;
23use prometheus_client::encoding::LabelSetEncoder;
24use prometheus_client::metrics::counter::Counter;
25use prometheus_client::metrics::family::Family;
26use prometheus_client::metrics::family::MetricConstructor;
27use prometheus_client::metrics::gauge::Gauge;
28use prometheus_client::metrics::histogram::Histogram;
29use prometheus_client::registry::Metric;
30use prometheus_client::registry::Registry;
31use prometheus_client::registry::Unit;
32
33use crate::layers::observe;
34use crate::raw::*;
35use crate::*;
36
37#[derive(Clone, Debug)]
83pub struct PrometheusClientLayer {
84 interceptor: PrometheusClientInterceptor,
85}
86
87impl PrometheusClientLayer {
88 pub fn builder() -> PrometheusClientLayerBuilder {
90 PrometheusClientLayerBuilder::default()
91 }
92}
93
94impl<A: Access> Layer<A> for PrometheusClientLayer {
95 type LayeredAccess = observe::MetricsAccessor<A, PrometheusClientInterceptor>;
96
97 fn layer(&self, inner: A) -> Self::LayeredAccess {
98 observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
99 }
100}
101
102pub struct PrometheusClientLayerBuilder {
104 bytes_buckets: Vec<f64>,
105 bytes_rate_buckets: Vec<f64>,
106 entries_buckets: Vec<f64>,
107 entries_rate_buckets: Vec<f64>,
108 duration_seconds_buckets: Vec<f64>,
109 ttfb_buckets: Vec<f64>,
110 disable_label_root: bool,
111}
112
113impl Default for PrometheusClientLayerBuilder {
114 fn default() -> Self {
115 Self {
116 bytes_buckets: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
117 bytes_rate_buckets: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
118 entries_buckets: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
119 entries_rate_buckets: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
120 duration_seconds_buckets: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
121 ttfb_buckets: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
122 disable_label_root: false,
123 }
124 }
125}
126
127impl PrometheusClientLayerBuilder {
128 pub fn bytes_buckets(mut self, buckets: Vec<f64>) -> Self {
130 if !buckets.is_empty() {
131 self.bytes_buckets = buckets;
132 }
133 self
134 }
135
136 pub fn bytes_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
138 if !buckets.is_empty() {
139 self.bytes_rate_buckets = buckets;
140 }
141 self
142 }
143
144 pub fn entries_buckets(mut self, buckets: Vec<f64>) -> Self {
146 if !buckets.is_empty() {
147 self.entries_buckets = buckets;
148 }
149 self
150 }
151
152 pub fn entries_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
154 if !buckets.is_empty() {
155 self.entries_rate_buckets = buckets;
156 }
157 self
158 }
159
160 pub fn duration_seconds_buckets(mut self, buckets: Vec<f64>) -> Self {
162 if !buckets.is_empty() {
163 self.duration_seconds_buckets = buckets;
164 }
165 self
166 }
167
168 pub fn ttfb_buckets(mut self, buckets: Vec<f64>) -> Self {
170 if !buckets.is_empty() {
171 self.ttfb_buckets = buckets;
172 }
173 self
174 }
175
176 pub fn disable_label_root(mut self, disable: bool) -> Self {
179 self.disable_label_root = disable;
180 self
181 }
182
183 pub fn register(self, registry: &mut Registry) -> PrometheusClientLayer {
209 let operation_bytes =
210 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
211 buckets: self.bytes_buckets.clone(),
212 });
213 let operation_bytes_rate =
214 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
215 buckets: self.bytes_rate_buckets.clone(),
216 });
217 let operation_entries =
218 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
219 buckets: self.entries_buckets.clone(),
220 });
221 let operation_entries_rate =
222 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
223 buckets: self.entries_rate_buckets.clone(),
224 });
225 let operation_duration_seconds =
226 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
227 buckets: self.duration_seconds_buckets.clone(),
228 });
229 let operation_errors_total = Family::<OperationLabels, Counter>::default();
230 let operation_executing = Family::<OperationLabels, Gauge>::default();
231 let operation_ttfb_seconds =
232 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
233 buckets: self.ttfb_buckets.clone(),
234 });
235
236 let http_executing = Family::<OperationLabels, Gauge>::default();
237 let http_request_bytes =
238 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
239 buckets: self.bytes_buckets.clone(),
240 });
241 let http_request_bytes_rate =
242 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
243 buckets: self.bytes_rate_buckets.clone(),
244 });
245 let http_request_duration_seconds =
246 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
247 buckets: self.duration_seconds_buckets.clone(),
248 });
249 let http_response_bytes =
250 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
251 buckets: self.bytes_buckets.clone(),
252 });
253 let http_response_bytes_rate =
254 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
255 buckets: self.bytes_rate_buckets.clone(),
256 });
257 let http_response_duration_seconds =
258 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
259 buckets: self.duration_seconds_buckets.clone(),
260 });
261 let http_connection_errors_total = Family::<OperationLabels, Counter>::default();
262 let http_status_errors_total = Family::<OperationLabels, Counter>::default();
263
264 register_metric(
265 registry,
266 operation_bytes.clone(),
267 observe::MetricValue::OperationBytes(0),
268 );
269 register_metric(
270 registry,
271 operation_bytes_rate.clone(),
272 observe::MetricValue::OperationBytesRate(0.0),
273 );
274 register_metric(
275 registry,
276 operation_entries.clone(),
277 observe::MetricValue::OperationEntries(0),
278 );
279 register_metric(
280 registry,
281 operation_entries_rate.clone(),
282 observe::MetricValue::OperationEntriesRate(0.0),
283 );
284 register_metric(
285 registry,
286 operation_duration_seconds.clone(),
287 observe::MetricValue::OperationDurationSeconds(Duration::default()),
288 );
289 register_metric(
290 registry,
291 operation_errors_total.clone(),
292 observe::MetricValue::OperationErrorsTotal,
293 );
294 register_metric(
295 registry,
296 operation_executing.clone(),
297 observe::MetricValue::OperationExecuting(0),
298 );
299 register_metric(
300 registry,
301 operation_ttfb_seconds.clone(),
302 observe::MetricValue::OperationTtfbSeconds(Duration::default()),
303 );
304
305 register_metric(
306 registry,
307 http_executing.clone(),
308 observe::MetricValue::HttpExecuting(0),
309 );
310 register_metric(
311 registry,
312 http_request_bytes.clone(),
313 observe::MetricValue::HttpRequestBytes(0),
314 );
315 register_metric(
316 registry,
317 http_request_bytes_rate.clone(),
318 observe::MetricValue::HttpRequestBytesRate(0.0),
319 );
320 register_metric(
321 registry,
322 http_request_duration_seconds.clone(),
323 observe::MetricValue::HttpRequestDurationSeconds(Duration::default()),
324 );
325 register_metric(
326 registry,
327 http_response_bytes.clone(),
328 observe::MetricValue::HttpResponseBytes(0),
329 );
330 register_metric(
331 registry,
332 http_response_bytes_rate.clone(),
333 observe::MetricValue::HttpResponseBytesRate(0.0),
334 );
335 register_metric(
336 registry,
337 http_response_duration_seconds.clone(),
338 observe::MetricValue::HttpResponseDurationSeconds(Duration::default()),
339 );
340 register_metric(
341 registry,
342 http_connection_errors_total.clone(),
343 observe::MetricValue::HttpConnectionErrorsTotal,
344 );
345 register_metric(
346 registry,
347 http_status_errors_total.clone(),
348 observe::MetricValue::HttpStatusErrorsTotal,
349 );
350
351 PrometheusClientLayer {
352 interceptor: PrometheusClientInterceptor {
353 operation_bytes,
354 operation_bytes_rate,
355 operation_entries,
356 operation_entries_rate,
357 operation_duration_seconds,
358 operation_errors_total,
359 operation_executing,
360 operation_ttfb_seconds,
361
362 http_executing,
363 http_request_bytes,
364 http_request_bytes_rate,
365 http_request_duration_seconds,
366 http_response_bytes,
367 http_response_bytes_rate,
368 http_response_duration_seconds,
369 http_connection_errors_total,
370 http_status_errors_total,
371
372 disable_label_root: self.disable_label_root,
373 },
374 }
375 }
376}
377
378#[derive(Clone)]
379struct HistogramConstructor {
380 buckets: Vec<f64>,
381}
382
383impl MetricConstructor<Histogram> for HistogramConstructor {
384 fn new_metric(&self) -> Histogram {
385 Histogram::new(self.buckets.iter().cloned())
386 }
387}
388
389#[derive(Clone, Debug)]
390pub struct PrometheusClientInterceptor {
391 operation_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
392 operation_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
393 operation_entries: Family<OperationLabels, Histogram, HistogramConstructor>,
394 operation_entries_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
395 operation_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
396 operation_errors_total: Family<OperationLabels, Counter>,
397 operation_executing: Family<OperationLabels, Gauge>,
398 operation_ttfb_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
399
400 http_executing: Family<OperationLabels, Gauge>,
401 http_request_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
402 http_request_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
403 http_request_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
404 http_response_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
405 http_response_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
406 http_response_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
407 http_connection_errors_total: Family<OperationLabels, Counter>,
408 http_status_errors_total: Family<OperationLabels, Counter>,
409
410 disable_label_root: bool,
411}
412
413impl observe::MetricsIntercept for PrometheusClientInterceptor {
414 fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
415 let labels = OperationLabels {
416 labels,
417 disable_label_root: self.disable_label_root,
418 };
419 match value {
420 observe::MetricValue::OperationBytes(v) => self
421 .operation_bytes
422 .get_or_create(&labels)
423 .observe(v as f64),
424 observe::MetricValue::OperationBytesRate(v) => {
425 self.operation_bytes_rate.get_or_create(&labels).observe(v)
426 }
427 observe::MetricValue::OperationEntries(v) => self
428 .operation_entries
429 .get_or_create(&labels)
430 .observe(v as f64),
431 observe::MetricValue::OperationEntriesRate(v) => self
432 .operation_entries_rate
433 .get_or_create(&labels)
434 .observe(v),
435 observe::MetricValue::OperationDurationSeconds(v) => self
436 .operation_duration_seconds
437 .get_or_create(&labels)
438 .observe(v.as_secs_f64()),
439 observe::MetricValue::OperationErrorsTotal => {
440 self.operation_errors_total.get_or_create(&labels).inc();
441 }
442 observe::MetricValue::OperationExecuting(v) => {
443 self.operation_executing
444 .get_or_create(&labels)
445 .inc_by(v as i64);
446 }
447 observe::MetricValue::OperationTtfbSeconds(v) => self
448 .operation_ttfb_seconds
449 .get_or_create(&labels)
450 .observe(v.as_secs_f64()),
451
452 observe::MetricValue::HttpExecuting(v) => {
453 self.http_executing.get_or_create(&labels).inc_by(v as i64);
454 }
455 observe::MetricValue::HttpRequestBytes(v) => self
456 .http_request_bytes
457 .get_or_create(&labels)
458 .observe(v as f64),
459 observe::MetricValue::HttpRequestBytesRate(v) => self
460 .http_request_bytes_rate
461 .get_or_create(&labels)
462 .observe(v),
463 observe::MetricValue::HttpRequestDurationSeconds(v) => self
464 .http_request_duration_seconds
465 .get_or_create(&labels)
466 .observe(v.as_secs_f64()),
467 observe::MetricValue::HttpResponseBytes(v) => self
468 .http_response_bytes
469 .get_or_create(&labels)
470 .observe(v as f64),
471 observe::MetricValue::HttpResponseBytesRate(v) => self
472 .http_response_bytes_rate
473 .get_or_create(&labels)
474 .observe(v),
475 observe::MetricValue::HttpResponseDurationSeconds(v) => self
476 .http_response_duration_seconds
477 .get_or_create(&labels)
478 .observe(v.as_secs_f64()),
479 observe::MetricValue::HttpConnectionErrorsTotal => {
480 self.http_connection_errors_total
481 .get_or_create(&labels)
482 .inc();
483 }
484 observe::MetricValue::HttpStatusErrorsTotal => {
485 self.http_status_errors_total.get_or_create(&labels).inc();
486 }
487 };
488 }
489}
490
491#[derive(Clone, Debug, PartialEq, Eq, Hash)]
492struct OperationLabels {
493 labels: observe::MetricLabels,
494 disable_label_root: bool,
495}
496
497impl EncodeLabelSet for OperationLabels {
498 fn encode(&self, mut encoder: LabelSetEncoder) -> Result<(), fmt::Error> {
499 (observe::LABEL_SCHEME, self.labels.scheme.into_static()).encode(encoder.encode_label())?;
500 (observe::LABEL_NAMESPACE, self.labels.namespace.as_ref())
501 .encode(encoder.encode_label())?;
502 if !self.disable_label_root {
503 (observe::LABEL_ROOT, self.labels.root.as_ref()).encode(encoder.encode_label())?;
504 }
505 (observe::LABEL_OPERATION, self.labels.operation).encode(encoder.encode_label())?;
506
507 if let Some(error) = &self.labels.error {
508 (observe::LABEL_ERROR, error.into_static()).encode(encoder.encode_label())?;
509 }
510 if let Some(code) = &self.labels.status_code {
511 (observe::LABEL_STATUS_CODE, code.as_str()).encode(encoder.encode_label())?;
512 }
513 Ok(())
514 }
515}
516
517fn register_metric(registry: &mut Registry, metric: impl Metric, value: observe::MetricValue) {
518 let ((name, unit), help) = (value.name_with_unit(), value.help());
519
520 if let Some(unit) = unit {
521 registry.register_with_unit(name, help, Unit::Other(unit.to_string()), metric);
522 } else {
523 registry.register(name, help, metric);
524 }
525}