1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::Context;
23use std::task::Poll;
24use std::task::ready;
25use std::time::Duration;
26use std::time::Instant;
27
28use futures::Stream;
29use futures::StreamExt;
30use http::StatusCode;
31
32use crate::raw::*;
33use crate::*;
34
35const KIB: f64 = 1024.0;
36const MIB: f64 = 1024.0 * KIB;
37const GIB: f64 = 1024.0 * MIB;
38
39pub const DEFAULT_BYTES_BUCKETS: &[f64] = &[
42 4.0 * KIB, 64.0 * KIB, 256.0 * KIB, 1.0 * MIB, 4.0 * MIB, 16.0 * MIB, 64.0 * MIB, 256.0 * MIB, 1.0 * GIB, 5.0 * GIB, ];
53
54pub const DEFAULT_BYTES_RATE_BUCKETS: &[f64] = &[
60 8.0 * KIB, 32.0 * KIB, 128.0 * KIB, 1.0 * MIB, 8.0 * MIB, 32.0 * MIB, 128.0 * MIB, 512.0 * MIB, 2.0 * GIB, 8.0 * GIB, 32.0 * GIB, ];
76
77pub const DEFAULT_ENTRIES_BUCKETS: &[f64] = &[
80 1.0, 5.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0, 10000.0, ];
90
91pub const DEFAULT_ENTRIES_RATE_BUCKETS: &[f64] = &[
94 1.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0, 10000.0, ];
103
104pub const DEFAULT_DURATION_SECONDS_BUCKETS: &[f64] = &[
107 0.001, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, ];
120
121pub const DEFAULT_TTFB_BUCKETS: &[f64] = &[
124 0.001, 0.01, 0.025, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, ];
135
136pub static LABEL_SCHEME: &str = "scheme";
138pub static LABEL_NAMESPACE: &str = "namespace";
140pub static LABEL_ROOT: &str = "root";
142pub static LABEL_OPERATION: &str = "operation";
144pub static LABEL_ERROR: &str = "error";
146pub static LABEL_STATUS_CODE: &str = "status_code";
148
149#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
151pub struct MetricLabels {
152 pub scheme: &'static str,
155 pub namespace: Arc<str>,
158 pub root: Arc<str>,
161 pub operation: &'static str,
164 pub error: Option<ErrorKind>,
168 pub status_code: Option<StatusCode>,
172}
173
174impl MetricLabels {
175 fn new(info: Arc<AccessorInfo>, op: &'static str) -> Self {
177 MetricLabels {
178 scheme: info.scheme(),
179 namespace: info.name(),
180 root: info.root(),
181 operation: op,
182 ..MetricLabels::default()
183 }
184 }
185
186 fn with_error(mut self, err: ErrorKind) -> Self {
188 self.error = Some(err);
189 self
190 }
191
192 fn with_status_code(mut self, code: StatusCode) -> Self {
194 self.status_code = Some(code);
195 self
196 }
197}
198
199#[non_exhaustive]
206#[derive(Debug, Clone, Copy)]
207pub enum MetricValue {
208 OperationBytes(u64),
211 OperationBytesRate(f64),
214 OperationEntries(u64),
217 OperationEntriesRate(f64),
220 OperationDurationSeconds(Duration),
223 OperationErrorsTotal,
226 OperationExecuting(isize),
229 OperationTtfbSeconds(Duration),
232 HttpExecuting(isize),
235 HttpRequestBytes(u64),
238 HttpRequestBytesRate(f64),
241 HttpRequestDurationSeconds(Duration),
244 HttpResponseBytes(u64),
247 HttpResponseBytesRate(f64),
250 HttpResponseDurationSeconds(Duration),
253 HttpConnectionErrorsTotal,
256 HttpStatusErrorsTotal,
259}
260
261impl MetricValue {
262 pub fn name(&self) -> &'static str {
264 match self {
265 MetricValue::OperationBytes(_) => "opendal_operation_bytes",
266 MetricValue::OperationBytesRate(_) => "opendal_operation_bytes_rate",
267 MetricValue::OperationEntries(_) => "opendal_operation_entries",
268 MetricValue::OperationEntriesRate(_) => "opendal_operation_entries_rate",
269 MetricValue::OperationDurationSeconds(_) => "opendal_operation_duration_seconds",
270 MetricValue::OperationErrorsTotal => "opendal_operation_errors_total",
271 MetricValue::OperationExecuting(_) => "opendal_operation_executing",
272 MetricValue::OperationTtfbSeconds(_) => "opendal_operation_ttfb_seconds",
273
274 MetricValue::HttpConnectionErrorsTotal => "opendal_http_connection_errors_total",
275 MetricValue::HttpStatusErrorsTotal => "opendal_http_status_errors_total",
276 MetricValue::HttpExecuting(_) => "opendal_http_executing",
277 MetricValue::HttpRequestBytes(_) => "opendal_http_request_bytes",
278 MetricValue::HttpRequestBytesRate(_) => "opendal_http_request_bytes_rate",
279 MetricValue::HttpRequestDurationSeconds(_) => "opendal_http_request_duration_seconds",
280 MetricValue::HttpResponseBytes(_) => "opendal_http_response_bytes",
281 MetricValue::HttpResponseBytesRate(_) => "opendal_http_response_bytes_rate",
282 MetricValue::HttpResponseDurationSeconds(_) => "opendal_http_response_duration_seconds",
283 }
284 }
285
286 pub fn name_with_unit(&self) -> (&'static str, Option<&'static str>) {
292 match self {
293 MetricValue::OperationBytes(_) => ("opendal_operation", Some("bytes")),
294 MetricValue::OperationBytesRate(_) => ("opendal_operation_bytes_rate", None),
295 MetricValue::OperationEntries(_) => ("opendal_operation_entries", None),
296 MetricValue::OperationEntriesRate(_) => ("opendal_operation_entries_rate", None),
297 MetricValue::OperationDurationSeconds(_) => {
298 ("opendal_operation_duration", Some("seconds"))
299 }
300 MetricValue::OperationErrorsTotal => ("opendal_operation_errors", None),
301 MetricValue::OperationExecuting(_) => ("opendal_operation_executing", None),
302 MetricValue::OperationTtfbSeconds(_) => ("opendal_operation_ttfb", Some("seconds")),
303
304 MetricValue::HttpConnectionErrorsTotal => ("opendal_http_connection_errors", None),
305 MetricValue::HttpStatusErrorsTotal => ("opendal_http_status_errors", None),
306 MetricValue::HttpExecuting(_) => ("opendal_http_executing", None),
307 MetricValue::HttpRequestBytes(_) => ("opendal_http_request", Some("bytes")),
308 MetricValue::HttpRequestBytesRate(_) => ("opendal_http_request_bytes_rate", None),
309 MetricValue::HttpRequestDurationSeconds(_) => {
310 ("opendal_http_request_duration", Some("seconds"))
311 }
312 MetricValue::HttpResponseBytes(_) => ("opendal_http_response", Some("bytes")),
313 MetricValue::HttpResponseBytesRate(_) => ("opendal_http_response_bytes_rate", None),
314 MetricValue::HttpResponseDurationSeconds(_) => {
315 ("opendal_http_response_duration", Some("seconds"))
316 }
317 }
318 }
319
320 pub fn help(&self) -> &'static str {
322 match self {
323 MetricValue::OperationBytes(_) => {
324 "Current operation size in bytes, represents the size of data being processed in the current operation"
325 }
326 MetricValue::OperationBytesRate(_) => {
327 "Histogram of data processing rates in bytes per second within individual operations"
328 }
329 MetricValue::OperationEntries(_) => {
330 "Current operation size in entries, represents the entries being processed in the current operation"
331 }
332 MetricValue::OperationEntriesRate(_) => {
333 "Histogram of entries processing rates in entries per second within individual operations"
334 }
335 MetricValue::OperationDurationSeconds(_) => {
336 "Duration of operations in seconds, measured from start to completion"
337 }
338 MetricValue::OperationErrorsTotal => "Total number of failed operations",
339 MetricValue::OperationExecuting(_) => "Number of operations currently being executed",
340 MetricValue::OperationTtfbSeconds(_) => "Time to first byte in seconds for operations",
341
342 MetricValue::HttpConnectionErrorsTotal => {
343 "Total number of HTTP requests that failed before receiving a response (DNS failures, connection refused, timeouts, TLS errors)"
344 }
345 MetricValue::HttpStatusErrorsTotal => {
346 "Total number of HTTP requests that received error status codes (non-2xx responses)"
347 }
348 MetricValue::HttpExecuting(_) => {
349 "Number of HTTP requests currently in flight from this client"
350 }
351 MetricValue::HttpRequestBytes(_) => "Histogram of HTTP request body sizes in bytes",
352 MetricValue::HttpRequestBytesRate(_) => {
353 "Histogram of HTTP request bytes per second rates"
354 }
355 MetricValue::HttpRequestDurationSeconds(_) => {
356 "Histogram of time durations in seconds spent sending HTTP requests, from first byte sent to receiving the first byte"
357 }
358 MetricValue::HttpResponseBytes(_) => "Histogram of HTTP response body sizes in bytes",
359 MetricValue::HttpResponseBytesRate(_) => {
360 "Histogram of HTTP response bytes per second rates"
361 }
362 MetricValue::HttpResponseDurationSeconds(_) => {
363 "Histogram of time durations in seconds spent receiving HTTP responses, from first byte received to last byte received"
364 }
365 }
366 }
367}
368
369pub trait MetricsIntercept: Debug + Clone + Send + Sync + Unpin + 'static {
373 fn observe(&self, labels: MetricLabels, value: MetricValue) {
375 let _ = (labels, value);
376 }
377}
378
379#[derive(Clone, Debug)]
381pub struct MetricsLayer<I: MetricsIntercept> {
382 interceptor: I,
383}
384
385impl<I: MetricsIntercept> MetricsLayer<I> {
386 pub fn new(interceptor: I) -> Self {
388 Self { interceptor }
389 }
390}
391
392impl<A: Access, I: MetricsIntercept> Layer<A> for MetricsLayer<I> {
393 type LayeredAccess = MetricsAccessor<A, I>;
394
395 fn layer(&self, inner: A) -> Self::LayeredAccess {
396 let info = inner.info();
397
398 info.update_http_client(|client| {
400 HttpClient::with(MetricsHttpFetcher {
401 inner: client.into_inner(),
402 info: info.clone(),
403 interceptor: self.interceptor.clone(),
404 })
405 });
406
407 MetricsAccessor {
408 inner,
409 info,
410 interceptor: self.interceptor.clone(),
411 }
412 }
413}
414
415pub struct MetricsHttpFetcher<I: MetricsIntercept> {
417 inner: HttpFetcher,
418 info: Arc<AccessorInfo>,
419 interceptor: I,
420}
421
422impl<I: MetricsIntercept> HttpFetch for MetricsHttpFetcher<I> {
423 async fn fetch(&self, req: http::Request<Buffer>) -> Result<http::Response<HttpBody>> {
424 let labels = MetricLabels::new(
425 self.info.clone(),
426 req.extensions()
427 .get::<Operation>()
428 .copied()
429 .map(Operation::into_static)
430 .unwrap_or("unknown"),
431 );
432
433 let start = Instant::now();
434 let req_size = req.body().len();
435
436 self.interceptor
437 .observe(labels.clone(), MetricValue::HttpExecuting(1));
438
439 let res = self.inner.fetch(req).await;
440 let req_duration = start.elapsed();
441
442 match res {
443 Err(err) => {
444 self.interceptor
445 .observe(labels.clone(), MetricValue::HttpExecuting(-1));
446 self.interceptor
447 .observe(labels, MetricValue::HttpConnectionErrorsTotal);
448 Err(err)
449 }
450 Ok(resp) if resp.status().is_client_error() && resp.status().is_server_error() => {
451 self.interceptor
452 .observe(labels.clone(), MetricValue::HttpExecuting(-1));
453 self.interceptor.observe(
454 labels.with_status_code(resp.status()),
455 MetricValue::HttpStatusErrorsTotal,
456 );
457 Ok(resp)
458 }
459 Ok(resp) => {
460 self.interceptor.observe(
461 labels.clone(),
462 MetricValue::HttpRequestBytes(req_size as u64),
463 );
464 self.interceptor.observe(
465 labels.clone(),
466 MetricValue::HttpRequestBytesRate(req_size as f64 / req_duration.as_secs_f64()),
467 );
468 self.interceptor.observe(
469 labels.clone(),
470 MetricValue::HttpRequestDurationSeconds(req_duration),
471 );
472
473 let (parts, body) = resp.into_parts();
474 let body = body.map_inner(|s| {
475 Box::new(MetricsStream {
476 inner: s,
477 interceptor: self.interceptor.clone(),
478 labels: labels.clone(),
479 size: 0,
480 start: Instant::now(),
481 })
482 });
483
484 Ok(http::Response::from_parts(parts, body))
485 }
486 }
487 }
488}
489
490pub struct MetricsStream<S, I> {
491 inner: S,
492 interceptor: I,
493
494 labels: MetricLabels,
495 size: u64,
496 start: Instant,
497}
498
499impl<S, I> Stream for MetricsStream<S, I>
500where
501 S: Stream<Item = Result<Buffer>> + Unpin + 'static,
502 I: MetricsIntercept,
503{
504 type Item = Result<Buffer>;
505
506 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
507 match ready!(self.inner.poll_next_unpin(cx)) {
508 Some(Ok(bs)) => {
509 self.size += bs.len() as u64;
510 Poll::Ready(Some(Ok(bs)))
511 }
512 Some(Err(err)) => Poll::Ready(Some(Err(err))),
513 None => {
514 let resp_size = self.size;
515 let resp_duration = self.start.elapsed();
516
517 self.interceptor.observe(
518 self.labels.clone(),
519 MetricValue::HttpResponseBytes(resp_size),
520 );
521 self.interceptor.observe(
522 self.labels.clone(),
523 MetricValue::HttpResponseBytesRate(
524 resp_size as f64 / resp_duration.as_secs_f64(),
525 ),
526 );
527 self.interceptor.observe(
528 self.labels.clone(),
529 MetricValue::HttpResponseDurationSeconds(resp_duration),
530 );
531 self.interceptor
532 .observe(self.labels.clone(), MetricValue::HttpExecuting(-1));
533
534 Poll::Ready(None)
535 }
536 }
537 }
538}
539
540pub struct MetricsAccessor<A: Access, I: MetricsIntercept> {
542 inner: A,
543 info: Arc<AccessorInfo>,
544 interceptor: I,
545}
546
547impl<A: Access, I: MetricsIntercept> Debug for MetricsAccessor<A, I> {
548 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
549 f.debug_struct("MetricsAccessor")
550 .field("inner", &self.inner)
551 .finish_non_exhaustive()
552 }
553}
554
555impl<A: Access, I: MetricsIntercept> LayeredAccess for MetricsAccessor<A, I> {
556 type Inner = A;
557 type Reader = MetricsWrapper<A::Reader, I>;
558 type Writer = MetricsWrapper<A::Writer, I>;
559 type Lister = MetricsWrapper<A::Lister, I>;
560 type Deleter = MetricsWrapper<A::Deleter, I>;
561
562 fn inner(&self) -> &Self::Inner {
563 &self.inner
564 }
565
566 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
567 let labels = MetricLabels::new(self.info.clone(), Operation::CreateDir.into_static());
568
569 let start = Instant::now();
570
571 self.interceptor
572 .observe(labels.clone(), MetricValue::OperationExecuting(1));
573
574 let res = self
575 .inner()
576 .create_dir(path, args)
577 .await
578 .inspect(|_| {
579 self.interceptor.observe(
580 labels.clone(),
581 MetricValue::OperationDurationSeconds(start.elapsed()),
582 );
583 })
584 .inspect_err(|err| {
585 self.interceptor.observe(
586 labels.clone().with_error(err.kind()),
587 MetricValue::OperationErrorsTotal,
588 );
589 });
590
591 self.interceptor
592 .observe(labels, MetricValue::OperationExecuting(-1));
593 res
594 }
595
596 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
597 let labels = MetricLabels::new(self.info.clone(), Operation::Read.into_static());
598
599 let start = Instant::now();
600
601 self.interceptor
602 .observe(labels.clone(), MetricValue::OperationExecuting(1));
603
604 let (rp, reader) = self
605 .inner
606 .read(path, args)
607 .await
608 .inspect(|_| {
609 self.interceptor.observe(
610 labels.clone(),
611 MetricValue::OperationTtfbSeconds(start.elapsed()),
612 );
613 })
614 .inspect_err(|err| {
615 self.interceptor.observe(
616 labels.clone().with_error(err.kind()),
617 MetricValue::OperationErrorsTotal,
618 );
619 })?;
620
621 Ok((
622 rp,
623 MetricsWrapper::new(reader, self.interceptor.clone(), labels, start),
624 ))
625 }
626
627 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
628 let labels = MetricLabels::new(self.info.clone(), Operation::Write.into_static());
629
630 let start = Instant::now();
631
632 self.interceptor
633 .observe(labels.clone(), MetricValue::OperationExecuting(1));
634
635 let (rp, writer) = self.inner.write(path, args).await.inspect_err(|err| {
636 self.interceptor.observe(
637 labels.clone().with_error(err.kind()),
638 MetricValue::OperationErrorsTotal,
639 );
640 })?;
641
642 Ok((
643 rp,
644 MetricsWrapper::new(writer, self.interceptor.clone(), labels, start),
645 ))
646 }
647
648 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
649 let labels = MetricLabels::new(self.info.clone(), Operation::Copy.into_static());
650
651 let start = Instant::now();
652
653 self.interceptor
654 .observe(labels.clone(), MetricValue::OperationExecuting(1));
655
656 let res = self
657 .inner()
658 .copy(from, to, args)
659 .await
660 .inspect(|_| {
661 self.interceptor.observe(
662 labels.clone(),
663 MetricValue::OperationDurationSeconds(start.elapsed()),
664 );
665 })
666 .inspect_err(|err| {
667 self.interceptor.observe(
668 labels.clone().with_error(err.kind()),
669 MetricValue::OperationErrorsTotal,
670 );
671 });
672
673 self.interceptor
674 .observe(labels, MetricValue::OperationExecuting(-1));
675 res
676 }
677
678 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
679 let labels = MetricLabels::new(self.info.clone(), Operation::Rename.into_static());
680
681 let start = Instant::now();
682
683 self.interceptor
684 .observe(labels.clone(), MetricValue::OperationExecuting(1));
685
686 let res = self
687 .inner()
688 .rename(from, to, args)
689 .await
690 .inspect(|_| {
691 self.interceptor.observe(
692 labels.clone(),
693 MetricValue::OperationDurationSeconds(start.elapsed()),
694 );
695 })
696 .inspect_err(|err| {
697 self.interceptor.observe(
698 labels.clone().with_error(err.kind()),
699 MetricValue::OperationErrorsTotal,
700 );
701 });
702
703 self.interceptor
704 .observe(labels, MetricValue::OperationExecuting(-1));
705 res
706 }
707
708 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
709 let labels = MetricLabels::new(self.info.clone(), Operation::Stat.into_static());
710
711 let start = Instant::now();
712
713 self.interceptor
714 .observe(labels.clone(), MetricValue::OperationExecuting(1));
715
716 let res = self
717 .inner()
718 .stat(path, args)
719 .await
720 .inspect(|_| {
721 self.interceptor.observe(
722 labels.clone(),
723 MetricValue::OperationDurationSeconds(start.elapsed()),
724 );
725 })
726 .inspect_err(|err| {
727 self.interceptor.observe(
728 labels.clone().with_error(err.kind()),
729 MetricValue::OperationErrorsTotal,
730 );
731 });
732
733 self.interceptor
734 .observe(labels, MetricValue::OperationExecuting(-1));
735 res
736 }
737
738 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
739 let labels = MetricLabels::new(self.info.clone(), Operation::Delete.into_static());
740
741 let start = Instant::now();
742
743 self.interceptor
744 .observe(labels.clone(), MetricValue::OperationExecuting(1));
745
746 let (rp, deleter) = self.inner.delete().await.inspect_err(|err| {
747 self.interceptor.observe(
748 labels.clone().with_error(err.kind()),
749 MetricValue::OperationErrorsTotal,
750 );
751 })?;
752
753 Ok((
754 rp,
755 MetricsWrapper::new(deleter, self.interceptor.clone(), labels, start),
756 ))
757 }
758
759 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
760 let labels = MetricLabels::new(self.info.clone(), Operation::List.into_static());
761
762 let start = Instant::now();
763
764 self.interceptor
765 .observe(labels.clone(), MetricValue::OperationExecuting(1));
766
767 let (rp, lister) = self.inner.list(path, args).await.inspect_err(|err| {
768 self.interceptor.observe(
769 labels.clone().with_error(err.kind()),
770 MetricValue::OperationErrorsTotal,
771 );
772 })?;
773
774 Ok((
775 rp,
776 MetricsWrapper::new(lister, self.interceptor.clone(), labels, start),
777 ))
778 }
779
780 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
781 let labels = MetricLabels::new(self.info.clone(), Operation::Presign.into_static());
782
783 let start = Instant::now();
784
785 self.interceptor
786 .observe(labels.clone(), MetricValue::OperationExecuting(1));
787
788 let res = self
789 .inner()
790 .presign(path, args)
791 .await
792 .inspect(|_| {
793 self.interceptor.observe(
794 labels.clone(),
795 MetricValue::OperationDurationSeconds(start.elapsed()),
796 );
797 })
798 .inspect_err(|err| {
799 self.interceptor.observe(
800 labels.clone().with_error(err.kind()),
801 MetricValue::OperationErrorsTotal,
802 );
803 });
804
805 self.interceptor
806 .observe(labels, MetricValue::OperationExecuting(-1));
807 res
808 }
809}
810
811pub struct MetricsWrapper<R, I: MetricsIntercept> {
812 inner: R,
813 interceptor: I,
814 labels: MetricLabels,
815
816 start: Instant,
817 size: u64,
818}
819
820impl<R, I: MetricsIntercept> Drop for MetricsWrapper<R, I> {
821 fn drop(&mut self) {
822 let size = self.size;
823 let duration = self.start.elapsed();
824
825 if self.labels.operation == Operation::Read.into_static()
826 || self.labels.operation == Operation::Write.into_static()
827 {
828 self.interceptor
829 .observe(self.labels.clone(), MetricValue::OperationBytes(self.size));
830 self.interceptor.observe(
831 self.labels.clone(),
832 MetricValue::OperationBytesRate(size as f64 / duration.as_secs_f64()),
833 );
834 } else {
835 self.interceptor.observe(
836 self.labels.clone(),
837 MetricValue::OperationEntries(self.size),
838 );
839 self.interceptor.observe(
840 self.labels.clone(),
841 MetricValue::OperationEntriesRate(size as f64 / duration.as_secs_f64()),
842 );
843 }
844
845 self.interceptor.observe(
846 self.labels.clone(),
847 MetricValue::OperationDurationSeconds(duration),
848 );
849 self.interceptor
850 .observe(self.labels.clone(), MetricValue::OperationExecuting(-1));
851 }
852}
853
854impl<R, I: MetricsIntercept> MetricsWrapper<R, I> {
855 fn new(inner: R, interceptor: I, labels: MetricLabels, start: Instant) -> Self {
856 Self {
857 inner,
858 interceptor,
859 labels,
860 start,
861 size: 0,
862 }
863 }
864}
865
866impl<R: oio::Read, I: MetricsIntercept> oio::Read for MetricsWrapper<R, I> {
867 async fn read(&mut self) -> Result<Buffer> {
868 self.inner
869 .read()
870 .await
871 .inspect(|bs| {
872 self.size += bs.len() as u64;
873 })
874 .inspect_err(|err| {
875 self.interceptor.observe(
876 self.labels.clone().with_error(err.kind()),
877 MetricValue::OperationErrorsTotal,
878 );
879 })
880 }
881}
882
883impl<R: oio::Write, I: MetricsIntercept> oio::Write for MetricsWrapper<R, I> {
884 async fn write(&mut self, bs: Buffer) -> Result<()> {
885 let size = bs.len();
886
887 self.inner
888 .write(bs)
889 .await
890 .inspect(|_| {
891 self.size += size as u64;
892 })
893 .inspect_err(|err| {
894 self.interceptor.observe(
895 self.labels.clone().with_error(err.kind()),
896 MetricValue::OperationErrorsTotal,
897 );
898 })
899 }
900
901 async fn close(&mut self) -> Result<Metadata> {
902 self.inner.close().await.inspect_err(|err| {
903 self.interceptor.observe(
904 self.labels.clone().with_error(err.kind()),
905 MetricValue::OperationErrorsTotal,
906 );
907 })
908 }
909
910 async fn abort(&mut self) -> Result<()> {
911 self.inner.abort().await.inspect_err(|err| {
912 self.interceptor.observe(
913 self.labels.clone().with_error(err.kind()),
914 MetricValue::OperationErrorsTotal,
915 );
916 })
917 }
918}
919
920impl<R: oio::List, I: MetricsIntercept> oio::List for MetricsWrapper<R, I> {
921 async fn next(&mut self) -> Result<Option<oio::Entry>> {
922 self.inner
923 .next()
924 .await
925 .inspect(|_| {
926 self.size += 1;
927 })
928 .inspect_err(|err| {
929 self.interceptor.observe(
930 self.labels.clone().with_error(err.kind()),
931 MetricValue::OperationErrorsTotal,
932 );
933 })
934 }
935}
936
937impl<R: oio::Delete, I: MetricsIntercept> oio::Delete for MetricsWrapper<R, I> {
938 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
939 self.inner
940 .delete(path, args)
941 .inspect(|_| {
942 self.size += 1;
943 })
944 .inspect_err(|err| {
945 self.interceptor.observe(
946 self.labels.clone().with_error(err.kind()),
947 MetricValue::OperationErrorsTotal,
948 );
949 })
950 }
951
952 async fn flush(&mut self) -> Result<usize> {
953 self.inner.flush().await.inspect_err(|err| {
954 self.interceptor.observe(
955 self.labels.clone().with_error(err.kind()),
956 MetricValue::OperationErrorsTotal,
957 );
958 })
959 }
960}