1use std::fmt::Debug;
19use std::fmt::Display;
20use std::sync::Arc;
21
22use log::log;
23use log::Level;
24
25use crate::raw::*;
26use crate::*;
27
28#[derive(Debug)]
112pub struct LoggingLayer<I = DefaultLoggingInterceptor> {
113 logger: I,
114}
115
116impl Default for LoggingLayer {
117 fn default() -> Self {
118 Self {
119 logger: DefaultLoggingInterceptor,
120 }
121 }
122}
123
124impl LoggingLayer {
125 pub fn new<I: LoggingInterceptor>(logger: I) -> LoggingLayer<I> {
127 LoggingLayer { logger }
128 }
129}
130
131impl<A: Access, I: LoggingInterceptor> Layer<A> for LoggingLayer<I> {
132 type LayeredAccess = LoggingAccessor<A, I>;
133
134 fn layer(&self, inner: A) -> Self::LayeredAccess {
135 let info = inner.info();
136 LoggingAccessor {
137 inner,
138
139 info,
140 logger: self.logger.clone(),
141 }
142 }
143}
144
145pub trait LoggingInterceptor: Debug + Clone + Send + Sync + Unpin + 'static {
147 fn log(
163 &self,
164 info: &AccessorInfo,
165 operation: Operation,
166 context: &[(&str, &str)],
167 message: &str,
168 err: Option<&Error>,
169 );
170}
171
/// Stateless interceptor that forwards every event to the `log` crate.
#[derive(Debug, Copy, Clone, Default)]
pub struct DefaultLoggingInterceptor;
175
176impl LoggingInterceptor for DefaultLoggingInterceptor {
177 #[inline]
178 fn log(
179 &self,
180 info: &AccessorInfo,
181 operation: Operation,
182 context: &[(&str, &str)],
183 message: &str,
184 err: Option<&Error>,
185 ) {
186 if let Some(err) = err {
187 let lvl = if err.kind() == ErrorKind::Unexpected {
189 Level::Error
190 } else {
191 Level::Warn
192 };
193
194 log!(
195 target: LOGGING_TARGET,
196 lvl,
197 "service={} name={}{}: {operation} {message} {}",
198 info.scheme(),
199 info.name(),
200 LoggingContext(context),
201 if err.kind() != ErrorKind::Unexpected {
206 format!("{err}")
207 } else {
208 format!("{err:?}")
209 }
210 );
211 }
212
213 log!(
214 target: LOGGING_TARGET,
215 Level::Debug,
216 "service={} name={}{}: {operation} {message}",
217 info.scheme(),
218 info.name(),
219 LoggingContext(context),
220 );
221 }
222}
223
/// Borrowed key/value pairs rendered as ` key=value` for each pair, in order.
struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);

