1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::ready;
23use std::task::Context;
24use std::task::Poll;
25use std::time::Duration;
26use std::time::Instant;
27
28use futures::Stream;
29use futures::StreamExt;
30use http::StatusCode;
31
32use crate::raw::*;
33use crate::*;
34
/// Number of bytes in one kibibyte, kept as `f64` so it can be used directly in bucket tables.
const KIB: f64 = 1024.0;
/// Number of bytes in one mebibyte.
const MIB: f64 = 1024.0 * KIB;
/// Number of bytes in one gibibyte.
const GIB: f64 = 1024.0 * MIB;

/// Default bucket boundaries for byte sizes, ranging from 4 KiB to 5 GiB.
///
/// NOTE(review): not referenced in this file; presumably used by metrics backends as
/// histogram buckets for the `*Bytes` values — confirm at the use site.
pub const DEFAULT_BYTES_BUCKETS: &[f64] = &[
    4.0 * KIB,
    64.0 * KIB,
    256.0 * KIB,
    1.0 * MIB,
    4.0 * MIB,
    16.0 * MIB,
    64.0 * MIB,
    256.0 * MIB,
    1.0 * GIB,
    5.0 * GIB,
];

/// Default bucket boundaries for byte rates, ranging from 8 KiB to 32 GiB.
///
/// NOTE(review): presumably bytes per second, matching the `*BytesRate` values — confirm.
pub const DEFAULT_BYTES_RATE_BUCKETS: &[f64] = &[
    8.0 * KIB,
    32.0 * KIB,
    128.0 * KIB,
    1.0 * MIB,
    8.0 * MIB,
    32.0 * MIB,
    128.0 * MIB,
    512.0 * MIB,
    2.0 * GIB,
    8.0 * GIB,
    32.0 * GIB,
];

/// Default bucket boundaries for entry counts, ranging from 1 to 10000.
pub const DEFAULT_ENTRIES_BUCKETS: &[f64] = &[
    1.0, 5.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0, 10000.0,
];

/// Default bucket boundaries for entry rates, ranging from 1 to 10000.
///
/// NOTE(review): presumably entries per second, matching `OperationEntriesRate` — confirm.
pub const DEFAULT_ENTRIES_RATE_BUCKETS: &[f64] = &[
    1.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0, 10000.0,
];

/// Default bucket boundaries for durations in seconds, ranging from 1 ms to 60 s.
pub const DEFAULT_DURATION_SECONDS_BUCKETS: &[f64] = &[
    0.001, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0,
];

/// Default bucket boundaries for time-to-first-byte values, ranging from 0.001 to 3.2.
///
/// NOTE(review): presumably seconds, matching `OperationTtfbSeconds` — confirm.
pub const DEFAULT_TTFB_BUCKETS: &[f64] = &[
    0.001, 0.01, 0.025, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2,
];
135
/// Label key `"scheme"`; its name matches the `scheme` field of `MetricLabels`.
pub static LABEL_SCHEME: &str = "scheme";
/// Label key `"namespace"`; its name matches the `namespace` field of `MetricLabels`.
pub static LABEL_NAMESPACE: &str = "namespace";
/// Label key `"root"`; its name matches the `root` field of `MetricLabels`.
pub static LABEL_ROOT: &str = "root";
/// Label key `"operation"`; its name matches the `operation` field of `MetricLabels`.
pub static LABEL_OPERATION: &str = "operation";
/// Label key `"error"`; its name matches the optional `error` field of `MetricLabels`.
pub static LABEL_ERROR: &str = "error";
/// Label key `"status_code"`; its name matches the optional `status_code` field of `MetricLabels`.
pub static LABEL_STATUS_CODE: &str = "status_code";
148
149#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
151pub struct MetricLabels {
152 pub scheme: Scheme,
155 pub namespace: Arc<str>,
158 pub root: Arc<str>,
161 pub operation: &'static str,
164 pub error: Option<ErrorKind>,
168 pub status_code: Option<StatusCode>,
172}
173
174impl MetricLabels {
175 fn new(info: Arc<AccessorInfo>, op: &'static str) -> Self {
177 MetricLabels {
178 scheme: info.scheme(),
179 namespace: info.name(),
180 root: info.root(),
181 operation: op,
182 ..MetricLabels::default()
183 }
184 }
185
186 fn with_error(mut self, err: ErrorKind) -> Self {
188 self.error = Some(err);
189 self
190 }
191
192 fn with_status_code(mut self, code: StatusCode) -> Self {
194 self.status_code = Some(code);
195 self
196 }
197}
198
/// A single metric observation handed to a metrics interceptor.
///
/// Each variant identifies one metric; variants with a payload carry the
/// observed value, the `*ErrorsTotal` variants carry none.
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub enum MetricValue {
    /// Size in bytes of the data processed by an operation.
    OperationBytes(u64),
    /// Data processing rate of an operation, in bytes per second.
    OperationBytesRate(f64),
    /// Number of entries processed by an operation.
    OperationEntries(u64),
    /// Entry processing rate of an operation, in entries per second.
    OperationEntriesRate(f64),
    /// Duration of an operation, measured from start to completion.
    OperationDurationSeconds(Duration),
    /// A failed operation.
    OperationErrorsTotal,
    /// Change in the number of operations currently being executed.
    OperationExecuting(isize),
    /// Time to first byte of an operation.
    OperationTtfbSeconds(Duration),
    /// Change in the number of HTTP requests currently in flight.
    HttpExecuting(isize),
    /// Size in bytes of an HTTP request body.
    HttpRequestBytes(u64),
    /// HTTP request rate in bytes per second.
    HttpRequestBytesRate(f64),
    /// Time spent sending an HTTP request.
    HttpRequestDurationSeconds(Duration),
    /// Size in bytes of an HTTP response body.
    HttpResponseBytes(u64),
    /// HTTP response rate in bytes per second.
    HttpResponseBytesRate(f64),
    /// Time spent receiving an HTTP response.
    HttpResponseDurationSeconds(Duration),
    /// An HTTP request that failed before receiving a response.
    HttpConnectionErrorsTotal,
    /// An HTTP request that received an error status code.
    HttpStatusErrorsTotal,
}

