1use std::fmt;
19use std::time::Duration;
20
21use prometheus_client::encoding::EncodeLabel;
22use prometheus_client::encoding::EncodeLabelSet;
23use prometheus_client::encoding::LabelSetEncoder;
24use prometheus_client::metrics::counter::Counter;
25use prometheus_client::metrics::family::Family;
26use prometheus_client::metrics::family::MetricConstructor;
27use prometheus_client::metrics::gauge::Gauge;
28use prometheus_client::metrics::histogram::Histogram;
29use prometheus_client::registry::Metric;
30use prometheus_client::registry::Registry;
31use prometheus_client::registry::Unit;
32
33use crate::layers::observe;
34use crate::raw::*;
35use crate::*;
36
37#[derive(Clone, Debug)]
83pub struct PrometheusClientLayer {
84 interceptor: PrometheusClientInterceptor,
85}
86
87impl PrometheusClientLayer {
88 pub fn builder() -> PrometheusClientLayerBuilder {
90 PrometheusClientLayerBuilder::default()
91 }
92}
93
94impl<A: Access> Layer<A> for PrometheusClientLayer {
95 type LayeredAccess = observe::MetricsAccessor<A, PrometheusClientInterceptor>;
96
97 fn layer(&self, inner: A) -> Self::LayeredAccess {
98 observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
99 }
100}
101
/// Builder holding the histogram bucket boundaries used when registering a
/// [`PrometheusClientLayer`].
///
/// Each field is the list of bucket boundaries handed to the histograms of one metric group.
/// The `Default` implementation fills every field from the matching `observe::DEFAULT_*`
/// constant.
#[derive(Clone, Debug)]
pub struct PrometheusClientLayerBuilder {
    /// Buckets for the byte-count histograms.
    bytes_buckets: Vec<f64>,
    /// Buckets for the byte-rate histograms.
    bytes_rate_buckets: Vec<f64>,
    /// Buckets for the entry-count histogram.
    entries_buckets: Vec<f64>,
    /// Buckets for the entry-rate histogram.
    entries_rate_buckets: Vec<f64>,
    /// Buckets for the duration histograms.
    duration_seconds_buckets: Vec<f64>,
    /// Buckets for the TTFB histogram.
    ttfb_buckets: Vec<f64>,
}
111
112impl Default for PrometheusClientLayerBuilder {
113 fn default() -> Self {
114 Self {
115 bytes_buckets: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
116 bytes_rate_buckets: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
117 entries_buckets: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
118 entries_rate_buckets: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
119 duration_seconds_buckets: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
120 ttfb_buckets: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
121 }
122 }
123}
124
125impl PrometheusClientLayerBuilder {
126 pub fn bytes_buckets(mut self, buckets: Vec<f64>) -> Self {
128 if !buckets.is_empty() {
129 self.bytes_buckets = buckets;
130 }
131 self
132 }
133
134 pub fn bytes_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
136 if !buckets.is_empty() {
137 self.bytes_rate_buckets = buckets;
138 }
139 self
140 }
141
142 pub fn entries_buckets(mut self, buckets: Vec<f64>) -> Self {
144 if !buckets.is_empty() {
145 self.entries_buckets = buckets;
146 }
147 self
148 }
149
150 pub fn entries_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
152 if !buckets.is_empty() {
153 self.entries_rate_buckets = buckets;
154 }
155 self
156 }
157
158 pub fn duration_seconds_buckets(mut self, buckets: Vec<f64>) -> Self {
160 if !buckets.is_empty() {
161 self.duration_seconds_buckets = buckets;
162 }
163 self
164 }
165
166 pub fn ttfb_buckets(mut self, buckets: Vec<f64>) -> Self {
168 if !buckets.is_empty() {
169 self.ttfb_buckets = buckets;
170 }
171 self
172 }
173
174 pub fn register(self, registry: &mut Registry) -> PrometheusClientLayer {
200 let operation_bytes =
201 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
202 buckets: self.bytes_buckets.clone(),
203 });
204 let operation_bytes_rate =
205 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
206 buckets: self.bytes_rate_buckets.clone(),
207 });
208 let operation_entries =
209 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
210 buckets: self.entries_buckets.clone(),
211 });
212 let operation_entries_rate =
213 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
214 buckets: self.entries_rate_buckets.clone(),
215 });
216 let operation_duration_seconds =
217 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
218 buckets: self.duration_seconds_buckets.clone(),
219 });
220 let operation_errors_total = Family::<OperationLabels, Counter>::default();
221 let operation_executing = Family::<OperationLabels, Gauge>::default();
222 let operation_ttfb_seconds =
223 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
224 buckets: self.ttfb_buckets.clone(),
225 });
226
227 let http_executing = Family::<OperationLabels, Gauge>::default();
228 let http_request_bytes =
229 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
230 buckets: self.bytes_buckets.clone(),
231 });
232 let http_request_bytes_rate =
233 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
234 buckets: self.bytes_rate_buckets.clone(),
235 });
236 let http_request_duration_seconds =
237 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
238 buckets: self.duration_seconds_buckets.clone(),
239 });
240 let http_response_bytes =
241 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
242 buckets: self.bytes_buckets.clone(),
243 });
244 let http_response_bytes_rate =
245 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
246 buckets: self.bytes_rate_buckets.clone(),
247 });
248 let http_response_duration_seconds =
249 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
250 buckets: self.duration_seconds_buckets.clone(),
251 });
252 let http_connection_errors_total = Family::<OperationLabels, Counter>::default();
253 let http_status_errors_total = Family::<OperationLabels, Counter>::default();
254
255 register_metric(
256 registry,
257 operation_bytes.clone(),
258 observe::MetricValue::OperationBytes(0),
259 );
260 register_metric(
261 registry,
262 operation_bytes_rate.clone(),
263 observe::MetricValue::OperationBytesRate(0.0),
264 );
265 register_metric(
266 registry,
267 operation_entries.clone(),
268 observe::MetricValue::OperationEntries(0),
269 );
270 register_metric(
271 registry,
272 operation_entries_rate.clone(),
273 observe::MetricValue::OperationEntriesRate(0.0),
274 );
275 register_metric(
276 registry,
277 operation_duration_seconds.clone(),
278 observe::MetricValue::OperationDurationSeconds(Duration::default()),
279 );
280 register_metric(
281 registry,
282 operation_errors_total.clone(),
283 observe::MetricValue::OperationErrorsTotal,
284 );
285 register_metric(
286 registry,
287 operation_executing.clone(),
288 observe::MetricValue::OperationExecuting(0),
289 );
290 register_metric(
291 registry,
292 operation_ttfb_seconds.clone(),
293 observe::MetricValue::OperationTtfbSeconds(Duration::default()),
294 );
295
296 register_metric(
297 registry,
298 http_executing.clone(),
299 observe::MetricValue::HttpExecuting(0),
300 );
301 register_metric(
302 registry,
303 http_request_bytes.clone(),
304 observe::MetricValue::HttpRequestBytes(0),
305 );
306 register_metric(
307 registry,
308 http_request_bytes_rate.clone(),
309 observe::MetricValue::HttpRequestBytesRate(0.0),
310 );
311 register_metric(
312 registry,
313 http_request_duration_seconds.clone(),
314 observe::MetricValue::HttpRequestDurationSeconds(Duration::default()),
315 );
316 register_metric(
317 registry,
318 http_response_bytes.clone(),
319 observe::MetricValue::HttpResponseBytes(0),
320 );
321 register_metric(
322 registry,
323 http_response_bytes_rate.clone(),
324 observe::MetricValue::HttpResponseBytesRate(0.0),
325 );
326 register_metric(
327 registry,
328 http_response_duration_seconds.clone(),
329 observe::MetricValue::HttpResponseDurationSeconds(Duration::default()),
330 );
331 register_metric(
332 registry,
333 http_connection_errors_total.clone(),
334 observe::MetricValue::HttpConnectionErrorsTotal,
335 );
336 register_metric(
337 registry,
338 http_status_errors_total.clone(),
339 observe::MetricValue::HttpStatusErrorsTotal,
340 );
341
342 PrometheusClientLayer {
343 interceptor: PrometheusClientInterceptor {
344 operation_bytes,
345 operation_bytes_rate,
346 operation_entries,
347 operation_entries_rate,
348 operation_duration_seconds,
349 operation_errors_total,
350 operation_executing,
351 operation_ttfb_seconds,
352
353 http_executing,
354 http_request_bytes,
355 http_request_bytes_rate,
356 http_request_duration_seconds,
357 http_response_bytes,
358 http_response_bytes_rate,
359 http_response_duration_seconds,
360 http_connection_errors_total,
361 http_status_errors_total,
362 },
363 }
364 }
365}
366
/// Metric constructor passed to `Family::new_with_constructor` so that histogram families can
/// build their histograms with a configured set of buckets.
#[derive(Clone)]
struct HistogramConstructor {
    /// Bucket boundaries handed to `Histogram::new` for each histogram that is built.
    buckets: Vec<f64>,
}
371
372impl MetricConstructor<Histogram> for HistogramConstructor {
373 fn new_metric(&self) -> Histogram {
374 Histogram::new(self.buckets.iter().cloned())
375 }
376}
377
378#[derive(Clone, Debug)]
379pub struct PrometheusClientInterceptor {
380 operation_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
381 operation_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
382 operation_entries: Family<OperationLabels, Histogram, HistogramConstructor>,
383 operation_entries_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
384 operation_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
385 operation_errors_total: Family<OperationLabels, Counter>,
386 operation_executing: Family<OperationLabels, Gauge>,
387 operation_ttfb_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
388
389 http_executing: Family<OperationLabels, Gauge>,
390 http_request_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
391 http_request_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
392 http_request_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
393 http_response_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
394 http_response_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
395 http_response_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
396 http_connection_errors_total: Family<OperationLabels, Counter>,
397 http_status_errors_total: Family<OperationLabels, Counter>,
398}
399
400impl observe::MetricsIntercept for PrometheusClientInterceptor {
401 fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
402 let labels = OperationLabels(labels);
403 match value {
404 observe::MetricValue::OperationBytes(v) => self
405 .operation_bytes
406 .get_or_create(&labels)
407 .observe(v as f64),
408 observe::MetricValue::OperationBytesRate(v) => {
409 self.operation_bytes_rate.get_or_create(&labels).observe(v)
410 }
411 observe::MetricValue::OperationEntries(v) => self
412 .operation_entries
413 .get_or_create(&labels)
414 .observe(v as f64),
415 observe::MetricValue::OperationEntriesRate(v) => self
416 .operation_entries_rate
417 .get_or_create(&labels)
418 .observe(v),
419 observe::MetricValue::OperationDurationSeconds(v) => self
420 .operation_duration_seconds
421 .get_or_create(&labels)
422 .observe(v.as_secs_f64()),
423 observe::MetricValue::OperationErrorsTotal => {
424 self.operation_errors_total.get_or_create(&labels).inc();
425 }
426 observe::MetricValue::OperationExecuting(v) => {
427 self.operation_executing
428 .get_or_create(&labels)
429 .inc_by(v as i64);
430 }
431 observe::MetricValue::OperationTtfbSeconds(v) => self
432 .operation_ttfb_seconds
433 .get_or_create(&labels)
434 .observe(v.as_secs_f64()),
435
436 observe::MetricValue::HttpExecuting(v) => {
437 self.http_executing.get_or_create(&labels).inc_by(v as i64);
438 }
439 observe::MetricValue::HttpRequestBytes(v) => self
440 .http_request_bytes
441 .get_or_create(&labels)
442 .observe(v as f64),
443 observe::MetricValue::HttpRequestBytesRate(v) => self
444 .http_request_bytes_rate
445 .get_or_create(&labels)
446 .observe(v),
447 observe::MetricValue::HttpRequestDurationSeconds(v) => self
448 .http_request_duration_seconds
449 .get_or_create(&labels)
450 .observe(v.as_secs_f64()),
451 observe::MetricValue::HttpResponseBytes(v) => self
452 .http_response_bytes
453 .get_or_create(&labels)
454 .observe(v as f64),
455 observe::MetricValue::HttpResponseBytesRate(v) => self
456 .http_response_bytes_rate
457 .get_or_create(&labels)
458 .observe(v),
459 observe::MetricValue::HttpResponseDurationSeconds(v) => self
460 .http_response_duration_seconds
461 .get_or_create(&labels)
462 .observe(v.as_secs_f64()),
463 observe::MetricValue::HttpConnectionErrorsTotal => {
464 self.http_connection_errors_total
465 .get_or_create(&labels)
466 .inc();
467 }
468 observe::MetricValue::HttpStatusErrorsTotal => {
469 self.http_status_errors_total.get_or_create(&labels).inc();
470 }
471 };
472 }
473}
474
475#[derive(Clone, Debug, PartialEq, Eq, Hash)]
476struct OperationLabels(observe::MetricLabels);
477
478impl EncodeLabelSet for OperationLabels {
479 fn encode(&self, mut encoder: LabelSetEncoder) -> Result<(), fmt::Error> {
480 (observe::LABEL_SCHEME, self.0.scheme.into_static()).encode(encoder.encode_label())?;
481 (observe::LABEL_NAMESPACE, self.0.namespace.as_ref()).encode(encoder.encode_label())?;
482 (observe::LABEL_ROOT, self.0.root.as_ref()).encode(encoder.encode_label())?;
483 (observe::LABEL_OPERATION, self.0.operation).encode(encoder.encode_label())?;
484
485 if let Some(error) = &self.0.error {
486 (observe::LABEL_ERROR, error.into_static()).encode(encoder.encode_label())?;
487 }
488 if let Some(code) = &self.0.status_code {
489 (observe::LABEL_STATUS_CODE, code.as_str()).encode(encoder.encode_label())?;
490 }
491 Ok(())
492 }
493}
494
495fn register_metric(registry: &mut Registry, metric: impl Metric, value: observe::MetricValue) {
496 let ((name, unit), help) = (value.name_with_unit(), value.help());
497
498 if let Some(unit) = unit {
499 registry.register_with_unit(name, help, Unit::Other(unit.to_string()), metric);
500 } else {
501 registry.register(name, help, metric);
502 }
503}