1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::ready;
23use std::task::Context;
24use std::task::Poll;
25use std::time::Duration;
26use std::time::Instant;
27
28use futures::Stream;
29use futures::StreamExt;
30use http::StatusCode;
31
32use crate::raw::*;
33use crate::*;
34
35const KIB: f64 = 1024.0;
36const MIB: f64 = 1024.0 * KIB;
37const GIB: f64 = 1024.0 * MIB;
38
39pub const DEFAULT_BYTES_BUCKETS: &[f64] = &[
42 4.0 * KIB, 64.0 * KIB, 256.0 * KIB, 1.0 * MIB, 4.0 * MIB, 16.0 * MIB, 64.0 * MIB, 256.0 * MIB, 1.0 * GIB, 5.0 * GIB, ];
53
54pub const DEFAULT_BYTES_RATE_BUCKETS: &[f64] = &[
60 8.0 * KIB, 32.0 * KIB, 128.0 * KIB, 1.0 * MIB, 8.0 * MIB, 32.0 * MIB, 128.0 * MIB, 512.0 * MIB, 2.0 * GIB, 8.0 * GIB, 32.0 * GIB, ];
76
77pub const DEFAULT_ENTRIES_BUCKETS: &[f64] = &[
80 1.0, 5.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0, 10000.0, ];
90
91pub const DEFAULT_ENTRIES_RATE_BUCKETS: &[f64] = &[
94 1.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0, 10000.0, ];
103
104pub const DEFAULT_DURATION_SECONDS_BUCKETS: &[f64] = &[
107 0.001, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, ];
120
121pub const DEFAULT_TTFB_BUCKETS: &[f64] = &[
124 0.001, 0.01, 0.025, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, ];
135
136pub static LABEL_SCHEME: &str = "scheme";
138pub static LABEL_NAMESPACE: &str = "namespace";
140pub static LABEL_ROOT: &str = "root";
142pub static LABEL_OPERATION: &str = "operation";
144pub static LABEL_ERROR: &str = "error";
146pub static LABEL_STATUS_CODE: &str = "status_code";
148
149#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
151pub struct MetricLabels {
152 pub scheme: Scheme,
155 pub namespace: Arc<str>,
158 pub root: Arc<str>,
161 pub operation: &'static str,
164 pub error: Option<ErrorKind>,
168 pub status_code: Option<StatusCode>,
172}
173
174impl MetricLabels {
175 fn new(info: Arc<AccessorInfo>, op: &'static str) -> Self {
177 MetricLabels {
178 scheme: info.scheme(),
179 namespace: info.name(),
180 root: info.root(),
181 operation: op,
182 ..MetricLabels::default()
183 }
184 }
185
186 fn with_error(mut self, err: ErrorKind) -> Self {
188 self.error = Some(err);
189 self
190 }
191
192 fn with_status_code(mut self, code: StatusCode) -> Self {
194 self.status_code = Some(code);
195 self
196 }
197}
198
199#[non_exhaustive]
206#[derive(Debug, Clone, Copy)]
207pub enum MetricValue {
208 OperationBytes(u64),
211 OperationBytesRate(f64),
214 OperationEntries(u64),
217 OperationEntriesRate(f64),
220 OperationDurationSeconds(Duration),
223 OperationErrorsTotal,
226 OperationExecuting(isize),
229 OperationTtfbSeconds(Duration),
232 HttpExecuting(isize),
235 HttpRequestBytes(u64),
238 HttpRequestBytesRate(f64),
241 HttpRequestDurationSeconds(Duration),
244 HttpResponseBytes(u64),
247 HttpResponseBytesRate(f64),
250 HttpResponseDurationSeconds(Duration),
253 HttpConnectionErrorsTotal,
256 HttpStatusErrorsTotal,
259}
260
261impl MetricValue {
262 pub fn name(&self) -> &'static str {
264 match self {
265 MetricValue::OperationBytes(_) => "opendal_operation_bytes",
266 MetricValue::OperationBytesRate(_) => "opendal_operation_bytes_rate",
267 MetricValue::OperationEntries(_) => "opendal_operation_entries",
268 MetricValue::OperationEntriesRate(_) => "opendal_operation_entries_rate",
269 MetricValue::OperationDurationSeconds(_) => "opendal_operation_duration_seconds",
270 MetricValue::OperationErrorsTotal => "opendal_operation_errors_total",
271 MetricValue::OperationExecuting(_) => "opendal_operation_executing",
272 MetricValue::OperationTtfbSeconds(_) => "opendal_operation_ttfb_seconds",
273
274 MetricValue::HttpConnectionErrorsTotal => "opendal_http_connection_errors_total",
275 MetricValue::HttpStatusErrorsTotal => "opendal_http_status_errors_total",
276 MetricValue::HttpExecuting(_) => "opendal_http_executing",
277 MetricValue::HttpRequestBytes(_) => "opendal_http_request_bytes",
278 MetricValue::HttpRequestBytesRate(_) => "opendal_http_request_bytes_rate",
279 MetricValue::HttpRequestDurationSeconds(_) => "opendal_http_request_duration_seconds",
280 MetricValue::HttpResponseBytes(_) => "opendal_http_response_bytes",
281 MetricValue::HttpResponseBytesRate(_) => "opendal_http_response_bytes_rate",
282 MetricValue::HttpResponseDurationSeconds(_) => "opendal_http_response_duration_seconds",
283 }
284 }
285
286 pub fn name_with_unit(&self) -> (&'static str, Option<&'static str>) {
292 match self {
293 MetricValue::OperationBytes(_) => ("opendal_operation", Some("bytes")),
294 MetricValue::OperationBytesRate(_) => ("opendal_operation_bytes_rate", None),
295 MetricValue::OperationEntries(_) => ("opendal_operation_entries", None),
296 MetricValue::OperationEntriesRate(_) => ("opendal_operation_entries_rate", None),
297 MetricValue::OperationDurationSeconds(_) => {
298 ("opendal_operation_duration", Some("seconds"))
299 }
300 MetricValue::OperationErrorsTotal => ("opendal_operation_errors", None),
301 MetricValue::OperationExecuting(_) => ("opendal_operation_executing", None),
302 MetricValue::OperationTtfbSeconds(_) => ("opendal_operation_ttfb", Some("seconds")),
303
304 MetricValue::HttpConnectionErrorsTotal => ("opendal_http_connection_errors", None),
305 MetricValue::HttpStatusErrorsTotal => ("opendal_http_status_errors", None),
306 MetricValue::HttpExecuting(_) => ("opendal_http_executing", None),
307 MetricValue::HttpRequestBytes(_) => ("opendal_http_request", Some("bytes")),
308 MetricValue::HttpRequestBytesRate(_) => ("opendal_http_request_bytes_rate", None),
309 MetricValue::HttpRequestDurationSeconds(_) => {
310 ("opendal_http_request_duration", Some("seconds"))
311 }
312 MetricValue::HttpResponseBytes(_) => ("opendal_http_response", Some("bytes")),
313 MetricValue::HttpResponseBytesRate(_) => ("opendal_http_response_bytes_rate", None),
314 MetricValue::HttpResponseDurationSeconds(_) => {
315 ("opendal_http_response_duration", Some("seconds"))
316 }
317 }
318 }
319
320 pub fn help(&self) -> &'static str {
322 match self {
323 MetricValue::OperationBytes(_) => "Current operation size in bytes, represents the size of data being processed in the current operation",
324 MetricValue::OperationBytesRate(_) => "Histogram of data processing rates in bytes per second within individual operations",
325 MetricValue::OperationEntries(_) => "Current operation size in entries, represents the entries being processed in the current operation",
326 MetricValue::OperationEntriesRate(_) => "Histogram of entries processing rates in entries per second within individual operations",
327 MetricValue::OperationDurationSeconds(_) => "Duration of operations in seconds, measured from start to completion",
328 MetricValue::OperationErrorsTotal => "Total number of failed operations",
329 MetricValue::OperationExecuting(_) => "Number of operations currently being executed",
330 MetricValue::OperationTtfbSeconds(_) => "Time to first byte in seconds for operations",
331
332 MetricValue::HttpConnectionErrorsTotal => "Total number of HTTP requests that failed before receiving a response (DNS failures, connection refused, timeouts, TLS errors)",
333 MetricValue::HttpStatusErrorsTotal => "Total number of HTTP requests that received error status codes (non-2xx responses)",
334 MetricValue::HttpExecuting(_) => "Number of HTTP requests currently in flight from this client",
335 MetricValue::HttpRequestBytes(_) => "Histogram of HTTP request body sizes in bytes",
336 MetricValue::HttpRequestBytesRate(_) => "Histogram of HTTP request bytes per second rates",
337 MetricValue::HttpRequestDurationSeconds(_) => "Histogram of time durations in seconds spent sending HTTP requests, from first byte sent to receiving the first byte",
338 MetricValue::HttpResponseBytes(_) => "Histogram of HTTP response body sizes in bytes",
339 MetricValue::HttpResponseBytesRate(_) => "Histogram of HTTP response bytes per second rates",
340 MetricValue::HttpResponseDurationSeconds(_) => "Histogram of time durations in seconds spent receiving HTTP responses, from first byte received to last byte received",
341 }
342 }
343}
344
345pub trait MetricsIntercept: Debug + Clone + Send + Sync + Unpin + 'static {
349 fn observe(&self, labels: MetricLabels, value: MetricValue) {
351 let _ = (labels, value);
352 }
353}
354
355#[derive(Clone, Debug)]
357pub struct MetricsLayer<I: MetricsIntercept> {
358 interceptor: I,
359}
360
361impl<I: MetricsIntercept> MetricsLayer<I> {
362 pub fn new(interceptor: I) -> Self {
364 Self { interceptor }
365 }
366}
367
368impl<A: Access, I: MetricsIntercept> Layer<A> for MetricsLayer<I> {
369 type LayeredAccess = MetricsAccessor<A, I>;
370
371 fn layer(&self, inner: A) -> Self::LayeredAccess {
372 let info = inner.info();
373
374 info.update_http_client(|client| {
376 HttpClient::with(MetricsHttpFetcher {
377 inner: client.into_inner(),
378 info: info.clone(),
379 interceptor: self.interceptor.clone(),
380 })
381 });
382
383 MetricsAccessor {
384 inner,
385 info,
386 interceptor: self.interceptor.clone(),
387 }
388 }
389}
390
391pub struct MetricsHttpFetcher<I: MetricsIntercept> {
393 inner: HttpFetcher,
394 info: Arc<AccessorInfo>,
395 interceptor: I,
396}
397
398impl<I: MetricsIntercept> HttpFetch for MetricsHttpFetcher<I> {
399 async fn fetch(&self, req: http::Request<Buffer>) -> Result<http::Response<HttpBody>> {
400 let labels = MetricLabels::new(
401 self.info.clone(),
402 req.extensions()
403 .get::<Operation>()
404 .copied()
405 .map(Operation::into_static)
406 .unwrap_or("unknown"),
407 );
408
409 let start = Instant::now();
410 let req_size = req.body().len();
411
412 self.interceptor
413 .observe(labels.clone(), MetricValue::HttpExecuting(1));
414
415 let res = self.inner.fetch(req).await;
416 let req_duration = start.elapsed();
417
418 match res {
419 Err(err) => {
420 self.interceptor
421 .observe(labels.clone(), MetricValue::HttpExecuting(-1));
422 self.interceptor
423 .observe(labels, MetricValue::HttpConnectionErrorsTotal);
424 Err(err)
425 }
426 Ok(resp) if resp.status().is_client_error() && resp.status().is_server_error() => {
427 self.interceptor
428 .observe(labels.clone(), MetricValue::HttpExecuting(-1));
429 self.interceptor.observe(
430 labels.with_status_code(resp.status()),
431 MetricValue::HttpStatusErrorsTotal,
432 );
433 Ok(resp)
434 }
435 Ok(resp) => {
436 self.interceptor.observe(
437 labels.clone(),
438 MetricValue::HttpRequestBytes(req_size as u64),
439 );
440 self.interceptor.observe(
441 labels.clone(),
442 MetricValue::HttpRequestBytesRate(req_size as f64 / req_duration.as_secs_f64()),
443 );
444 self.interceptor.observe(
445 labels.clone(),
446 MetricValue::HttpRequestDurationSeconds(req_duration),
447 );
448
449 let (parts, body) = resp.into_parts();
450 let body = body.map_inner(|s| {
451 Box::new(MetricsStream {
452 inner: s,
453 interceptor: self.interceptor.clone(),
454 labels: labels.clone(),
455 size: 0,
456 start: Instant::now(),
457 })
458 });
459
460 Ok(http::Response::from_parts(parts, body))
461 }
462 }
463 }
464}
465
466pub struct MetricsStream<S, I> {
467 inner: S,
468 interceptor: I,
469
470 labels: MetricLabels,
471 size: u64,
472 start: Instant,
473}
474
475impl<S, I> Stream for MetricsStream<S, I>
476where
477 S: Stream<Item = Result<Buffer>> + Unpin + 'static,
478 I: MetricsIntercept,
479{
480 type Item = Result<Buffer>;
481
482 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
483 match ready!(self.inner.poll_next_unpin(cx)) {
484 Some(Ok(bs)) => {
485 self.size += bs.len() as u64;
486 Poll::Ready(Some(Ok(bs)))
487 }
488 Some(Err(err)) => Poll::Ready(Some(Err(err))),
489 None => {
490 let resp_size = self.size;
491 let resp_duration = self.start.elapsed();
492
493 self.interceptor.observe(
494 self.labels.clone(),
495 MetricValue::HttpResponseBytes(resp_size),
496 );
497 self.interceptor.observe(
498 self.labels.clone(),
499 MetricValue::HttpResponseBytesRate(
500 resp_size as f64 / resp_duration.as_secs_f64(),
501 ),
502 );
503 self.interceptor.observe(
504 self.labels.clone(),
505 MetricValue::HttpResponseDurationSeconds(resp_duration),
506 );
507 self.interceptor
508 .observe(self.labels.clone(), MetricValue::HttpExecuting(-1));
509
510 Poll::Ready(None)
511 }
512 }
513 }
514}
515
516pub struct MetricsAccessor<A: Access, I: MetricsIntercept> {
518 inner: A,
519 info: Arc<AccessorInfo>,
520 interceptor: I,
521}
522
523impl<A: Access, I: MetricsIntercept> Debug for MetricsAccessor<A, I> {
524 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
525 f.debug_struct("MetricsAccessor")
526 .field("inner", &self.inner)
527 .finish_non_exhaustive()
528 }
529}
530
531impl<A: Access, I: MetricsIntercept> LayeredAccess for MetricsAccessor<A, I> {
532 type Inner = A;
533 type Reader = MetricsWrapper<A::Reader, I>;
534 type Writer = MetricsWrapper<A::Writer, I>;
535 type Lister = MetricsWrapper<A::Lister, I>;
536 type Deleter = MetricsWrapper<A::Deleter, I>;
537
538 fn inner(&self) -> &Self::Inner {
539 &self.inner
540 }
541
542 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
543 let labels = MetricLabels::new(self.info.clone(), Operation::CreateDir.into_static());
544
545 let start = Instant::now();
546
547 self.interceptor
548 .observe(labels.clone(), MetricValue::OperationExecuting(1));
549
550 let res = self
551 .inner()
552 .create_dir(path, args)
553 .await
554 .inspect(|_| {
555 self.interceptor.observe(
556 labels.clone(),
557 MetricValue::OperationDurationSeconds(start.elapsed()),
558 );
559 })
560 .inspect_err(|err| {
561 self.interceptor.observe(
562 labels.clone().with_error(err.kind()),
563 MetricValue::OperationErrorsTotal,
564 );
565 });
566
567 self.interceptor
568 .observe(labels, MetricValue::OperationExecuting(-1));
569 res
570 }
571
572 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
573 let labels = MetricLabels::new(self.info.clone(), Operation::Read.into_static());
574
575 let start = Instant::now();
576
577 self.interceptor
578 .observe(labels.clone(), MetricValue::OperationExecuting(1));
579
580 let (rp, reader) = self
581 .inner
582 .read(path, args)
583 .await
584 .inspect(|_| {
585 self.interceptor.observe(
586 labels.clone(),
587 MetricValue::OperationTtfbSeconds(start.elapsed()),
588 );
589 })
590 .inspect_err(|err| {
591 self.interceptor.observe(
592 labels.clone().with_error(err.kind()),
593 MetricValue::OperationErrorsTotal,
594 );
595 })?;
596
597 Ok((
598 rp,
599 MetricsWrapper::new(reader, self.interceptor.clone(), labels, start),
600 ))
601 }
602
603 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
604 let labels = MetricLabels::new(self.info.clone(), Operation::Write.into_static());
605
606 let start = Instant::now();
607
608 self.interceptor
609 .observe(labels.clone(), MetricValue::OperationExecuting(1));
610
611 let (rp, writer) = self.inner.write(path, args).await.inspect_err(|err| {
612 self.interceptor.observe(
613 labels.clone().with_error(err.kind()),
614 MetricValue::OperationErrorsTotal,
615 );
616 })?;
617
618 Ok((
619 rp,
620 MetricsWrapper::new(writer, self.interceptor.clone(), labels, start),
621 ))
622 }
623
624 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
625 let labels = MetricLabels::new(self.info.clone(), Operation::Copy.into_static());
626
627 let start = Instant::now();
628
629 self.interceptor
630 .observe(labels.clone(), MetricValue::OperationExecuting(1));
631
632 let res = self
633 .inner()
634 .copy(from, to, args)
635 .await
636 .inspect(|_| {
637 self.interceptor.observe(
638 labels.clone(),
639 MetricValue::OperationDurationSeconds(start.elapsed()),
640 );
641 })
642 .inspect_err(|err| {
643 self.interceptor.observe(
644 labels.clone().with_error(err.kind()),
645 MetricValue::OperationErrorsTotal,
646 );
647 });
648
649 self.interceptor
650 .observe(labels, MetricValue::OperationExecuting(-1));
651 res
652 }
653
654 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
655 let labels = MetricLabels::new(self.info.clone(), Operation::Rename.into_static());
656
657 let start = Instant::now();
658
659 self.interceptor
660 .observe(labels.clone(), MetricValue::OperationExecuting(1));
661
662 let res = self
663 .inner()
664 .rename(from, to, args)
665 .await
666 .inspect(|_| {
667 self.interceptor.observe(
668 labels.clone(),
669 MetricValue::OperationDurationSeconds(start.elapsed()),
670 );
671 })
672 .inspect_err(|err| {
673 self.interceptor.observe(
674 labels.clone().with_error(err.kind()),
675 MetricValue::OperationErrorsTotal,
676 );
677 });
678
679 self.interceptor
680 .observe(labels, MetricValue::OperationExecuting(-1));
681 res
682 }
683
684 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
685 let labels = MetricLabels::new(self.info.clone(), Operation::Stat.into_static());
686
687 let start = Instant::now();
688
689 self.interceptor
690 .observe(labels.clone(), MetricValue::OperationExecuting(1));
691
692 let res = self
693 .inner()
694 .stat(path, args)
695 .await
696 .inspect(|_| {
697 self.interceptor.observe(
698 labels.clone(),
699 MetricValue::OperationDurationSeconds(start.elapsed()),
700 );
701 })
702 .inspect_err(|err| {
703 self.interceptor.observe(
704 labels.clone().with_error(err.kind()),
705 MetricValue::OperationErrorsTotal,
706 );
707 });
708
709 self.interceptor
710 .observe(labels, MetricValue::OperationExecuting(-1));
711 res
712 }
713
714 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
715 let labels = MetricLabels::new(self.info.clone(), Operation::Delete.into_static());
716
717 let start = Instant::now();
718
719 self.interceptor
720 .observe(labels.clone(), MetricValue::OperationExecuting(1));
721
722 let (rp, deleter) = self.inner.delete().await.inspect_err(|err| {
723 self.interceptor.observe(
724 labels.clone().with_error(err.kind()),
725 MetricValue::OperationErrorsTotal,
726 );
727 })?;
728
729 Ok((
730 rp,
731 MetricsWrapper::new(deleter, self.interceptor.clone(), labels, start),
732 ))
733 }
734
735 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
736 let labels = MetricLabels::new(self.info.clone(), Operation::List.into_static());
737
738 let start = Instant::now();
739
740 self.interceptor
741 .observe(labels.clone(), MetricValue::OperationExecuting(1));
742
743 let (rp, lister) = self.inner.list(path, args).await.inspect_err(|err| {
744 self.interceptor.observe(
745 labels.clone().with_error(err.kind()),
746 MetricValue::OperationErrorsTotal,
747 );
748 })?;
749
750 Ok((
751 rp,
752 MetricsWrapper::new(lister, self.interceptor.clone(), labels, start),
753 ))
754 }
755
756 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
757 let labels = MetricLabels::new(self.info.clone(), Operation::Presign.into_static());
758
759 let start = Instant::now();
760
761 self.interceptor
762 .observe(labels.clone(), MetricValue::OperationExecuting(1));
763
764 let res = self
765 .inner()
766 .presign(path, args)
767 .await
768 .inspect(|_| {
769 self.interceptor.observe(
770 labels.clone(),
771 MetricValue::OperationDurationSeconds(start.elapsed()),
772 );
773 })
774 .inspect_err(|err| {
775 self.interceptor.observe(
776 labels.clone().with_error(err.kind()),
777 MetricValue::OperationErrorsTotal,
778 );
779 });
780
781 self.interceptor
782 .observe(labels, MetricValue::OperationExecuting(-1));
783 res
784 }
785}
786
787pub struct MetricsWrapper<R, I: MetricsIntercept> {
788 inner: R,
789 interceptor: I,
790 labels: MetricLabels,
791
792 start: Instant,
793 size: u64,
794}
795
796impl<R, I: MetricsIntercept> Drop for MetricsWrapper<R, I> {
797 fn drop(&mut self) {
798 let size = self.size;
799 let duration = self.start.elapsed();
800
801 if self.labels.operation == Operation::Read.into_static()
802 || self.labels.operation == Operation::Write.into_static()
803 {
804 self.interceptor
805 .observe(self.labels.clone(), MetricValue::OperationBytes(self.size));
806 self.interceptor.observe(
807 self.labels.clone(),
808 MetricValue::OperationBytesRate(size as f64 / duration.as_secs_f64()),
809 );
810 } else {
811 self.interceptor.observe(
812 self.labels.clone(),
813 MetricValue::OperationEntries(self.size),
814 );
815 self.interceptor.observe(
816 self.labels.clone(),
817 MetricValue::OperationEntriesRate(size as f64 / duration.as_secs_f64()),
818 );
819 }
820
821 self.interceptor.observe(
822 self.labels.clone(),
823 MetricValue::OperationDurationSeconds(duration),
824 );
825 self.interceptor
826 .observe(self.labels.clone(), MetricValue::OperationExecuting(-1));
827 }
828}
829
830impl<R, I: MetricsIntercept> MetricsWrapper<R, I> {
831 fn new(inner: R, interceptor: I, labels: MetricLabels, start: Instant) -> Self {
832 Self {
833 inner,
834 interceptor,
835 labels,
836 start,
837 size: 0,
838 }
839 }
840}
841
842impl<R: oio::Read, I: MetricsIntercept> oio::Read for MetricsWrapper<R, I> {
843 async fn read(&mut self) -> Result<Buffer> {
844 self.inner
845 .read()
846 .await
847 .inspect(|bs| {
848 self.size += bs.len() as u64;
849 })
850 .inspect_err(|err| {
851 self.interceptor.observe(
852 self.labels.clone().with_error(err.kind()),
853 MetricValue::OperationErrorsTotal,
854 );
855 })
856 }
857}
858
859impl<R: oio::Write, I: MetricsIntercept> oio::Write for MetricsWrapper<R, I> {
860 async fn write(&mut self, bs: Buffer) -> Result<()> {
861 let size = bs.len();
862
863 self.inner
864 .write(bs)
865 .await
866 .inspect(|_| {
867 self.size += size as u64;
868 })
869 .inspect_err(|err| {
870 self.interceptor.observe(
871 self.labels.clone().with_error(err.kind()),
872 MetricValue::OperationErrorsTotal,
873 );
874 })
875 }
876
877 async fn close(&mut self) -> Result<Metadata> {
878 self.inner.close().await.inspect_err(|err| {
879 self.interceptor.observe(
880 self.labels.clone().with_error(err.kind()),
881 MetricValue::OperationErrorsTotal,
882 );
883 })
884 }
885
886 async fn abort(&mut self) -> Result<()> {
887 self.inner.abort().await.inspect_err(|err| {
888 self.interceptor.observe(
889 self.labels.clone().with_error(err.kind()),
890 MetricValue::OperationErrorsTotal,
891 );
892 })
893 }
894}
895
896impl<R: oio::List, I: MetricsIntercept> oio::List for MetricsWrapper<R, I> {
897 async fn next(&mut self) -> Result<Option<oio::Entry>> {
898 self.inner
899 .next()
900 .await
901 .inspect(|_| {
902 self.size += 1;
903 })
904 .inspect_err(|err| {
905 self.interceptor.observe(
906 self.labels.clone().with_error(err.kind()),
907 MetricValue::OperationErrorsTotal,
908 );
909 })
910 }
911}
912
913impl<R: oio::Delete, I: MetricsIntercept> oio::Delete for MetricsWrapper<R, I> {
914 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
915 self.inner
916 .delete(path, args)
917 .inspect(|_| {
918 self.size += 1;
919 })
920 .inspect_err(|err| {
921 self.interceptor.observe(
922 self.labels.clone().with_error(err.kind()),
923 MetricValue::OperationErrorsTotal,
924 );
925 })
926 }
927
928 async fn flush(&mut self) -> Result<usize> {
929 self.inner.flush().await.inspect_err(|err| {
930 self.interceptor.observe(
931 self.labels.clone().with_error(err.kind()),
932 MetricValue::OperationErrorsTotal,
933 );
934 })
935 }
936}