impl MetricValue {
    /// Returns the full metric name, including any unit suffix.
    pub fn name(&self) -> &'static str {
        match self {
            Self::OperationBytes(_) => "opendal_operation_bytes",
            Self::OperationBytesRate(_) => "opendal_operation_bytes_rate",
            Self::OperationEntries(_) => "opendal_operation_entries",
            Self::OperationEntriesRate(_) => "opendal_operation_entries_rate",
            Self::OperationDurationSeconds(_) => "opendal_operation_duration_seconds",
            Self::OperationErrorsTotal => "opendal_operation_errors_total",
            Self::OperationExecuting(_) => "opendal_operation_executing",
            Self::OperationTtfbSeconds(_) => "opendal_operation_ttfb_seconds",
            Self::HttpExecuting(_) => "opendal_http_executing",
            Self::HttpRequestBytes(_) => "opendal_http_request_bytes",
            Self::HttpRequestBytesRate(_) => "opendal_http_request_bytes_rate",
            Self::HttpRequestDurationSeconds(_) => "opendal_http_request_duration_seconds",
            Self::HttpResponseBytes(_) => "opendal_http_response_bytes",
            Self::HttpResponseBytesRate(_) => "opendal_http_response_bytes_rate",
            Self::HttpResponseDurationSeconds(_) => "opendal_http_response_duration_seconds",
            Self::HttpConnectionErrorsTotal => "opendal_http_connection_errors_total",
            Self::HttpStatusErrorsTotal => "opendal_http_status_errors_total",
        }
    }

    /// Returns the metric name split into a base name and an optional unit.
    ///
    /// Where a unit is returned, it is not part of the base name. The
    /// `*ErrorsTotal` metrics are returned without their `_total` suffix.
    pub fn name_with_unit(&self) -> (&'static str, Option<&'static str>) {
        const BYTES: Option<&'static str> = Some("bytes");
        const SECONDS: Option<&'static str> = Some("seconds");

        match self {
            Self::OperationBytes(_) => ("opendal_operation", BYTES),
            Self::OperationBytesRate(_) => ("opendal_operation_bytes_rate", None),
            Self::OperationEntries(_) => ("opendal_operation_entries", None),
            Self::OperationEntriesRate(_) => ("opendal_operation_entries_rate", None),
            Self::OperationDurationSeconds(_) => ("opendal_operation_duration", SECONDS),
            Self::OperationErrorsTotal => ("opendal_operation_errors", None),
            Self::OperationExecuting(_) => ("opendal_operation_executing", None),
            Self::OperationTtfbSeconds(_) => ("opendal_operation_ttfb", SECONDS),
            Self::HttpExecuting(_) => ("opendal_http_executing", None),
            Self::HttpRequestBytes(_) => ("opendal_http_request", BYTES),
            Self::HttpRequestBytesRate(_) => ("opendal_http_request_bytes_rate", None),
            Self::HttpRequestDurationSeconds(_) => ("opendal_http_request_duration", SECONDS),
            Self::HttpResponseBytes(_) => ("opendal_http_response", BYTES),
            Self::HttpResponseBytesRate(_) => ("opendal_http_response_bytes_rate", None),
            Self::HttpResponseDurationSeconds(_) => ("opendal_http_response_duration", SECONDS),
            Self::HttpConnectionErrorsTotal => ("opendal_http_connection_errors", None),
            Self::HttpStatusErrorsTotal => ("opendal_http_status_errors", None),
        }
    }

    /// Returns the human-readable help text describing this metric.
    pub fn help(&self) -> &'static str {
        match self {
            Self::OperationBytes(_) => "Current operation size in bytes, represents the size of data being processed in the current operation",
            Self::OperationBytesRate(_) => "Histogram of data processing rates in bytes per second within individual operations",
            Self::OperationEntries(_) => "Current operation size in entries, represents the entries being processed in the current operation",
            Self::OperationEntriesRate(_) => "Histogram of entries processing rates in entries per second within individual operations",
            Self::OperationDurationSeconds(_) => "Duration of operations in seconds, measured from start to completion",
            Self::OperationErrorsTotal => "Total number of failed operations",
            Self::OperationExecuting(_) => "Number of operations currently being executed",
            Self::OperationTtfbSeconds(_) => "Time to first byte in seconds for operations",
            Self::HttpExecuting(_) => "Number of HTTP requests currently in flight from this client",
            Self::HttpRequestBytes(_) => "Histogram of HTTP request body sizes in bytes",
            Self::HttpRequestBytesRate(_) => "Histogram of HTTP request bytes per second rates",
            Self::HttpRequestDurationSeconds(_) => "Histogram of time durations in seconds spent sending HTTP requests, from first byte sent to receiving the first byte",
            Self::HttpResponseBytes(_) => "Histogram of HTTP response body sizes in bytes",
            Self::HttpResponseBytesRate(_) => "Histogram of HTTP response bytes per second rates",
            Self::HttpResponseDurationSeconds(_) => "Histogram of time durations in seconds spent receiving HTTP responses, from first byte received to last byte received",
            Self::HttpConnectionErrorsTotal => "Total number of HTTP requests that failed before receiving a response (DNS failures, connection refused, timeouts, TLS errors)",
            Self::HttpStatusErrorsTotal => "Total number of HTTP requests that received error status codes (non-2xx responses)",
        }
    }
}
344
345pub trait MetricsIntercept: Debug + Clone + Send + Sync + Unpin + 'static {
349 fn observe(&self, labels: MetricLabels, value: MetricValue) {
351 let _ = (labels, value);
352 }
353}
354
355#[derive(Clone, Debug)]
357pub struct MetricsLayer<I: MetricsIntercept> {
358 interceptor: I,
359}
360
361impl<I: MetricsIntercept> MetricsLayer<I> {
362 pub fn new(interceptor: I) -> Self {
364 Self { interceptor }
365 }
366}
367
368impl<A: Access, I: MetricsIntercept> Layer<A> for MetricsLayer<I> {
369 type LayeredAccess = MetricsAccessor<A, I>;
370
371 fn layer(&self, inner: A) -> Self::LayeredAccess {
372 let info = inner.info();
373
374 info.update_http_client(|client| {
376 HttpClient::with(MetricsHttpFetcher {
377 inner: client.into_inner(),
378 info: info.clone(),
379 interceptor: self.interceptor.clone(),
380 })
381 });
382
383 MetricsAccessor {
384 inner,
385 info,
386 interceptor: self.interceptor.clone(),
387 }
388 }
389}
390
/// An HTTP fetcher wrapper that reports HTTP-level metrics to an interceptor
/// for every request it forwards to the wrapped fetcher.
pub struct MetricsHttpFetcher<I: MetricsIntercept> {
    /// The wrapped fetcher that actually performs the request.
    inner: HttpFetcher,
    /// Accessor info used to build the metric labels.
    info: Arc<AccessorInfo>,
    /// Receiver of all metric observations.
    interceptor: I,
}
397
398impl<I: MetricsIntercept> HttpFetch for MetricsHttpFetcher<I> {
399 async fn fetch(&self, req: http::Request<Buffer>) -> Result<http::Response<HttpBody>> {
400 let labels = MetricLabels::new(
401 self.info.clone(),
402 req.extensions()
403 .get::<Operation>()
404 .copied()
405 .map(Operation::into_static)
406 .unwrap_or("unknown"),
407 );
408
409 let start = Instant::now();
410 let req_size = req.body().len();
411
412 self.interceptor
413 .observe(labels.clone(), MetricValue::HttpExecuting(1));
414
415 let res = self.inner.fetch(req).await;
416 let req_duration = start.elapsed();
417
418 match res {
419 Err(err) => {
420 self.interceptor
421 .observe(labels.clone(), MetricValue::HttpExecuting(-1));
422 self.interceptor
423 .observe(labels, MetricValue::HttpConnectionErrorsTotal);
424 Err(err)
425 }
426 Ok(resp) if resp.status().is_client_error() && resp.status().is_server_error() => {
427 self.interceptor
428 .observe(labels.clone(), MetricValue::HttpExecuting(-1));
429 self.interceptor.observe(
430 labels.with_status_code(resp.status()),
431 MetricValue::HttpStatusErrorsTotal,
432 );
433 Ok(resp)
434 }
435 Ok(resp) => {
436 self.interceptor.observe(
437 labels.clone(),
438 MetricValue::HttpRequestBytes(req_size as u64),
439 );
440 self.interceptor.observe(
441 labels.clone(),
442 MetricValue::HttpRequestBytesRate(req_size as f64 / req_duration.as_secs_f64()),
443 );
444 self.interceptor.observe(
445 labels.clone(),
446 MetricValue::HttpRequestDurationSeconds(req_duration),
447 );
448
449 let (parts, body) = resp.into_parts();
450 let body = body.map_inner(|s| {
451 Box::new(MetricsStream {
452 inner: s,
453 interceptor: self.interceptor.clone(),
454 labels: labels.clone(),
455 size: 0,
456 start: Instant::now(),
457 })
458 });
459
460 Ok(http::Response::from_parts(parts, body))
461 }
462 }
463 }
464}
465
466pub struct MetricsStream<S, I> {
467 inner: S,
468 interceptor: I,
469
470 labels: MetricLabels,
471 size: u64,
472 start: Instant,
473}
474
475impl<S, I> Stream for MetricsStream<S, I>
476where
477 S: Stream<Item = Result<Buffer>> + Unpin + 'static,
478 I: MetricsIntercept,
479{
480 type Item = Result<Buffer>;
481
482 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
483 match ready!(self.inner.poll_next_unpin(cx)) {
484 Some(Ok(bs)) => {
485 self.size += bs.len() as u64;
486 Poll::Ready(Some(Ok(bs)))
487 }
488 Some(Err(err)) => Poll::Ready(Some(Err(err))),
489 None => {
490 let resp_size = self.size;
491 let resp_duration = self.start.elapsed();
492
493 self.interceptor.observe(
494 self.labels.clone(),
495 MetricValue::HttpResponseBytes(resp_size),
496 );
497 self.interceptor.observe(
498 self.labels.clone(),
499 MetricValue::HttpResponseBytesRate(
500 resp_size as f64 / resp_duration.as_secs_f64(),
501 ),
502 );
503 self.interceptor.observe(
504 self.labels.clone(),
505 MetricValue::HttpResponseDurationSeconds(resp_duration),
506 );
507 self.interceptor
508 .observe(self.labels.clone(), MetricValue::HttpExecuting(-1));
509
510 Poll::Ready(None)
511 }
512 }
513 }
514}
515
516pub struct MetricsAccessor<A: Access, I: MetricsIntercept> {
518 inner: A,
519 info: Arc<AccessorInfo>,
520 interceptor: I,
521}
522
523impl<A: Access, I: MetricsIntercept> Debug for MetricsAccessor<A, I> {
524 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
525 f.debug_struct("MetricsAccessor")
526 .field("inner", &self.inner)
527 .finish_non_exhaustive()
528 }
529}
530
531impl<A: Access, I: MetricsIntercept> LayeredAccess for MetricsAccessor<A, I> {
532 type Inner = A;
533 type Reader = MetricsWrapper<A::Reader, I>;
534 type BlockingReader = MetricsWrapper<A::BlockingReader, I>;
535 type Writer = MetricsWrapper<A::Writer, I>;
536 type BlockingWriter = MetricsWrapper<A::BlockingWriter, I>;
537 type Lister = MetricsWrapper<A::Lister, I>;
538 type BlockingLister = MetricsWrapper<A::BlockingLister, I>;
539 type Deleter = MetricsWrapper<A::Deleter, I>;
540 type BlockingDeleter = MetricsWrapper<A::BlockingDeleter, I>;
541
542 fn inner(&self) -> &Self::Inner {
543 &self.inner
544 }
545
546 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
547 let labels = MetricLabels::new(self.info.clone(), Operation::CreateDir.into_static());
548
549 let start = Instant::now();
550
551 self.interceptor
552 .observe(labels.clone(), MetricValue::OperationExecuting(1));
553
554 let res = self
555 .inner()
556 .create_dir(path, args)
557 .await
558 .inspect(|_| {
559 self.interceptor.observe(
560 labels.clone(),
561 MetricValue::OperationDurationSeconds(start.elapsed()),
562 );
563 })
564 .inspect_err(|err| {
565 self.interceptor.observe(
566 labels.clone().with_error(err.kind()),
567 MetricValue::OperationErrorsTotal,
568 );
569 });
570
571 self.interceptor
572 .observe(labels, MetricValue::OperationExecuting(-1));
573 res
574 }
575
576 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
577 let labels = MetricLabels::new(self.info.clone(), Operation::Read.into_static());
578
579 let start = Instant::now();
580
581 self.interceptor
582 .observe(labels.clone(), MetricValue::OperationExecuting(1));
583
584 let (rp, reader) = self
585 .inner
586 .read(path, args)
587 .await
588 .inspect(|_| {
589 self.interceptor.observe(
590 labels.clone(),
591 MetricValue::OperationTtfbSeconds(start.elapsed()),
592 );
593 })
594 .inspect_err(|err| {
595 self.interceptor.observe(
596 labels.clone().with_error(err.kind()),
597 MetricValue::OperationErrorsTotal,
598 );
599 })?;
600
601 Ok((
602 rp,
603 MetricsWrapper::new(reader, self.interceptor.clone(), labels, start),
604 ))
605 }
606
607 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
608 let labels = MetricLabels::new(self.info.clone(), Operation::Write.into_static());
609
610 let start = Instant::now();
611
612 self.interceptor
613 .observe(labels.clone(), MetricValue::OperationExecuting(1));
614
615 let (rp, writer) = self.inner.write(path, args).await.inspect_err(|err| {
616 self.interceptor.observe(
617 labels.clone().with_error(err.kind()),
618 MetricValue::OperationErrorsTotal,
619 );
620 })?;
621
622 Ok((
623 rp,
624 MetricsWrapper::new(writer, self.interceptor.clone(), labels, start),
625 ))
626 }
627
628 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
629 let labels = MetricLabels::new(self.info.clone(), Operation::Copy.into_static());
630
631 let start = Instant::now();
632
633 self.interceptor
634 .observe(labels.clone(), MetricValue::OperationExecuting(1));
635
636 let res = self
637 .inner()
638 .copy(from, to, args)
639 .await
640 .inspect(|_| {
641 self.interceptor.observe(
642 labels.clone(),
643 MetricValue::OperationDurationSeconds(start.elapsed()),
644 );
645 })
646 .inspect_err(|err| {
647 self.interceptor.observe(
648 labels.clone().with_error(err.kind()),
649 MetricValue::OperationErrorsTotal,
650 );
651 });
652
653 self.interceptor
654 .observe(labels, MetricValue::OperationExecuting(-1));
655 res
656 }
657
658 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
659 let labels = MetricLabels::new(self.info.clone(), Operation::Rename.into_static());
660
661 let start = Instant::now();
662
663 self.interceptor
664 .observe(labels.clone(), MetricValue::OperationExecuting(1));
665
666 let res = self
667 .inner()
668 .rename(from, to, args)
669 .await
670 .inspect(|_| {
671 self.interceptor.observe(
672 labels.clone(),
673 MetricValue::OperationDurationSeconds(start.elapsed()),
674 );
675 })
676 .inspect_err(|err| {
677 self.interceptor.observe(
678 labels.clone().with_error(err.kind()),
679 MetricValue::OperationErrorsTotal,
680 );
681 });
682
683 self.interceptor
684 .observe(labels, MetricValue::OperationExecuting(-1));
685 res
686 }
687
688 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
689 let labels = MetricLabels::new(self.info.clone(), Operation::Stat.into_static());
690
691 let start = Instant::now();
692
693 self.interceptor
694 .observe(labels.clone(), MetricValue::OperationExecuting(1));
695
696 let res = self
697 .inner()
698 .stat(path, args)
699 .await
700 .inspect(|_| {
701 self.interceptor.observe(
702 labels.clone(),
703 MetricValue::OperationDurationSeconds(start.elapsed()),
704 );
705 })
706 .inspect_err(|err| {
707 self.interceptor.observe(
708 labels.clone().with_error(err.kind()),
709 MetricValue::OperationErrorsTotal,
710 );
711 });
712
713 self.interceptor
714 .observe(labels, MetricValue::OperationExecuting(-1));
715 res
716 }
717
718 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
719 let labels = MetricLabels::new(self.info.clone(), Operation::Delete.into_static());
720
721 let start = Instant::now();
722
723 self.interceptor
724 .observe(labels.clone(), MetricValue::OperationExecuting(1));
725
726 let (rp, deleter) = self.inner.delete().await.inspect_err(|err| {
727 self.interceptor.observe(
728 labels.clone().with_error(err.kind()),
729 MetricValue::OperationErrorsTotal,
730 );
731 })?;
732
733 Ok((
734 rp,
735 MetricsWrapper::new(deleter, self.interceptor.clone(), labels, start),
736 ))
737 }
738
739 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
740 let labels = MetricLabels::new(self.info.clone(), Operation::List.into_static());
741
742 let start = Instant::now();
743
744 self.interceptor
745 .observe(labels.clone(), MetricValue::OperationExecuting(1));
746
747 let (rp, lister) = self.inner.list(path, args).await.inspect_err(|err| {
748 self.interceptor.observe(
749 labels.clone().with_error(err.kind()),
750 MetricValue::OperationErrorsTotal,
751 );
752 })?;
753
754 Ok((
755 rp,
756 MetricsWrapper::new(lister, self.interceptor.clone(), labels, start),
757 ))
758 }
759
760 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
761 let labels = MetricLabels::new(self.info.clone(), Operation::Presign.into_static());
762
763 let start = Instant::now();
764
765 self.interceptor
766 .observe(labels.clone(), MetricValue::OperationExecuting(1));
767
768 let res = self
769 .inner()
770 .presign(path, args)
771 .await
772 .inspect(|_| {
773 self.interceptor.observe(
774 labels.clone(),
775 MetricValue::OperationDurationSeconds(start.elapsed()),
776 );
777 })
778 .inspect_err(|err| {
779 self.interceptor.observe(
780 labels.clone().with_error(err.kind()),
781 MetricValue::OperationErrorsTotal,
782 );
783 });
784
785 self.interceptor
786 .observe(labels, MetricValue::OperationExecuting(-1));
787 res
788 }
789
790 fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
791 let labels = MetricLabels::new(self.info.clone(), Operation::CreateDir.into_static());
792
793 let start = Instant::now();
794
795 self.interceptor
796 .observe(labels.clone(), MetricValue::OperationExecuting(1));
797
798 let res = self
799 .inner()
800 .blocking_create_dir(path, args)
801 .inspect(|_| {
802 self.interceptor.observe(
803 labels.clone(),
804 MetricValue::OperationDurationSeconds(start.elapsed()),
805 );
806 })
807 .inspect_err(|err| {
808 self.interceptor.observe(
809 labels.clone().with_error(err.kind()),
810 MetricValue::OperationErrorsTotal,
811 );
812 });
813
814 self.interceptor
815 .observe(labels, MetricValue::OperationExecuting(-1));
816 res
817 }
818
819 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
820 let labels = MetricLabels::new(self.info.clone(), Operation::Read.into_static());
821
822 let start = Instant::now();
823
824 self.interceptor
825 .observe(labels.clone(), MetricValue::OperationExecuting(1));
826
827 let (rp, reader) = self
828 .inner
829 .blocking_read(path, args)
830 .inspect(|_| {
831 self.interceptor.observe(
832 labels.clone(),
833 MetricValue::OperationTtfbSeconds(start.elapsed()),
834 );
835 })
836 .inspect_err(|err| {
837 self.interceptor.observe(
838 labels.clone().with_error(err.kind()),
839 MetricValue::OperationErrorsTotal,
840 );
841 })?;
842
843 Ok((
844 rp,
845 MetricsWrapper::new(reader, self.interceptor.clone(), labels, start),
846 ))
847 }
848
849 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
850 let labels = MetricLabels::new(self.info.clone(), Operation::Write.into_static());
851
852 let start = Instant::now();
853
854 self.interceptor
855 .observe(labels.clone(), MetricValue::OperationExecuting(1));
856
857 let (rp, writer) = self.inner.blocking_write(path, args).inspect_err(|err| {
858 self.interceptor.observe(
859 labels.clone().with_error(err.kind()),
860 MetricValue::OperationErrorsTotal,
861 );
862 })?;
863
864 Ok((
865 rp,
866 MetricsWrapper::new(writer, self.interceptor.clone(), labels, start),
867 ))
868 }
869
870 fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
871 let labels = MetricLabels::new(self.info.clone(), Operation::Copy.into_static());
872
873 let start = Instant::now();
874
875 self.interceptor
876 .observe(labels.clone(), MetricValue::OperationExecuting(1));
877
878 let res = self
879 .inner()
880 .blocking_copy(from, to, args)
881 .inspect(|_| {
882 self.interceptor.observe(
883 labels.clone(),
884 MetricValue::OperationDurationSeconds(start.elapsed()),
885 );
886 })
887 .inspect_err(|err| {
888 self.interceptor.observe(
889 labels.clone().with_error(err.kind()),
890 MetricValue::OperationErrorsTotal,
891 );
892 });
893
894 self.interceptor
895 .observe(labels, MetricValue::OperationExecuting(-1));
896 res
897 }
898
899 fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
900 let labels = MetricLabels::new(self.info.clone(), Operation::Rename.into_static());
901
902 let start = Instant::now();
903
904 self.interceptor
905 .observe(labels.clone(), MetricValue::OperationExecuting(1));
906
907 let res = self
908 .inner()
909 .blocking_rename(from, to, args)
910 .inspect(|_| {
911 self.interceptor.observe(
912 labels.clone(),
913 MetricValue::OperationDurationSeconds(start.elapsed()),
914 );
915 })
916 .inspect_err(|err| {
917 self.interceptor.observe(
918 labels.clone().with_error(err.kind()),
919 MetricValue::OperationErrorsTotal,
920 );
921 });
922
923 self.interceptor
924 .observe(labels, MetricValue::OperationExecuting(-1));
925 res
926 }
927
928 fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
929 let labels = MetricLabels::new(self.info.clone(), Operation::Stat.into_static());
930
931 let start = Instant::now();
932
933 self.interceptor
934 .observe(labels.clone(), MetricValue::OperationExecuting(1));
935
936 let res = self
937 .inner()
938 .blocking_stat(path, args)
939 .inspect(|_| {
940 self.interceptor.observe(
941 labels.clone(),
942 MetricValue::OperationDurationSeconds(start.elapsed()),
943 );
944 })
945 .inspect_err(|err| {
946 self.interceptor.observe(
947 labels.clone().with_error(err.kind()),
948 MetricValue::OperationErrorsTotal,
949 );
950 });
951
952 self.interceptor
953 .observe(labels, MetricValue::OperationExecuting(-1));
954 res
955 }
956
957 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
958 let labels = MetricLabels::new(self.info.clone(), Operation::Delete.into_static());
959
960 let start = Instant::now();
961
962 self.interceptor
963 .observe(labels.clone(), MetricValue::OperationExecuting(1));
964
965 let (rp, deleter) = self.inner.blocking_delete().inspect_err(|err| {
966 self.interceptor.observe(
967 labels.clone().with_error(err.kind()),
968 MetricValue::OperationErrorsTotal,
969 );
970 })?;
971
972 Ok((
973 rp,
974 MetricsWrapper::new(deleter, self.interceptor.clone(), labels, start),
975 ))
976 }
977
978 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
979 let labels = MetricLabels::new(self.info.clone(), Operation::List.into_static());
980
981 let start = Instant::now();
982
983 self.interceptor
984 .observe(labels.clone(), MetricValue::OperationExecuting(1));
985
986 let (rp, lister) = self.inner.blocking_list(path, args).inspect_err(|err| {
987 self.interceptor.observe(
988 labels.clone().with_error(err.kind()),
989 MetricValue::OperationErrorsTotal,
990 );
991 })?;
992
993 Ok((
994 rp,
995 MetricsWrapper::new(lister, self.interceptor.clone(), labels, start),
996 ))
997 }
998}
999
1000pub struct MetricsWrapper<R, I: MetricsIntercept> {
1001 inner: R,
1002 interceptor: I,
1003 labels: MetricLabels,
1004
1005 start: Instant,
1006 size: u64,
1007}
1008
1009impl<R, I: MetricsIntercept> Drop for MetricsWrapper<R, I> {
1010 fn drop(&mut self) {
1011 let size = self.size;
1012 let duration = self.start.elapsed();
1013
1014 if self.labels.operation == Operation::Read.into_static()
1015 || self.labels.operation == Operation::Write.into_static()
1016 {
1017 self.interceptor
1018 .observe(self.labels.clone(), MetricValue::OperationBytes(self.size));
1019 self.interceptor.observe(
1020 self.labels.clone(),
1021 MetricValue::OperationBytesRate(size as f64 / duration.as_secs_f64()),
1022 );
1023 } else {
1024 self.interceptor.observe(
1025 self.labels.clone(),
1026 MetricValue::OperationEntries(self.size),
1027 );
1028 self.interceptor.observe(
1029 self.labels.clone(),
1030 MetricValue::OperationEntriesRate(size as f64 / duration.as_secs_f64()),
1031 );
1032 }
1033
1034 self.interceptor.observe(
1035 self.labels.clone(),
1036 MetricValue::OperationDurationSeconds(duration),
1037 );
1038 self.interceptor
1039 .observe(self.labels.clone(), MetricValue::OperationExecuting(-1));
1040 }
1041}
1042
1043impl<R, I: MetricsIntercept> MetricsWrapper<R, I> {
1044 fn new(inner: R, interceptor: I, labels: MetricLabels, start: Instant) -> Self {
1045 Self {
1046 inner,
1047 interceptor,
1048 labels,
1049 start,
1050 size: 0,
1051 }
1052 }
1053}
1054
1055impl<R: oio::Read, I: MetricsIntercept> oio::Read for MetricsWrapper<R, I> {
1056 async fn read(&mut self) -> Result<Buffer> {
1057 self.inner
1058 .read()
1059 .await
1060 .inspect(|bs| {
1061 self.size += bs.len() as u64;
1062 })
1063 .inspect_err(|err| {
1064 self.interceptor.observe(
1065 self.labels.clone().with_error(err.kind()),
1066 MetricValue::OperationErrorsTotal,
1067 );
1068 })
1069 }
1070}
1071
1072impl<R: oio::BlockingRead, I: MetricsIntercept> oio::BlockingRead for MetricsWrapper<R, I> {
1073 fn read(&mut self) -> Result<Buffer> {
1074 self.inner
1075 .read()
1076 .inspect(|bs| {
1077 self.size += bs.len() as u64;
1078 })
1079 .inspect_err(|err| {
1080 self.interceptor.observe(
1081 self.labels.clone().with_error(err.kind()),
1082 MetricValue::OperationErrorsTotal,
1083 );
1084 })
1085 }
1086}
1087
1088impl<R: oio::Write, I: MetricsIntercept> oio::Write for MetricsWrapper<R, I> {
1089 async fn write(&mut self, bs: Buffer) -> Result<()> {
1090 let size = bs.len();
1091
1092 self.inner
1093 .write(bs)
1094 .await
1095 .inspect(|_| {
1096 self.size += size as u64;
1097 })
1098 .inspect_err(|err| {
1099 self.interceptor.observe(
1100 self.labels.clone().with_error(err.kind()),
1101 MetricValue::OperationErrorsTotal,
1102 );
1103 })
1104 }
1105
1106 async fn close(&mut self) -> Result<Metadata> {
1107 self.inner.close().await.inspect_err(|err| {
1108 self.interceptor.observe(
1109 self.labels.clone().with_error(err.kind()),
1110 MetricValue::OperationErrorsTotal,
1111 );
1112 })
1113 }
1114
1115 async fn abort(&mut self) -> Result<()> {
1116 self.inner.abort().await.inspect_err(|err| {
1117 self.interceptor.observe(
1118 self.labels.clone().with_error(err.kind()),
1119 MetricValue::OperationErrorsTotal,
1120 );
1121 })
1122 }
1123}
1124
1125impl<R: oio::BlockingWrite, I: MetricsIntercept> oio::BlockingWrite for MetricsWrapper<R, I> {
1126 fn write(&mut self, bs: Buffer) -> Result<()> {
1127 let size = bs.len();
1128
1129 self.inner
1130 .write(bs)
1131 .inspect(|_| {
1132 self.size += size as u64;
1133 })
1134 .inspect_err(|err| {
1135 self.interceptor.observe(
1136 self.labels.clone().with_error(err.kind()),
1137 MetricValue::OperationErrorsTotal,
1138 );
1139 })
1140 }
1141
1142 fn close(&mut self) -> Result<Metadata> {
1143 self.inner.close().inspect_err(|err| {
1144 self.interceptor.observe(
1145 self.labels.clone().with_error(err.kind()),
1146 MetricValue::OperationErrorsTotal,
1147 );
1148 })
1149 }
1150}
1151
1152impl<R: oio::List, I: MetricsIntercept> oio::List for MetricsWrapper<R, I> {
1153 async fn next(&mut self) -> Result<Option<oio::Entry>> {
1154 self.inner
1155 .next()
1156 .await
1157 .inspect(|_| {
1158 self.size += 1;
1159 })
1160 .inspect_err(|err| {
1161 self.interceptor.observe(
1162 self.labels.clone().with_error(err.kind()),
1163 MetricValue::OperationErrorsTotal,
1164 );
1165 })
1166 }
1167}
1168
1169impl<R: oio::BlockingList, I: MetricsIntercept> oio::BlockingList for MetricsWrapper<R, I> {
1170 fn next(&mut self) -> Result<Option<oio::Entry>> {
1171 self.inner
1172 .next()
1173 .inspect(|_| {
1174 self.size += 1;
1175 })
1176 .inspect_err(|err| {
1177 self.interceptor.observe(
1178 self.labels.clone().with_error(err.kind()),
1179 MetricValue::OperationErrorsTotal,
1180 );
1181 })
1182 }
1183}
1184
1185impl<R: oio::Delete, I: MetricsIntercept> oio::Delete for MetricsWrapper<R, I> {
1186 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
1187 self.inner
1188 .delete(path, args)
1189 .inspect(|_| {
1190 self.size += 1;
1191 })
1192 .inspect_err(|err| {
1193 self.interceptor.observe(
1194 self.labels.clone().with_error(err.kind()),
1195 MetricValue::OperationErrorsTotal,
1196 );
1197 })
1198 }
1199
1200 async fn flush(&mut self) -> Result<usize> {
1201 self.inner.flush().await.inspect_err(|err| {
1202 self.interceptor.observe(
1203 self.labels.clone().with_error(err.kind()),
1204 MetricValue::OperationErrorsTotal,
1205 );
1206 })
1207 }
1208}
1209
1210impl<R: oio::BlockingDelete, I: MetricsIntercept> oio::BlockingDelete for MetricsWrapper<R, I> {
1211 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
1212 self.inner
1213 .delete(path, args)
1214 .inspect(|_| {
1215 self.size += 1;
1216 })
1217 .inspect_err(|err| {
1218 self.interceptor.observe(
1219 self.labels.clone().with_error(err.kind()),
1220 MetricValue::OperationErrorsTotal,
1221 );
1222 })
1223 }
1224
1225 fn flush(&mut self) -> Result<usize> {
1226 self.inner.flush().inspect_err(|err| {
1227 self.interceptor.observe(
1228 self.labels.clone().with_error(err.kind()),
1229 MetricValue::OperationErrorsTotal,
1230 );
1231 })
1232 }
1233}