impl Display for LoggingContext<'_> {
    /// Writes every pair with a leading space; an empty slice writes nothing.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0
            .iter()
            .try_for_each(|(key, value)| write!(f, " {key}={value}"))
    }
}
234
235#[derive(Clone, Debug)]
236pub struct LoggingAccessor<A: Access, I: LoggingInterceptor> {
237 inner: A,
238
239 info: Arc<AccessorInfo>,
240 logger: I,
241}
242
/// `log` target under which [`DefaultLoggingInterceptor`] emits its records.
const LOGGING_TARGET: &str = "opendal::services";
244
245impl<A: Access, I: LoggingInterceptor> LayeredAccess for LoggingAccessor<A, I> {
246 type Inner = A;
247 type Reader = LoggingReader<A::Reader, I>;
248 type BlockingReader = LoggingReader<A::BlockingReader, I>;
249 type Writer = LoggingWriter<A::Writer, I>;
250 type BlockingWriter = LoggingWriter<A::BlockingWriter, I>;
251 type Lister = LoggingLister<A::Lister, I>;
252 type BlockingLister = LoggingLister<A::BlockingLister, I>;
253 type Deleter = LoggingDeleter<A::Deleter, I>;
254 type BlockingDeleter = LoggingDeleter<A::BlockingDeleter, I>;
255
256 fn inner(&self) -> &Self::Inner {
257 &self.inner
258 }
259
260 fn info(&self) -> Arc<AccessorInfo> {
261 self.info.clone()
262 }
263
264 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
265 self.logger.log(
266 &self.info,
267 Operation::CreateDir,
268 &[("path", path)],
269 "started",
270 None,
271 );
272
273 self.inner
274 .create_dir(path, args)
275 .await
276 .inspect(|_| {
277 self.logger.log(
278 &self.info,
279 Operation::CreateDir,
280 &[("path", path)],
281 "finished",
282 None,
283 );
284 })
285 .inspect_err(|err| {
286 self.logger.log(
287 &self.info,
288 Operation::CreateDir,
289 &[("path", path)],
290 "failed",
291 Some(err),
292 );
293 })
294 }
295
296 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
297 self.logger.log(
298 &self.info,
299 Operation::Read,
300 &[("path", path)],
301 "started",
302 None,
303 );
304
305 self.inner
306 .read(path, args)
307 .await
308 .map(|(rp, r)| {
309 self.logger.log(
310 &self.info,
311 Operation::Read,
312 &[("path", path)],
313 "created reader",
314 None,
315 );
316 (
317 rp,
318 LoggingReader::new(self.info.clone(), self.logger.clone(), path, r),
319 )
320 })
321 .inspect_err(|err| {
322 self.logger.log(
323 &self.info,
324 Operation::Read,
325 &[("path", path)],
326 "failed",
327 Some(err),
328 );
329 })
330 }
331
332 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
333 self.logger.log(
334 &self.info,
335 Operation::Write,
336 &[("path", path)],
337 "started",
338 None,
339 );
340
341 self.inner
342 .write(path, args)
343 .await
344 .map(|(rp, w)| {
345 self.logger.log(
346 &self.info,
347 Operation::Write,
348 &[("path", path)],
349 "created writer",
350 None,
351 );
352 let w = LoggingWriter::new(self.info.clone(), self.logger.clone(), path, w);
353 (rp, w)
354 })
355 .inspect_err(|err| {
356 self.logger.log(
357 &self.info,
358 Operation::Write,
359 &[("path", path)],
360 "failed",
361 Some(err),
362 );
363 })
364 }
365
366 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
367 self.logger.log(
368 &self.info,
369 Operation::Copy,
370 &[("from", from), ("to", to)],
371 "started",
372 None,
373 );
374
375 self.inner
376 .copy(from, to, args)
377 .await
378 .inspect(|_| {
379 self.logger.log(
380 &self.info,
381 Operation::Copy,
382 &[("from", from), ("to", to)],
383 "finished",
384 None,
385 );
386 })
387 .inspect_err(|err| {
388 self.logger.log(
389 &self.info,
390 Operation::Copy,
391 &[("from", from), ("to", to)],
392 "failed",
393 Some(err),
394 );
395 })
396 }
397
398 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
399 self.logger.log(
400 &self.info,
401 Operation::Rename,
402 &[("from", from), ("to", to)],
403 "started",
404 None,
405 );
406
407 self.inner
408 .rename(from, to, args)
409 .await
410 .inspect(|_| {
411 self.logger.log(
412 &self.info,
413 Operation::Rename,
414 &[("from", from), ("to", to)],
415 "finished",
416 None,
417 );
418 })
419 .inspect_err(|err| {
420 self.logger.log(
421 &self.info,
422 Operation::Rename,
423 &[("from", from), ("to", to)],
424 "failed",
425 Some(err),
426 );
427 })
428 }
429
430 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
431 self.logger.log(
432 &self.info,
433 Operation::Stat,
434 &[("path", path)],
435 "started",
436 None,
437 );
438
439 self.inner
440 .stat(path, args)
441 .await
442 .inspect(|_| {
443 self.logger.log(
444 &self.info,
445 Operation::Stat,
446 &[("path", path)],
447 "finished",
448 None,
449 );
450 })
451 .inspect_err(|err| {
452 self.logger.log(
453 &self.info,
454 Operation::Stat,
455 &[("path", path)],
456 "failed",
457 Some(err),
458 );
459 })
460 }
461
462 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
463 self.logger
464 .log(&self.info, Operation::Delete, &[], "started", None);
465
466 self.inner
467 .delete()
468 .await
469 .map(|(rp, d)| {
470 self.logger
471 .log(&self.info, Operation::Delete, &[], "finished", None);
472 let d = LoggingDeleter::new(self.info.clone(), self.logger.clone(), d);
473 (rp, d)
474 })
475 .inspect_err(|err| {
476 self.logger
477 .log(&self.info, Operation::Delete, &[], "failed", Some(err));
478 })
479 }
480
481 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
482 self.logger.log(
483 &self.info,
484 Operation::List,
485 &[("path", path)],
486 "started",
487 None,
488 );
489
490 self.inner
491 .list(path, args)
492 .await
493 .map(|(rp, v)| {
494 self.logger.log(
495 &self.info,
496 Operation::List,
497 &[("path", path)],
498 "created lister",
499 None,
500 );
501 let streamer = LoggingLister::new(self.info.clone(), self.logger.clone(), path, v);
502 (rp, streamer)
503 })
504 .inspect_err(|err| {
505 self.logger.log(
506 &self.info,
507 Operation::List,
508 &[("path", path)],
509 "failed",
510 Some(err),
511 );
512 })
513 }
514
515 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
516 self.logger.log(
517 &self.info,
518 Operation::Presign,
519 &[("path", path)],
520 "started",
521 None,
522 );
523
524 self.inner
525 .presign(path, args)
526 .await
527 .inspect(|_| {
528 self.logger.log(
529 &self.info,
530 Operation::Presign,
531 &[("path", path)],
532 "finished",
533 None,
534 );
535 })
536 .inspect_err(|err| {
537 self.logger.log(
538 &self.info,
539 Operation::Presign,
540 &[("path", path)],
541 "failed",
542 Some(err),
543 );
544 })
545 }
546
547 fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
548 self.logger.log(
549 &self.info,
550 Operation::CreateDir,
551 &[("path", path)],
552 "started",
553 None,
554 );
555
556 self.inner
557 .blocking_create_dir(path, args)
558 .inspect(|_| {
559 self.logger.log(
560 &self.info,
561 Operation::CreateDir,
562 &[("path", path)],
563 "finished",
564 None,
565 );
566 })
567 .inspect_err(|err| {
568 self.logger.log(
569 &self.info,
570 Operation::CreateDir,
571 &[("path", path)],
572 "failed",
573 Some(err),
574 );
575 })
576 }
577
578 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
579 self.logger.log(
580 &self.info,
581 Operation::Read,
582 &[("path", path)],
583 "started",
584 None,
585 );
586
587 self.inner
588 .blocking_read(path, args.clone())
589 .map(|(rp, r)| {
590 self.logger.log(
591 &self.info,
592 Operation::Read,
593 &[("path", path)],
594 "created reader",
595 None,
596 );
597 let r = LoggingReader::new(self.info.clone(), self.logger.clone(), path, r);
598 (rp, r)
599 })
600 .inspect_err(|err| {
601 self.logger.log(
602 &self.info,
603 Operation::Read,
604 &[("path", path)],
605 "failed",
606 Some(err),
607 );
608 })
609 }
610
611 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
612 self.logger.log(
613 &self.info,
614 Operation::Write,
615 &[("path", path)],
616 "started",
617 None,
618 );
619
620 self.inner
621 .blocking_write(path, args)
622 .map(|(rp, w)| {
623 self.logger.log(
624 &self.info,
625 Operation::Write,
626 &[("path", path)],
627 "created writer",
628 None,
629 );
630 let w = LoggingWriter::new(self.info.clone(), self.logger.clone(), path, w);
631 (rp, w)
632 })
633 .inspect_err(|err| {
634 self.logger.log(
635 &self.info,
636 Operation::Write,
637 &[("path", path)],
638 "failed",
639 Some(err),
640 );
641 })
642 }
643
644 fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
645 self.logger.log(
646 &self.info,
647 Operation::Copy,
648 &[("from", from), ("to", to)],
649 "started",
650 None,
651 );
652
653 self.inner
654 .blocking_copy(from, to, args)
655 .inspect(|_| {
656 self.logger.log(
657 &self.info,
658 Operation::Copy,
659 &[("from", from), ("to", to)],
660 "finished",
661 None,
662 );
663 })
664 .inspect_err(|err| {
665 self.logger.log(
666 &self.info,
667 Operation::Copy,
668 &[("from", from), ("to", to)],
669 "",
670 Some(err),
671 );
672 })
673 }
674
675 fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
676 self.logger.log(
677 &self.info,
678 Operation::Rename,
679 &[("from", from), ("to", to)],
680 "started",
681 None,
682 );
683
684 self.inner
685 .blocking_rename(from, to, args)
686 .inspect(|_| {
687 self.logger.log(
688 &self.info,
689 Operation::Rename,
690 &[("from", from), ("to", to)],
691 "finished",
692 None,
693 );
694 })
695 .inspect_err(|err| {
696 self.logger.log(
697 &self.info,
698 Operation::Rename,
699 &[("from", from), ("to", to)],
700 "failed",
701 Some(err),
702 );
703 })
704 }
705
706 fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
707 self.logger.log(
708 &self.info,
709 Operation::Stat,
710 &[("path", path)],
711 "started",
712 None,
713 );
714
715 self.inner
716 .blocking_stat(path, args)
717 .inspect(|_| {
718 self.logger.log(
719 &self.info,
720 Operation::Stat,
721 &[("path", path)],
722 "finished",
723 None,
724 );
725 })
726 .inspect_err(|err| {
727 self.logger.log(
728 &self.info,
729 Operation::Stat,
730 &[("path", path)],
731 "failed",
732 Some(err),
733 );
734 })
735 }
736
737 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
738 self.logger
739 .log(&self.info, Operation::Delete, &[], "started", None);
740
741 self.inner
742 .blocking_delete()
743 .map(|(rp, d)| {
744 self.logger
745 .log(&self.info, Operation::Delete, &[], "finished", None);
746 let d = LoggingDeleter::new(self.info.clone(), self.logger.clone(), d);
747 (rp, d)
748 })
749 .inspect_err(|err| {
750 self.logger
751 .log(&self.info, Operation::Delete, &[], "failed", Some(err));
752 })
753 }
754
755 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
756 self.logger.log(
757 &self.info,
758 Operation::List,
759 &[("path", path)],
760 "started",
761 None,
762 );
763
764 self.inner
765 .blocking_list(path, args)
766 .map(|(rp, v)| {
767 self.logger.log(
768 &self.info,
769 Operation::List,
770 &[("path", path)],
771 "created lister",
772 None,
773 );
774 let li = LoggingLister::new(self.info.clone(), self.logger.clone(), path, v);
775 (rp, li)
776 })
777 .inspect_err(|err| {
778 self.logger.log(
779 &self.info,
780 Operation::List,
781 &[("path", path)],
782 "",
783 Some(err),
784 );
785 })
786 }
787}
788
789pub struct LoggingReader<R, I: LoggingInterceptor> {
791 info: Arc<AccessorInfo>,
792 logger: I,
793 path: String,
794
795 read: u64,
796 inner: R,
797}
798
799impl<R, I: LoggingInterceptor> LoggingReader<R, I> {
800 fn new(info: Arc<AccessorInfo>, logger: I, path: &str, reader: R) -> Self {
801 Self {
802 info,
803 logger,
804 path: path.to_string(),
805
806 read: 0,
807 inner: reader,
808 }
809 }
810}
811
812impl<R: oio::Read, I: LoggingInterceptor> oio::Read for LoggingReader<R, I> {
813 async fn read(&mut self) -> Result<Buffer> {
814 self.logger.log(
815 &self.info,
816 Operation::Read,
817 &[("path", &self.path), ("read", &self.read.to_string())],
818 "started",
819 None,
820 );
821
822 match self.inner.read().await {
823 Ok(bs) => {
824 self.read += bs.len() as u64;
825 self.logger.log(
826 &self.info,
827 Operation::Read,
828 &[
829 ("path", &self.path),
830 ("read", &self.read.to_string()),
831 ("size", &bs.len().to_string()),
832 ],
833 if bs.is_empty() {
834 "finished"
835 } else {
836 "succeeded"
837 },
838 None,
839 );
840 Ok(bs)
841 }
842 Err(err) => {
843 self.logger.log(
844 &self.info,
845 Operation::Read,
846 &[("path", &self.path), ("read", &self.read.to_string())],
847 "failed",
848 Some(&err),
849 );
850 Err(err)
851 }
852 }
853 }
854}
855
856impl<R: oio::BlockingRead, I: LoggingInterceptor> oio::BlockingRead for LoggingReader<R, I> {
857 fn read(&mut self) -> Result<Buffer> {
858 self.logger.log(
859 &self.info,
860 Operation::Read,
861 &[("path", &self.path), ("read", &self.read.to_string())],
862 "started",
863 None,
864 );
865
866 match self.inner.read() {
867 Ok(bs) => {
868 self.read += bs.len() as u64;
869 self.logger.log(
870 &self.info,
871 Operation::Read,
872 &[
873 ("path", &self.path),
874 ("read", &self.read.to_string()),
875 ("size", &bs.len().to_string()),
876 ],
877 if bs.is_empty() {
878 "finished"
879 } else {
880 "succeeded"
881 },
882 None,
883 );
884 Ok(bs)
885 }
886 Err(err) => {
887 self.logger.log(
888 &self.info,
889 Operation::Read,
890 &[("path", &self.path), ("read", &self.read.to_string())],
891 "failed",
892 Some(&err),
893 );
894 Err(err)
895 }
896 }
897 }
898}
899
900pub struct LoggingWriter<W, I> {
901 info: Arc<AccessorInfo>,
902 logger: I,
903 path: String,
904
905 written: u64,
906 inner: W,
907}
908
909impl<W, I> LoggingWriter<W, I> {
910 fn new(info: Arc<AccessorInfo>, logger: I, path: &str, writer: W) -> Self {
911 Self {
912 info,
913 logger,
914 path: path.to_string(),
915
916 written: 0,
917 inner: writer,
918 }
919 }
920}
921
922impl<W: oio::Write, I: LoggingInterceptor> oio::Write for LoggingWriter<W, I> {
923 async fn write(&mut self, bs: Buffer) -> Result<()> {
924 let size = bs.len();
925
926 self.logger.log(
927 &self.info,
928 Operation::Write,
929 &[
930 ("path", &self.path),
931 ("written", &self.written.to_string()),
932 ("size", &size.to_string()),
933 ],
934 "started",
935 None,
936 );
937
938 match self.inner.write(bs).await {
939 Ok(_) => {
940 self.written += size as u64;
941 self.logger.log(
942 &self.info,
943 Operation::Write,
944 &[
945 ("path", &self.path),
946 ("written", &self.written.to_string()),
947 ("size", &size.to_string()),
948 ],
949 "succeeded",
950 None,
951 );
952 Ok(())
953 }
954 Err(err) => {
955 self.logger.log(
956 &self.info,
957 Operation::Write,
958 &[
959 ("path", &self.path),
960 ("written", &self.written.to_string()),
961 ("size", &size.to_string()),
962 ],
963 "failed",
964 Some(&err),
965 );
966 Err(err)
967 }
968 }
969 }
970
971 async fn abort(&mut self) -> Result<()> {
972 self.logger.log(
973 &self.info,
974 Operation::Write,
975 &[("path", &self.path), ("written", &self.written.to_string())],
976 "started",
977 None,
978 );
979
980 match self.inner.abort().await {
981 Ok(_) => {
982 self.logger.log(
983 &self.info,
984 Operation::Write,
985 &[("path", &self.path), ("written", &self.written.to_string())],
986 "succeeded",
987 None,
988 );
989 Ok(())
990 }
991 Err(err) => {
992 self.logger.log(
993 &self.info,
994 Operation::Write,
995 &[("path", &self.path), ("written", &self.written.to_string())],
996 "failed",
997 Some(&err),
998 );
999 Err(err)
1000 }
1001 }
1002 }
1003
1004 async fn close(&mut self) -> Result<Metadata> {
1005 self.logger.log(
1006 &self.info,
1007 Operation::Write,
1008 &[("path", &self.path), ("written", &self.written.to_string())],
1009 "started",
1010 None,
1011 );
1012
1013 match self.inner.close().await {
1014 Ok(meta) => {
1015 self.logger.log(
1016 &self.info,
1017 Operation::Write,
1018 &[("path", &self.path), ("written", &self.written.to_string())],
1019 "succeeded",
1020 None,
1021 );
1022 Ok(meta)
1023 }
1024 Err(err) => {
1025 self.logger.log(
1026 &self.info,
1027 Operation::Write,
1028 &[("path", &self.path), ("written", &self.written.to_string())],
1029 "failed",
1030 Some(&err),
1031 );
1032 Err(err)
1033 }
1034 }
1035 }
1036}
1037
1038impl<W: oio::BlockingWrite, I: LoggingInterceptor> oio::BlockingWrite for LoggingWriter<W, I> {
1039 fn write(&mut self, bs: Buffer) -> Result<()> {
1040 let size = bs.len();
1041
1042 self.logger.log(
1043 &self.info,
1044 Operation::Write,
1045 &[
1046 ("path", &self.path),
1047 ("written", &self.written.to_string()),
1048 ("size", &size.to_string()),
1049 ],
1050 "started",
1051 None,
1052 );
1053
1054 match self.inner.write(bs) {
1055 Ok(_) => {
1056 self.logger.log(
1057 &self.info,
1058 Operation::Write,
1059 &[
1060 ("path", &self.path),
1061 ("written", &self.written.to_string()),
1062 ("size", &size.to_string()),
1063 ],
1064 "succeeded",
1065 None,
1066 );
1067 Ok(())
1068 }
1069 Err(err) => {
1070 self.logger.log(
1071 &self.info,
1072 Operation::Write,
1073 &[
1074 ("path", &self.path),
1075 ("written", &self.written.to_string()),
1076 ("size", &size.to_string()),
1077 ],
1078 "failed",
1079 Some(&err),
1080 );
1081 Err(err)
1082 }
1083 }
1084 }
1085
1086 fn close(&mut self) -> Result<Metadata> {
1087 self.logger.log(
1088 &self.info,
1089 Operation::Write,
1090 &[("path", &self.path), ("written", &self.written.to_string())],
1091 "started",
1092 None,
1093 );
1094
1095 match self.inner.close() {
1096 Ok(meta) => {
1097 self.logger.log(
1098 &self.info,
1099 Operation::Write,
1100 &[("path", &self.path), ("written", &self.written.to_string())],
1101 "succeeded",
1102 None,
1103 );
1104 Ok(meta)
1105 }
1106 Err(err) => {
1107 self.logger.log(
1108 &self.info,
1109 Operation::Write,
1110 &[("path", &self.path), ("written", &self.written.to_string())],
1111 "failed",
1112 Some(&err),
1113 );
1114 Err(err)
1115 }
1116 }
1117 }
1118}
1119
1120pub struct LoggingLister<P, I: LoggingInterceptor> {
1121 info: Arc<AccessorInfo>,
1122 logger: I,
1123 path: String,
1124
1125 listed: usize,
1126 inner: P,
1127}
1128
1129impl<P, I: LoggingInterceptor> LoggingLister<P, I> {
1130 fn new(info: Arc<AccessorInfo>, logger: I, path: &str, inner: P) -> Self {
1131 Self {
1132 info,
1133 logger,
1134 path: path.to_string(),
1135
1136 listed: 0,
1137 inner,
1138 }
1139 }
1140}
1141
1142impl<P: oio::List, I: LoggingInterceptor> oio::List for LoggingLister<P, I> {
1143 async fn next(&mut self) -> Result<Option<oio::Entry>> {
1144 self.logger.log(
1145 &self.info,
1146 Operation::List,
1147 &[("path", &self.path), ("listed", &self.listed.to_string())],
1148 "started",
1149 None,
1150 );
1151
1152 let res = self.inner.next().await;
1153
1154 match &res {
1155 Ok(Some(de)) => {
1156 self.listed += 1;
1157 self.logger.log(
1158 &self.info,
1159 Operation::List,
1160 &[
1161 ("path", &self.path),
1162 ("listed", &self.listed.to_string()),
1163 ("entry", de.path()),
1164 ],
1165 "succeeded",
1166 None,
1167 );
1168 }
1169 Ok(None) => {
1170 self.logger.log(
1171 &self.info,
1172 Operation::List,
1173 &[("path", &self.path), ("listed", &self.listed.to_string())],
1174 "finished",
1175 None,
1176 );
1177 }
1178 Err(err) => {
1179 self.logger.log(
1180 &self.info,
1181 Operation::List,
1182 &[("path", &self.path), ("listed", &self.listed.to_string())],
1183 "failed",
1184 Some(err),
1185 );
1186 }
1187 };
1188
1189 res
1190 }
1191}
1192
1193impl<P: oio::BlockingList, I: LoggingInterceptor> oio::BlockingList for LoggingLister<P, I> {
1194 fn next(&mut self) -> Result<Option<oio::Entry>> {
1195 self.logger.log(
1196 &self.info,
1197 Operation::List,
1198 &[("path", &self.path), ("listed", &self.listed.to_string())],
1199 "started",
1200 None,
1201 );
1202
1203 let res = self.inner.next();
1204 match &res {
1205 Ok(Some(de)) => {
1206 self.listed += 1;
1207 self.logger.log(
1208 &self.info,
1209 Operation::List,
1210 &[
1211 ("path", &self.path),
1212 ("listed", &self.listed.to_string()),
1213 ("entry", de.path()),
1214 ],
1215 "succeeded",
1216 None,
1217 );
1218 }
1219 Ok(None) => {
1220 self.logger.log(
1221 &self.info,
1222 Operation::List,
1223 &[("path", &self.path), ("listed", &self.listed.to_string())],
1224 "finished",
1225 None,
1226 );
1227 }
1228 Err(err) => {
1229 self.logger.log(
1230 &self.info,
1231 Operation::List,
1232 &[("path", &self.path), ("listed", &self.listed.to_string())],
1233 "failed",
1234 Some(err),
1235 );
1236 }
1237 };
1238
1239 res
1240 }
1241}
1242
1243pub struct LoggingDeleter<D, I: LoggingInterceptor> {
1244 info: Arc<AccessorInfo>,
1245 logger: I,
1246
1247 queued: usize,
1248 deleted: usize,
1249 inner: D,
1250}
1251
1252impl<D, I: LoggingInterceptor> LoggingDeleter<D, I> {
1253 fn new(info: Arc<AccessorInfo>, logger: I, inner: D) -> Self {
1254 Self {
1255 info,
1256 logger,
1257
1258 queued: 0,
1259 deleted: 0,
1260 inner,
1261 }
1262 }
1263}
1264
1265impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for LoggingDeleter<D, I> {
1266 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
1267 let version = args
1268 .version()
1269 .map(|v| v.to_string())
1270 .unwrap_or_else(|| "<latest>".to_string());
1271
1272 self.logger.log(
1273 &self.info,
1274 Operation::Delete,
1275 &[("path", path), ("version", &version)],
1276 "started",
1277 None,
1278 );
1279
1280 let res = self.inner.delete(path, args);
1281
1282 match &res {
1283 Ok(_) => {
1284 self.queued += 1;
1285 self.logger.log(
1286 &self.info,
1287 Operation::Delete,
1288 &[
1289 ("path", path),
1290 ("version", &version),
1291 ("queued", &self.queued.to_string()),
1292 ("deleted", &self.deleted.to_string()),
1293 ],
1294 "succeeded",
1295 None,
1296 );
1297 }
1298 Err(err) => {
1299 self.logger.log(
1300 &self.info,
1301 Operation::Delete,
1302 &[
1303 ("path", path),
1304 ("version", &version),
1305 ("queued", &self.queued.to_string()),
1306 ("deleted", &self.deleted.to_string()),
1307 ],
1308 "failed",
1309 Some(err),
1310 );
1311 }
1312 };
1313
1314 res
1315 }
1316
1317 async fn flush(&mut self) -> Result<usize> {
1318 self.logger.log(
1319 &self.info,
1320 Operation::Delete,
1321 &[
1322 ("queued", &self.queued.to_string()),
1323 ("deleted", &self.deleted.to_string()),
1324 ],
1325 "started",
1326 None,
1327 );
1328
1329 let res = self.inner.flush().await;
1330
1331 match &res {
1332 Ok(flushed) => {
1333 self.queued -= flushed;
1334 self.deleted += flushed;
1335 self.logger.log(
1336 &self.info,
1337 Operation::Delete,
1338 &[
1339 ("queued", &self.queued.to_string()),
1340 ("deleted", &self.deleted.to_string()),
1341 ],
1342 "succeeded",
1343 None,
1344 );
1345 }
1346 Err(err) => {
1347 self.logger.log(
1348 &self.info,
1349 Operation::Delete,
1350 &[
1351 ("queued", &self.queued.to_string()),
1352 ("deleted", &self.deleted.to_string()),
1353 ],
1354 "failed",
1355 Some(err),
1356 );
1357 }
1358 };
1359
1360 res
1361 }
1362}
1363
1364impl<D: oio::BlockingDelete, I: LoggingInterceptor> oio::BlockingDelete for LoggingDeleter<D, I> {
1365 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
1366 let version = args
1367 .version()
1368 .map(|v| v.to_string())
1369 .unwrap_or_else(|| "<latest>".to_string());
1370
1371 self.logger.log(
1372 &self.info,
1373 Operation::Delete,
1374 &[("path", path), ("version", &version)],
1375 "started",
1376 None,
1377 );
1378
1379 let res = self.inner.delete(path, args);
1380
1381 match &res {
1382 Ok(_) => {
1383 self.queued += 1;
1384 self.logger.log(
1385 &self.info,
1386 Operation::Delete,
1387 &[
1388 ("path", path),
1389 ("version", &version),
1390 ("queued", &self.queued.to_string()),
1391 ("deleted", &self.deleted.to_string()),
1392 ],
1393 "succeeded",
1394 None,
1395 );
1396 }
1397 Err(err) => {
1398 self.logger.log(
1399 &self.info,
1400 Operation::Delete,
1401 &[
1402 ("path", path),
1403 ("version", &version),
1404 ("queued", &self.queued.to_string()),
1405 ("deleted", &self.deleted.to_string()),
1406 ],
1407 "failed",
1408 Some(err),
1409 );
1410 }
1411 };
1412
1413 res
1414 }
1415
1416 fn flush(&mut self) -> Result<usize> {
1417 self.logger.log(
1418 &self.info,
1419 Operation::Delete,
1420 &[
1421 ("queued", &self.queued.to_string()),
1422 ("deleted", &self.deleted.to_string()),
1423 ],
1424 "started",
1425 None,
1426 );
1427
1428 let res = self.inner.flush();
1429
1430 match &res {
1431 Ok(flushed) => {
1432 self.queued -= flushed;
1433 self.deleted += flushed;
1434 self.logger.log(
1435 &self.info,
1436 Operation::Delete,
1437 &[
1438 ("queued", &self.queued.to_string()),
1439 ("deleted", &self.deleted.to_string()),
1440 ],
1441 "succeeded",
1442 None,
1443 );
1444 }
1445 Err(err) => {
1446 self.logger.log(
1447 &self.info,
1448 Operation::Delete,
1449 &[
1450 ("queued", &self.queued.to_string()),
1451 ("deleted", &self.deleted.to_string()),
1452 ],
1453 "failed",
1454 Some(err),
1455 );
1456 }
1457 };
1458
1459 res
1460 }
1461}