1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::Context;
23use std::task::Poll;
24use std::task::ready;
25
26use futures::Stream;
27use futures::StreamExt;
28use http::StatusCode;
29
30use crate::raw::*;
31use crate::*;
32
33const KIB: f64 = 1024.0;
34const MIB: f64 = 1024.0 * KIB;
35const GIB: f64 = 1024.0 * MIB;
36
37pub const DEFAULT_BYTES_BUCKETS: &[f64] = &[
40 4.0 * KIB, 64.0 * KIB, 256.0 * KIB, 1.0 * MIB, 4.0 * MIB, 16.0 * MIB, 64.0 * MIB, 256.0 * MIB, 1.0 * GIB, 5.0 * GIB, ];
51
52pub const DEFAULT_BYTES_RATE_BUCKETS: &[f64] = &[
58 8.0 * KIB, 32.0 * KIB, 128.0 * KIB, 1.0 * MIB, 8.0 * MIB, 32.0 * MIB, 128.0 * MIB, 512.0 * MIB, 2.0 * GIB, 8.0 * GIB, 32.0 * GIB, ];
74
75pub const DEFAULT_ENTRIES_BUCKETS: &[f64] = &[
78 1.0, 5.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0, 10000.0, ];
88
89pub const DEFAULT_ENTRIES_RATE_BUCKETS: &[f64] = &[
92 1.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0, 10000.0, ];
101
102pub const DEFAULT_DURATION_SECONDS_BUCKETS: &[f64] = &[
105 0.001, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, ];
118
119pub const DEFAULT_TTFB_BUCKETS: &[f64] = &[
122 0.001, 0.01, 0.025, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, ];
133
134pub static LABEL_SCHEME: &str = "scheme";
136pub static LABEL_NAMESPACE: &str = "namespace";
138pub static LABEL_ROOT: &str = "root";
140pub static LABEL_OPERATION: &str = "operation";
142pub static LABEL_ERROR: &str = "error";
144pub static LABEL_STATUS_CODE: &str = "status_code";
146
147#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
149pub struct MetricLabels {
150 pub scheme: &'static str,
153 pub namespace: Arc<str>,
156 pub root: Arc<str>,
159 pub operation: &'static str,
162 pub error: Option<ErrorKind>,
166 pub status_code: Option<StatusCode>,
170}
171
172impl MetricLabels {
173 fn new(info: Arc<AccessorInfo>, op: &'static str) -> Self {
175 MetricLabels {
176 scheme: info.scheme(),
177 namespace: info.name(),
178 root: info.root(),
179 operation: op,
180 ..MetricLabels::default()
181 }
182 }
183
184 fn with_error(mut self, err: ErrorKind) -> Self {
186 self.error = Some(err);
187 self
188 }
189
190 fn with_status_code(mut self, code: StatusCode) -> Self {
192 self.status_code = Some(code);
193 self
194 }
195}
196
197#[non_exhaustive]
204#[derive(Debug, Clone, Copy)]
205pub enum MetricValue {
206 OperationBytes(u64),
209 OperationBytesRate(f64),
212 OperationEntries(u64),
215 OperationEntriesRate(f64),
218 OperationDurationSeconds(Duration),
221 OperationErrorsTotal,
224 OperationExecuting(isize),
227 OperationTtfbSeconds(Duration),
230 HttpExecuting(isize),
233 HttpRequestBytes(u64),
236 HttpRequestBytesRate(f64),
239 HttpRequestDurationSeconds(Duration),
242 HttpResponseBytes(u64),
245 HttpResponseBytesRate(f64),
248 HttpResponseDurationSeconds(Duration),
251 HttpConnectionErrorsTotal,
254 HttpStatusErrorsTotal,
257}
258
259impl MetricValue {
260 pub fn name(&self) -> &'static str {
262 match self {
263 MetricValue::OperationBytes(_) => "opendal_operation_bytes",
264 MetricValue::OperationBytesRate(_) => "opendal_operation_bytes_rate",
265 MetricValue::OperationEntries(_) => "opendal_operation_entries",
266 MetricValue::OperationEntriesRate(_) => "opendal_operation_entries_rate",
267 MetricValue::OperationDurationSeconds(_) => "opendal_operation_duration_seconds",
268 MetricValue::OperationErrorsTotal => "opendal_operation_errors_total",
269 MetricValue::OperationExecuting(_) => "opendal_operation_executing",
270 MetricValue::OperationTtfbSeconds(_) => "opendal_operation_ttfb_seconds",
271
272 MetricValue::HttpConnectionErrorsTotal => "opendal_http_connection_errors_total",
273 MetricValue::HttpStatusErrorsTotal => "opendal_http_status_errors_total",
274 MetricValue::HttpExecuting(_) => "opendal_http_executing",
275 MetricValue::HttpRequestBytes(_) => "opendal_http_request_bytes",
276 MetricValue::HttpRequestBytesRate(_) => "opendal_http_request_bytes_rate",
277 MetricValue::HttpRequestDurationSeconds(_) => "opendal_http_request_duration_seconds",
278 MetricValue::HttpResponseBytes(_) => "opendal_http_response_bytes",
279 MetricValue::HttpResponseBytesRate(_) => "opendal_http_response_bytes_rate",
280 MetricValue::HttpResponseDurationSeconds(_) => "opendal_http_response_duration_seconds",
281 }
282 }
283
284 pub fn name_with_unit(&self) -> (&'static str, Option<&'static str>) {
290 match self {
291 MetricValue::OperationBytes(_) => ("opendal_operation", Some("bytes")),
292 MetricValue::OperationBytesRate(_) => ("opendal_operation_bytes_rate", None),
293 MetricValue::OperationEntries(_) => ("opendal_operation_entries", None),
294 MetricValue::OperationEntriesRate(_) => ("opendal_operation_entries_rate", None),
295 MetricValue::OperationDurationSeconds(_) => {
296 ("opendal_operation_duration", Some("seconds"))
297 }
298 MetricValue::OperationErrorsTotal => ("opendal_operation_errors", None),
299 MetricValue::OperationExecuting(_) => ("opendal_operation_executing", None),
300 MetricValue::OperationTtfbSeconds(_) => ("opendal_operation_ttfb", Some("seconds")),
301
302 MetricValue::HttpConnectionErrorsTotal => ("opendal_http_connection_errors", None),
303 MetricValue::HttpStatusErrorsTotal => ("opendal_http_status_errors", None),
304 MetricValue::HttpExecuting(_) => ("opendal_http_executing", None),
305 MetricValue::HttpRequestBytes(_) => ("opendal_http_request", Some("bytes")),
306 MetricValue::HttpRequestBytesRate(_) => ("opendal_http_request_bytes_rate", None),
307 MetricValue::HttpRequestDurationSeconds(_) => {
308 ("opendal_http_request_duration", Some("seconds"))
309 }
310 MetricValue::HttpResponseBytes(_) => ("opendal_http_response", Some("bytes")),
311 MetricValue::HttpResponseBytesRate(_) => ("opendal_http_response_bytes_rate", None),
312 MetricValue::HttpResponseDurationSeconds(_) => {
313 ("opendal_http_response_duration", Some("seconds"))
314 }
315 }
316 }
317
318 pub fn help(&self) -> &'static str {
320 match self {
321 MetricValue::OperationBytes(_) => {
322 "Current operation size in bytes, represents the size of data being processed in the current operation"
323 }
324 MetricValue::OperationBytesRate(_) => {
325 "Histogram of data processing rates in bytes per second within individual operations"
326 }
327 MetricValue::OperationEntries(_) => {
328 "Current operation size in entries, represents the entries being processed in the current operation"
329 }
330 MetricValue::OperationEntriesRate(_) => {
331 "Histogram of entries processing rates in entries per second within individual operations"
332 }
333 MetricValue::OperationDurationSeconds(_) => {
334 "Duration of operations in seconds, measured from start to completion"
335 }
336 MetricValue::OperationErrorsTotal => "Total number of failed operations",
337 MetricValue::OperationExecuting(_) => "Number of operations currently being executed",
338 MetricValue::OperationTtfbSeconds(_) => "Time to first byte in seconds for operations",
339
340 MetricValue::HttpConnectionErrorsTotal => {
341 "Total number of HTTP requests that failed before receiving a response (DNS failures, connection refused, timeouts, TLS errors)"
342 }
343 MetricValue::HttpStatusErrorsTotal => {
344 "Total number of HTTP requests that received error status codes (non-2xx responses)"
345 }
346 MetricValue::HttpExecuting(_) => {
347 "Number of HTTP requests currently in flight from this client"
348 }
349 MetricValue::HttpRequestBytes(_) => "Histogram of HTTP request body sizes in bytes",
350 MetricValue::HttpRequestBytesRate(_) => {
351 "Histogram of HTTP request bytes per second rates"
352 }
353 MetricValue::HttpRequestDurationSeconds(_) => {
354 "Histogram of time durations in seconds spent sending HTTP requests, from first byte sent to receiving the first byte"
355 }
356 MetricValue::HttpResponseBytes(_) => "Histogram of HTTP response body sizes in bytes",
357 MetricValue::HttpResponseBytesRate(_) => {
358 "Histogram of HTTP response bytes per second rates"
359 }
360 MetricValue::HttpResponseDurationSeconds(_) => {
361 "Histogram of time durations in seconds spent receiving HTTP responses, from first byte received to last byte received"
362 }
363 }
364 }
365}
366
367pub trait MetricsIntercept: Debug + Clone + Send + Sync + Unpin + 'static {
371 fn observe(&self, labels: MetricLabels, value: MetricValue) {
373 let _ = (labels, value);
374 }
375}
376
377#[derive(Clone, Debug)]
379pub struct MetricsLayer<I: MetricsIntercept> {
380 interceptor: I,
381}
382
383impl<I: MetricsIntercept> MetricsLayer<I> {
384 pub fn new(interceptor: I) -> Self {
386 Self { interceptor }
387 }
388}
389
390impl<A: Access, I: MetricsIntercept> Layer<A> for MetricsLayer<I> {
391 type LayeredAccess = MetricsAccessor<A, I>;
392
393 fn layer(&self, inner: A) -> Self::LayeredAccess {
394 let info = inner.info();
395
396 info.update_http_client(|client| {
398 HttpClient::with(MetricsHttpFetcher {
399 inner: client.into_inner(),
400 info: info.clone(),
401 interceptor: self.interceptor.clone(),
402 })
403 });
404
405 MetricsAccessor {
406 inner,
407 info,
408 interceptor: self.interceptor.clone(),
409 }
410 }
411}
412
413pub struct MetricsHttpFetcher<I: MetricsIntercept> {
415 inner: HttpFetcher,
416 info: Arc<AccessorInfo>,
417 interceptor: I,
418}
419
420impl<I: MetricsIntercept> HttpFetch for MetricsHttpFetcher<I> {
421 async fn fetch(&self, req: http::Request<Buffer>) -> Result<http::Response<HttpBody>> {
422 let labels = MetricLabels::new(
423 self.info.clone(),
424 req.extensions()
425 .get::<Operation>()
426 .copied()
427 .map(Operation::into_static)
428 .unwrap_or("unknown"),
429 );
430
431 let start = Instant::now();
432 let req_size = req.body().len();
433
434 self.interceptor
435 .observe(labels.clone(), MetricValue::HttpExecuting(1));
436
437 let res = self.inner.fetch(req).await;
438 let req_duration = start.elapsed();
439
440 match res {
441 Err(err) => {
442 self.interceptor
443 .observe(labels.clone(), MetricValue::HttpExecuting(-1));
444 self.interceptor
445 .observe(labels, MetricValue::HttpConnectionErrorsTotal);
446 Err(err)
447 }
448 Ok(resp) if resp.status().is_client_error() && resp.status().is_server_error() => {
449 self.interceptor
450 .observe(labels.clone(), MetricValue::HttpExecuting(-1));
451 self.interceptor.observe(
452 labels.with_status_code(resp.status()),
453 MetricValue::HttpStatusErrorsTotal,
454 );
455 Ok(resp)
456 }
457 Ok(resp) => {
458 self.interceptor.observe(
459 labels.clone(),
460 MetricValue::HttpRequestBytes(req_size as u64),
461 );
462 self.interceptor.observe(
463 labels.clone(),
464 MetricValue::HttpRequestBytesRate(req_size as f64 / req_duration.as_secs_f64()),
465 );
466 self.interceptor.observe(
467 labels.clone(),
468 MetricValue::HttpRequestDurationSeconds(req_duration),
469 );
470
471 let (parts, body) = resp.into_parts();
472 let body = body.map_inner(|s| {
473 Box::new(MetricsStream {
474 inner: s,
475 interceptor: self.interceptor.clone(),
476 labels: labels.clone(),
477 size: 0,
478 start: Instant::now(),
479 })
480 });
481
482 Ok(http::Response::from_parts(parts, body))
483 }
484 }
485 }
486}
487
488pub struct MetricsStream<S, I> {
489 inner: S,
490 interceptor: I,
491
492 labels: MetricLabels,
493 size: u64,
494 start: Instant,
495}
496
497impl<S, I> Stream for MetricsStream<S, I>
498where
499 S: Stream<Item = Result<Buffer>> + Unpin + 'static,
500 I: MetricsIntercept,
501{
502 type Item = Result<Buffer>;
503
504 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
505 match ready!(self.inner.poll_next_unpin(cx)) {
506 Some(Ok(bs)) => {
507 self.size += bs.len() as u64;
508 Poll::Ready(Some(Ok(bs)))
509 }
510 Some(Err(err)) => Poll::Ready(Some(Err(err))),
511 None => {
512 let resp_size = self.size;
513 let resp_duration = self.start.elapsed();
514
515 self.interceptor.observe(
516 self.labels.clone(),
517 MetricValue::HttpResponseBytes(resp_size),
518 );
519 self.interceptor.observe(
520 self.labels.clone(),
521 MetricValue::HttpResponseBytesRate(
522 resp_size as f64 / resp_duration.as_secs_f64(),
523 ),
524 );
525 self.interceptor.observe(
526 self.labels.clone(),
527 MetricValue::HttpResponseDurationSeconds(resp_duration),
528 );
529 self.interceptor
530 .observe(self.labels.clone(), MetricValue::HttpExecuting(-1));
531
532 Poll::Ready(None)
533 }
534 }
535 }
536}
537
538pub struct MetricsAccessor<A: Access, I: MetricsIntercept> {
540 inner: A,
541 info: Arc<AccessorInfo>,
542 interceptor: I,
543}
544
545impl<A: Access, I: MetricsIntercept> Debug for MetricsAccessor<A, I> {
546 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
547 f.debug_struct("MetricsAccessor")
548 .field("inner", &self.inner)
549 .finish_non_exhaustive()
550 }
551}
552
553impl<A: Access, I: MetricsIntercept> LayeredAccess for MetricsAccessor<A, I> {
554 type Inner = A;
555 type Reader = MetricsWrapper<A::Reader, I>;
556 type Writer = MetricsWrapper<A::Writer, I>;
557 type Lister = MetricsWrapper<A::Lister, I>;
558 type Deleter = MetricsWrapper<A::Deleter, I>;
559
560 fn inner(&self) -> &Self::Inner {
561 &self.inner
562 }
563
564 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
565 let labels = MetricLabels::new(self.info.clone(), Operation::CreateDir.into_static());
566
567 let start = Instant::now();
568
569 self.interceptor
570 .observe(labels.clone(), MetricValue::OperationExecuting(1));
571
572 let res = self
573 .inner()
574 .create_dir(path, args)
575 .await
576 .inspect(|_| {
577 self.interceptor.observe(
578 labels.clone(),
579 MetricValue::OperationDurationSeconds(start.elapsed()),
580 );
581 })
582 .inspect_err(|err| {
583 self.interceptor.observe(
584 labels.clone().with_error(err.kind()),
585 MetricValue::OperationErrorsTotal,
586 );
587 });
588
589 self.interceptor
590 .observe(labels, MetricValue::OperationExecuting(-1));
591 res
592 }
593
594 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
595 let labels = MetricLabels::new(self.info.clone(), Operation::Read.into_static());
596
597 let start = Instant::now();
598
599 self.interceptor
600 .observe(labels.clone(), MetricValue::OperationExecuting(1));
601
602 let (rp, reader) = self
603 .inner
604 .read(path, args)
605 .await
606 .inspect(|_| {
607 self.interceptor.observe(
608 labels.clone(),
609 MetricValue::OperationTtfbSeconds(start.elapsed()),
610 );
611 })
612 .inspect_err(|err| {
613 self.interceptor.observe(
614 labels.clone().with_error(err.kind()),
615 MetricValue::OperationErrorsTotal,
616 );
617 })?;
618
619 Ok((
620 rp,
621 MetricsWrapper::new(reader, self.interceptor.clone(), labels, start),
622 ))
623 }
624
625 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
626 let labels = MetricLabels::new(self.info.clone(), Operation::Write.into_static());
627
628 let start = Instant::now();
629
630 self.interceptor
631 .observe(labels.clone(), MetricValue::OperationExecuting(1));
632
633 let (rp, writer) = self.inner.write(path, args).await.inspect_err(|err| {
634 self.interceptor.observe(
635 labels.clone().with_error(err.kind()),
636 MetricValue::OperationErrorsTotal,
637 );
638 })?;
639
640 Ok((
641 rp,
642 MetricsWrapper::new(writer, self.interceptor.clone(), labels, start),
643 ))
644 }
645
646 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
647 let labels = MetricLabels::new(self.info.clone(), Operation::Copy.into_static());
648
649 let start = Instant::now();
650
651 self.interceptor
652 .observe(labels.clone(), MetricValue::OperationExecuting(1));
653
654 let res = self
655 .inner()
656 .copy(from, to, args)
657 .await
658 .inspect(|_| {
659 self.interceptor.observe(
660 labels.clone(),
661 MetricValue::OperationDurationSeconds(start.elapsed()),
662 );
663 })
664 .inspect_err(|err| {
665 self.interceptor.observe(
666 labels.clone().with_error(err.kind()),
667 MetricValue::OperationErrorsTotal,
668 );
669 });
670
671 self.interceptor
672 .observe(labels, MetricValue::OperationExecuting(-1));
673 res
674 }
675
676 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
677 let labels = MetricLabels::new(self.info.clone(), Operation::Rename.into_static());
678
679 let start = Instant::now();
680
681 self.interceptor
682 .observe(labels.clone(), MetricValue::OperationExecuting(1));
683
684 let res = self
685 .inner()
686 .rename(from, to, args)
687 .await
688 .inspect(|_| {
689 self.interceptor.observe(
690 labels.clone(),
691 MetricValue::OperationDurationSeconds(start.elapsed()),
692 );
693 })
694 .inspect_err(|err| {
695 self.interceptor.observe(
696 labels.clone().with_error(err.kind()),
697 MetricValue::OperationErrorsTotal,
698 );
699 });
700
701 self.interceptor
702 .observe(labels, MetricValue::OperationExecuting(-1));
703 res
704 }
705
706 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
707 let labels = MetricLabels::new(self.info.clone(), Operation::Stat.into_static());
708
709 let start = Instant::now();
710
711 self.interceptor
712 .observe(labels.clone(), MetricValue::OperationExecuting(1));
713
714 let res = self
715 .inner()
716 .stat(path, args)
717 .await
718 .inspect(|_| {
719 self.interceptor.observe(
720 labels.clone(),
721 MetricValue::OperationDurationSeconds(start.elapsed()),
722 );
723 })
724 .inspect_err(|err| {
725 self.interceptor.observe(
726 labels.clone().with_error(err.kind()),
727 MetricValue::OperationErrorsTotal,
728 );
729 });
730
731 self.interceptor
732 .observe(labels, MetricValue::OperationExecuting(-1));
733 res
734 }
735
736 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
737 let labels = MetricLabels::new(self.info.clone(), Operation::Delete.into_static());
738
739 let start = Instant::now();
740
741 self.interceptor
742 .observe(labels.clone(), MetricValue::OperationExecuting(1));
743
744 let (rp, deleter) = self.inner.delete().await.inspect_err(|err| {
745 self.interceptor.observe(
746 labels.clone().with_error(err.kind()),
747 MetricValue::OperationErrorsTotal,
748 );
749 })?;
750
751 Ok((
752 rp,
753 MetricsWrapper::new(deleter, self.interceptor.clone(), labels, start),
754 ))
755 }
756
757 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
758 let labels = MetricLabels::new(self.info.clone(), Operation::List.into_static());
759
760 let start = Instant::now();
761
762 self.interceptor
763 .observe(labels.clone(), MetricValue::OperationExecuting(1));
764
765 let (rp, lister) = self.inner.list(path, args).await.inspect_err(|err| {
766 self.interceptor.observe(
767 labels.clone().with_error(err.kind()),
768 MetricValue::OperationErrorsTotal,
769 );
770 })?;
771
772 Ok((
773 rp,
774 MetricsWrapper::new(lister, self.interceptor.clone(), labels, start),
775 ))
776 }
777
778 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
779 let labels = MetricLabels::new(self.info.clone(), Operation::Presign.into_static());
780
781 let start = Instant::now();
782
783 self.interceptor
784 .observe(labels.clone(), MetricValue::OperationExecuting(1));
785
786 let res = self
787 .inner()
788 .presign(path, args)
789 .await
790 .inspect(|_| {
791 self.interceptor.observe(
792 labels.clone(),
793 MetricValue::OperationDurationSeconds(start.elapsed()),
794 );
795 })
796 .inspect_err(|err| {
797 self.interceptor.observe(
798 labels.clone().with_error(err.kind()),
799 MetricValue::OperationErrorsTotal,
800 );
801 });
802
803 self.interceptor
804 .observe(labels, MetricValue::OperationExecuting(-1));
805 res
806 }
807}
808
809pub struct MetricsWrapper<R, I: MetricsIntercept> {
810 inner: R,
811 interceptor: I,
812 labels: MetricLabels,
813
814 start: Instant,
815 size: u64,
816}
817
818impl<R, I: MetricsIntercept> Drop for MetricsWrapper<R, I> {
819 fn drop(&mut self) {
820 let size = self.size;
821 let duration = self.start.elapsed();
822
823 if self.labels.operation == Operation::Read.into_static()
824 || self.labels.operation == Operation::Write.into_static()
825 {
826 self.interceptor
827 .observe(self.labels.clone(), MetricValue::OperationBytes(self.size));
828 self.interceptor.observe(
829 self.labels.clone(),
830 MetricValue::OperationBytesRate(size as f64 / duration.as_secs_f64()),
831 );
832 } else {
833 self.interceptor.observe(
834 self.labels.clone(),
835 MetricValue::OperationEntries(self.size),
836 );
837 self.interceptor.observe(
838 self.labels.clone(),
839 MetricValue::OperationEntriesRate(size as f64 / duration.as_secs_f64()),
840 );
841 }
842
843 self.interceptor.observe(
844 self.labels.clone(),
845 MetricValue::OperationDurationSeconds(duration),
846 );
847 self.interceptor
848 .observe(self.labels.clone(), MetricValue::OperationExecuting(-1));
849 }
850}
851
852impl<R, I: MetricsIntercept> MetricsWrapper<R, I> {
853 fn new(inner: R, interceptor: I, labels: MetricLabels, start: Instant) -> Self {
854 Self {
855 inner,
856 interceptor,
857 labels,
858 start,
859 size: 0,
860 }
861 }
862}
863
864impl<R: oio::Read, I: MetricsIntercept> oio::Read for MetricsWrapper<R, I> {
865 async fn read(&mut self) -> Result<Buffer> {
866 self.inner
867 .read()
868 .await
869 .inspect(|bs| {
870 self.size += bs.len() as u64;
871 })
872 .inspect_err(|err| {
873 self.interceptor.observe(
874 self.labels.clone().with_error(err.kind()),
875 MetricValue::OperationErrorsTotal,
876 );
877 })
878 }
879}
880
881impl<R: oio::Write, I: MetricsIntercept> oio::Write for MetricsWrapper<R, I> {
882 async fn write(&mut self, bs: Buffer) -> Result<()> {
883 let size = bs.len();
884
885 self.inner
886 .write(bs)
887 .await
888 .inspect(|_| {
889 self.size += size as u64;
890 })
891 .inspect_err(|err| {
892 self.interceptor.observe(
893 self.labels.clone().with_error(err.kind()),
894 MetricValue::OperationErrorsTotal,
895 );
896 })
897 }
898
899 async fn close(&mut self) -> Result<Metadata> {
900 self.inner.close().await.inspect_err(|err| {
901 self.interceptor.observe(
902 self.labels.clone().with_error(err.kind()),
903 MetricValue::OperationErrorsTotal,
904 );
905 })
906 }
907
908 async fn abort(&mut self) -> Result<()> {
909 self.inner.abort().await.inspect_err(|err| {
910 self.interceptor.observe(
911 self.labels.clone().with_error(err.kind()),
912 MetricValue::OperationErrorsTotal,
913 );
914 })
915 }
916}
917
918impl<R: oio::List, I: MetricsIntercept> oio::List for MetricsWrapper<R, I> {
919 async fn next(&mut self) -> Result<Option<oio::Entry>> {
920 self.inner
921 .next()
922 .await
923 .inspect(|_| {
924 self.size += 1;
925 })
926 .inspect_err(|err| {
927 self.interceptor.observe(
928 self.labels.clone().with_error(err.kind()),
929 MetricValue::OperationErrorsTotal,
930 );
931 })
932 }
933}
934
935impl<R: oio::Delete, I: MetricsIntercept> oio::Delete for MetricsWrapper<R, I> {
936 async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
937 self.inner
938 .delete(path, args)
939 .await
940 .inspect(|_| {
941 self.size += 1;
942 })
943 .inspect_err(|err| {
944 self.interceptor.observe(
945 self.labels.clone().with_error(err.kind()),
946 MetricValue::OperationErrorsTotal,
947 );
948 })
949 }
950
951 async fn close(&mut self) -> Result<()> {
952 self.inner.close().await.inspect_err(|err| {
953 self.interceptor.observe(
954 self.labels.clone().with_error(err.kind()),
955 MetricValue::OperationErrorsTotal,
956 );
957 })
958 }
959}