1use std::fmt;
19
20use prometheus_client::encoding::EncodeLabel;
21use prometheus_client::encoding::EncodeLabelSet;
22use prometheus_client::encoding::LabelSetEncoder;
23use prometheus_client::metrics::counter::Counter;
24use prometheus_client::metrics::family::Family;
25use prometheus_client::metrics::family::MetricConstructor;
26use prometheus_client::metrics::gauge::Gauge;
27use prometheus_client::metrics::histogram::Histogram;
28use prometheus_client::registry::Metric;
29use prometheus_client::registry::Registry;
30use prometheus_client::registry::Unit;
31
32use crate::layers::observe;
33use crate::raw::*;
34use crate::*;
35
36#[derive(Clone, Debug)]
79pub struct PrometheusClientLayer {
80 interceptor: PrometheusClientInterceptor,
81}
82
83impl PrometheusClientLayer {
84 pub fn builder() -> PrometheusClientLayerBuilder {
86 PrometheusClientLayerBuilder::default()
87 }
88}
89
90impl<A: Access> Layer<A> for PrometheusClientLayer {
91 type LayeredAccess = observe::MetricsAccessor<A, PrometheusClientInterceptor>;
92
93 fn layer(&self, inner: A) -> Self::LayeredAccess {
94 observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
95 }
96}
97
98pub struct PrometheusClientLayerBuilder {
100 bytes_buckets: Vec<f64>,
101 bytes_rate_buckets: Vec<f64>,
102 entries_buckets: Vec<f64>,
103 entries_rate_buckets: Vec<f64>,
104 duration_seconds_buckets: Vec<f64>,
105 ttfb_buckets: Vec<f64>,
106 disable_label_root: bool,
107}
108
109impl Default for PrometheusClientLayerBuilder {
110 fn default() -> Self {
111 Self {
112 bytes_buckets: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
113 bytes_rate_buckets: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
114 entries_buckets: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
115 entries_rate_buckets: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
116 duration_seconds_buckets: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
117 ttfb_buckets: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
118 disable_label_root: false,
119 }
120 }
121}
122
123impl PrometheusClientLayerBuilder {
124 pub fn bytes_buckets(mut self, buckets: Vec<f64>) -> Self {
126 if !buckets.is_empty() {
127 self.bytes_buckets = buckets;
128 }
129 self
130 }
131
132 pub fn bytes_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
134 if !buckets.is_empty() {
135 self.bytes_rate_buckets = buckets;
136 }
137 self
138 }
139
140 pub fn entries_buckets(mut self, buckets: Vec<f64>) -> Self {
142 if !buckets.is_empty() {
143 self.entries_buckets = buckets;
144 }
145 self
146 }
147
148 pub fn entries_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
150 if !buckets.is_empty() {
151 self.entries_rate_buckets = buckets;
152 }
153 self
154 }
155
156 pub fn duration_seconds_buckets(mut self, buckets: Vec<f64>) -> Self {
158 if !buckets.is_empty() {
159 self.duration_seconds_buckets = buckets;
160 }
161 self
162 }
163
164 pub fn ttfb_buckets(mut self, buckets: Vec<f64>) -> Self {
166 if !buckets.is_empty() {
167 self.ttfb_buckets = buckets;
168 }
169 self
170 }
171
172 pub fn disable_label_root(mut self, disable: bool) -> Self {
175 self.disable_label_root = disable;
176 self
177 }
178
179 pub fn register(self, registry: &mut Registry) -> PrometheusClientLayer {
202 let operation_bytes =
203 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
204 buckets: self.bytes_buckets.clone(),
205 });
206 let operation_bytes_rate =
207 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
208 buckets: self.bytes_rate_buckets.clone(),
209 });
210 let operation_entries =
211 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
212 buckets: self.entries_buckets.clone(),
213 });
214 let operation_entries_rate =
215 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
216 buckets: self.entries_rate_buckets.clone(),
217 });
218 let operation_duration_seconds =
219 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
220 buckets: self.duration_seconds_buckets.clone(),
221 });
222 let operation_errors_total = Family::<OperationLabels, Counter>::default();
223 let operation_executing = Family::<OperationLabels, Gauge>::default();
224 let operation_ttfb_seconds =
225 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
226 buckets: self.ttfb_buckets.clone(),
227 });
228
229 let http_executing = Family::<OperationLabels, Gauge>::default();
230 let http_request_bytes =
231 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
232 buckets: self.bytes_buckets.clone(),
233 });
234 let http_request_bytes_rate =
235 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
236 buckets: self.bytes_rate_buckets.clone(),
237 });
238 let http_request_duration_seconds =
239 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
240 buckets: self.duration_seconds_buckets.clone(),
241 });
242 let http_response_bytes =
243 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
244 buckets: self.bytes_buckets.clone(),
245 });
246 let http_response_bytes_rate =
247 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
248 buckets: self.bytes_rate_buckets.clone(),
249 });
250 let http_response_duration_seconds =
251 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
252 buckets: self.duration_seconds_buckets.clone(),
253 });
254 let http_connection_errors_total = Family::<OperationLabels, Counter>::default();
255 let http_status_errors_total = Family::<OperationLabels, Counter>::default();
256
257 register_metric(
258 registry,
259 operation_bytes.clone(),
260 observe::MetricValue::OperationBytes(0),
261 );
262 register_metric(
263 registry,
264 operation_bytes_rate.clone(),
265 observe::MetricValue::OperationBytesRate(0.0),
266 );
267 register_metric(
268 registry,
269 operation_entries.clone(),
270 observe::MetricValue::OperationEntries(0),
271 );
272 register_metric(
273 registry,
274 operation_entries_rate.clone(),
275 observe::MetricValue::OperationEntriesRate(0.0),
276 );
277 register_metric(
278 registry,
279 operation_duration_seconds.clone(),
280 observe::MetricValue::OperationDurationSeconds(Duration::default()),
281 );
282 register_metric(
283 registry,
284 operation_errors_total.clone(),
285 observe::MetricValue::OperationErrorsTotal,
286 );
287 register_metric(
288 registry,
289 operation_executing.clone(),
290 observe::MetricValue::OperationExecuting(0),
291 );
292 register_metric(
293 registry,
294 operation_ttfb_seconds.clone(),
295 observe::MetricValue::OperationTtfbSeconds(Duration::default()),
296 );
297
298 register_metric(
299 registry,
300 http_executing.clone(),
301 observe::MetricValue::HttpExecuting(0),
302 );
303 register_metric(
304 registry,
305 http_request_bytes.clone(),
306 observe::MetricValue::HttpRequestBytes(0),
307 );
308 register_metric(
309 registry,
310 http_request_bytes_rate.clone(),
311 observe::MetricValue::HttpRequestBytesRate(0.0),
312 );
313 register_metric(
314 registry,
315 http_request_duration_seconds.clone(),
316 observe::MetricValue::HttpRequestDurationSeconds(Duration::default()),
317 );
318 register_metric(
319 registry,
320 http_response_bytes.clone(),
321 observe::MetricValue::HttpResponseBytes(0),
322 );
323 register_metric(
324 registry,
325 http_response_bytes_rate.clone(),
326 observe::MetricValue::HttpResponseBytesRate(0.0),
327 );
328 register_metric(
329 registry,
330 http_response_duration_seconds.clone(),
331 observe::MetricValue::HttpResponseDurationSeconds(Duration::default()),
332 );
333 register_metric(
334 registry,
335 http_connection_errors_total.clone(),
336 observe::MetricValue::HttpConnectionErrorsTotal,
337 );
338 register_metric(
339 registry,
340 http_status_errors_total.clone(),
341 observe::MetricValue::HttpStatusErrorsTotal,
342 );
343
344 PrometheusClientLayer {
345 interceptor: PrometheusClientInterceptor {
346 operation_bytes,
347 operation_bytes_rate,
348 operation_entries,
349 operation_entries_rate,
350 operation_duration_seconds,
351 operation_errors_total,
352 operation_executing,
353 operation_ttfb_seconds,
354
355 http_executing,
356 http_request_bytes,
357 http_request_bytes_rate,
358 http_request_duration_seconds,
359 http_response_bytes,
360 http_response_bytes_rate,
361 http_response_duration_seconds,
362 http_connection_errors_total,
363 http_status_errors_total,
364
365 disable_label_root: self.disable_label_root,
366 },
367 }
368 }
369}
370
371#[derive(Clone)]
372struct HistogramConstructor {
373 buckets: Vec<f64>,
374}
375
376impl MetricConstructor<Histogram> for HistogramConstructor {
377 fn new_metric(&self) -> Histogram {
378 Histogram::new(self.buckets.iter().cloned())
379 }
380}
381
382#[derive(Clone, Debug)]
383pub struct PrometheusClientInterceptor {
384 operation_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
385 operation_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
386 operation_entries: Family<OperationLabels, Histogram, HistogramConstructor>,
387 operation_entries_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
388 operation_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
389 operation_errors_total: Family<OperationLabels, Counter>,
390 operation_executing: Family<OperationLabels, Gauge>,
391 operation_ttfb_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
392
393 http_executing: Family<OperationLabels, Gauge>,
394 http_request_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
395 http_request_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
396 http_request_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
397 http_response_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
398 http_response_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
399 http_response_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
400 http_connection_errors_total: Family<OperationLabels, Counter>,
401 http_status_errors_total: Family<OperationLabels, Counter>,
402
403 disable_label_root: bool,
404}
405
406impl observe::MetricsIntercept for PrometheusClientInterceptor {
407 fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
408 let labels = OperationLabels {
409 labels,
410 disable_label_root: self.disable_label_root,
411 };
412 match value {
413 observe::MetricValue::OperationBytes(v) => self
414 .operation_bytes
415 .get_or_create(&labels)
416 .observe(v as f64),
417 observe::MetricValue::OperationBytesRate(v) => {
418 self.operation_bytes_rate.get_or_create(&labels).observe(v)
419 }
420 observe::MetricValue::OperationEntries(v) => self
421 .operation_entries
422 .get_or_create(&labels)
423 .observe(v as f64),
424 observe::MetricValue::OperationEntriesRate(v) => self
425 .operation_entries_rate
426 .get_or_create(&labels)
427 .observe(v),
428 observe::MetricValue::OperationDurationSeconds(v) => self
429 .operation_duration_seconds
430 .get_or_create(&labels)
431 .observe(v.as_secs_f64()),
432 observe::MetricValue::OperationErrorsTotal => {
433 self.operation_errors_total.get_or_create(&labels).inc();
434 }
435 observe::MetricValue::OperationExecuting(v) => {
436 self.operation_executing
437 .get_or_create(&labels)
438 .inc_by(v as i64);
439 }
440 observe::MetricValue::OperationTtfbSeconds(v) => self
441 .operation_ttfb_seconds
442 .get_or_create(&labels)
443 .observe(v.as_secs_f64()),
444
445 observe::MetricValue::HttpExecuting(v) => {
446 self.http_executing.get_or_create(&labels).inc_by(v as i64);
447 }
448 observe::MetricValue::HttpRequestBytes(v) => self
449 .http_request_bytes
450 .get_or_create(&labels)
451 .observe(v as f64),
452 observe::MetricValue::HttpRequestBytesRate(v) => self
453 .http_request_bytes_rate
454 .get_or_create(&labels)
455 .observe(v),
456 observe::MetricValue::HttpRequestDurationSeconds(v) => self
457 .http_request_duration_seconds
458 .get_or_create(&labels)
459 .observe(v.as_secs_f64()),
460 observe::MetricValue::HttpResponseBytes(v) => self
461 .http_response_bytes
462 .get_or_create(&labels)
463 .observe(v as f64),
464 observe::MetricValue::HttpResponseBytesRate(v) => self
465 .http_response_bytes_rate
466 .get_or_create(&labels)
467 .observe(v),
468 observe::MetricValue::HttpResponseDurationSeconds(v) => self
469 .http_response_duration_seconds
470 .get_or_create(&labels)
471 .observe(v.as_secs_f64()),
472 observe::MetricValue::HttpConnectionErrorsTotal => {
473 self.http_connection_errors_total
474 .get_or_create(&labels)
475 .inc();
476 }
477 observe::MetricValue::HttpStatusErrorsTotal => {
478 self.http_status_errors_total.get_or_create(&labels).inc();
479 }
480 };
481 }
482}
483
484#[derive(Clone, Debug, PartialEq, Eq, Hash)]
485struct OperationLabels {
486 labels: observe::MetricLabels,
487 disable_label_root: bool,
488}
489
490impl EncodeLabelSet for OperationLabels {
491 fn encode(&self, encoder: &mut LabelSetEncoder<'_>) -> Result<(), fmt::Error> {
492 (observe::LABEL_SCHEME, self.labels.scheme).encode(encoder.encode_label())?;
493 (observe::LABEL_NAMESPACE, self.labels.namespace.as_ref())
494 .encode(encoder.encode_label())?;
495 if !self.disable_label_root {
496 (observe::LABEL_ROOT, self.labels.root.as_ref()).encode(encoder.encode_label())?;
497 }
498 (observe::LABEL_OPERATION, self.labels.operation).encode(encoder.encode_label())?;
499
500 if let Some(error) = &self.labels.error {
501 (observe::LABEL_ERROR, error.into_static()).encode(encoder.encode_label())?;
502 }
503 if let Some(code) = &self.labels.status_code {
504 (observe::LABEL_STATUS_CODE, code.as_str()).encode(encoder.encode_label())?;
505 }
506 Ok(())
507 }
508}
509
510fn register_metric(registry: &mut Registry, metric: impl Metric, value: observe::MetricValue) {
511 let ((name, unit), help) = (value.name_with_unit(), value.help());
512
513 if let Some(unit) = unit {
514 registry.register_with_unit(name, help, Unit::Other(unit.to_string()), metric);
515 } else {
516 registry.register(name, help, metric);
517 }
518}