1use std::fmt::Debug;
19use std::fmt::Display;
20use std::sync::Arc;
21
22use log::log;
23use log::Level;
24
25use crate::raw::*;
26use crate::*;
27
28#[derive(Debug)]
112pub struct LoggingLayer<I = DefaultLoggingInterceptor> {
113 logger: I,
114}
115
116impl Default for LoggingLayer {
117 fn default() -> Self {
118 Self {
119 logger: DefaultLoggingInterceptor,
120 }
121 }
122}
123
124impl LoggingLayer {
125 pub fn new<I: LoggingInterceptor>(logger: I) -> LoggingLayer<I> {
127 LoggingLayer { logger }
128 }
129}
130
131impl<A: Access, I: LoggingInterceptor> Layer<A> for LoggingLayer<I> {
132 type LayeredAccess = LoggingAccessor<A, I>;
133
134 fn layer(&self, inner: A) -> Self::LayeredAccess {
135 let info = inner.info();
136 LoggingAccessor {
137 inner,
138
139 info,
140 logger: self.logger.clone(),
141 }
142 }
143}
144
145pub trait LoggingInterceptor: Debug + Clone + Send + Sync + Unpin + 'static {
147 fn log(
163 &self,
164 info: &AccessorInfo,
165 operation: Operation,
166 context: &[(&str, &str)],
167 message: &str,
168 err: Option<&Error>,
169 );
170}
171
172#[derive(Debug, Copy, Clone, Default)]
174pub struct DefaultLoggingInterceptor;
175
176impl LoggingInterceptor for DefaultLoggingInterceptor {
177 #[inline]
178 fn log(
179 &self,
180 info: &AccessorInfo,
181 operation: Operation,
182 context: &[(&str, &str)],
183 message: &str,
184 err: Option<&Error>,
185 ) {
186 if let Some(err) = err {
187 let lvl = if err.kind() == ErrorKind::Unexpected {
189 Level::Error
190 } else {
191 Level::Warn
192 };
193
194 log!(
195 target: LOGGING_TARGET,
196 lvl,
197 "service={} name={}{}: {operation} {message} {}",
198 info.scheme(),
199 info.name(),
200 LoggingContext(context),
201 if err.kind() != ErrorKind::Unexpected {
206 format!("{err}")
207 } else {
208 format!("{err:?}")
209 }
210 );
211 }
212
213 log!(
214 target: LOGGING_TARGET,
215 Level::Debug,
216 "service={} name={}{}: {operation} {message}",
217 info.scheme(),
218 info.name(),
219 LoggingContext(context),
220 );
221 }
222}
223
224struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);
225
226impl Display for LoggingContext<'_> {
227 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228 for (k, v) in self.0.iter() {
229 write!(f, " {}={}", k, v)?;
230 }
231 Ok(())
232 }
233}
234
235#[derive(Clone, Debug)]
236pub struct LoggingAccessor<A: Access, I: LoggingInterceptor> {
237 inner: A,
238
239 info: Arc<AccessorInfo>,
240 logger: I,
241}
242
243static LOGGING_TARGET: &str = "opendal::services";
244
245impl<A: Access, I: LoggingInterceptor> LayeredAccess for LoggingAccessor<A, I> {
246 type Inner = A;
247 type Reader = LoggingReader<A::Reader, I>;
248 type Writer = LoggingWriter<A::Writer, I>;
249 type Lister = LoggingLister<A::Lister, I>;
250 type Deleter = LoggingDeleter<A::Deleter, I>;
251
252 fn inner(&self) -> &Self::Inner {
253 &self.inner
254 }
255
256 fn info(&self) -> Arc<AccessorInfo> {
257 self.info.clone()
258 }
259
260 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
261 self.logger.log(
262 &self.info,
263 Operation::CreateDir,
264 &[("path", path)],
265 "started",
266 None,
267 );
268
269 self.inner
270 .create_dir(path, args)
271 .await
272 .inspect(|_| {
273 self.logger.log(
274 &self.info,
275 Operation::CreateDir,
276 &[("path", path)],
277 "finished",
278 None,
279 );
280 })
281 .inspect_err(|err| {
282 self.logger.log(
283 &self.info,
284 Operation::CreateDir,
285 &[("path", path)],
286 "failed",
287 Some(err),
288 );
289 })
290 }
291
292 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
293 self.logger.log(
294 &self.info,
295 Operation::Read,
296 &[("path", path)],
297 "started",
298 None,
299 );
300
301 self.inner
302 .read(path, args)
303 .await
304 .map(|(rp, r)| {
305 self.logger.log(
306 &self.info,
307 Operation::Read,
308 &[("path", path)],
309 "created reader",
310 None,
311 );
312 (
313 rp,
314 LoggingReader::new(self.info.clone(), self.logger.clone(), path, r),
315 )
316 })
317 .inspect_err(|err| {
318 self.logger.log(
319 &self.info,
320 Operation::Read,
321 &[("path", path)],
322 "failed",
323 Some(err),
324 );
325 })
326 }
327
328 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
329 self.logger.log(
330 &self.info,
331 Operation::Write,
332 &[("path", path)],
333 "started",
334 None,
335 );
336
337 self.inner
338 .write(path, args)
339 .await
340 .map(|(rp, w)| {
341 self.logger.log(
342 &self.info,
343 Operation::Write,
344 &[("path", path)],
345 "created writer",
346 None,
347 );
348 let w = LoggingWriter::new(self.info.clone(), self.logger.clone(), path, w);
349 (rp, w)
350 })
351 .inspect_err(|err| {
352 self.logger.log(
353 &self.info,
354 Operation::Write,
355 &[("path", path)],
356 "failed",
357 Some(err),
358 );
359 })
360 }
361
362 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
363 self.logger.log(
364 &self.info,
365 Operation::Copy,
366 &[("from", from), ("to", to)],
367 "started",
368 None,
369 );
370
371 self.inner
372 .copy(from, to, args)
373 .await
374 .inspect(|_| {
375 self.logger.log(
376 &self.info,
377 Operation::Copy,
378 &[("from", from), ("to", to)],
379 "finished",
380 None,
381 );
382 })
383 .inspect_err(|err| {
384 self.logger.log(
385 &self.info,
386 Operation::Copy,
387 &[("from", from), ("to", to)],
388 "failed",
389 Some(err),
390 );
391 })
392 }
393
394 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
395 self.logger.log(
396 &self.info,
397 Operation::Rename,
398 &[("from", from), ("to", to)],
399 "started",
400 None,
401 );
402
403 self.inner
404 .rename(from, to, args)
405 .await
406 .inspect(|_| {
407 self.logger.log(
408 &self.info,
409 Operation::Rename,
410 &[("from", from), ("to", to)],
411 "finished",
412 None,
413 );
414 })
415 .inspect_err(|err| {
416 self.logger.log(
417 &self.info,
418 Operation::Rename,
419 &[("from", from), ("to", to)],
420 "failed",
421 Some(err),
422 );
423 })
424 }
425
426 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
427 self.logger.log(
428 &self.info,
429 Operation::Stat,
430 &[("path", path)],
431 "started",
432 None,
433 );
434
435 self.inner
436 .stat(path, args)
437 .await
438 .inspect(|_| {
439 self.logger.log(
440 &self.info,
441 Operation::Stat,
442 &[("path", path)],
443 "finished",
444 None,
445 );
446 })
447 .inspect_err(|err| {
448 self.logger.log(
449 &self.info,
450 Operation::Stat,
451 &[("path", path)],
452 "failed",
453 Some(err),
454 );
455 })
456 }
457
458 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
459 self.logger
460 .log(&self.info, Operation::Delete, &[], "started", None);
461
462 self.inner
463 .delete()
464 .await
465 .map(|(rp, d)| {
466 self.logger
467 .log(&self.info, Operation::Delete, &[], "finished", None);
468 let d = LoggingDeleter::new(self.info.clone(), self.logger.clone(), d);
469 (rp, d)
470 })
471 .inspect_err(|err| {
472 self.logger
473 .log(&self.info, Operation::Delete, &[], "failed", Some(err));
474 })
475 }
476
477 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
478 self.logger.log(
479 &self.info,
480 Operation::List,
481 &[("path", path)],
482 "started",
483 None,
484 );
485
486 self.inner
487 .list(path, args)
488 .await
489 .map(|(rp, v)| {
490 self.logger.log(
491 &self.info,
492 Operation::List,
493 &[("path", path)],
494 "created lister",
495 None,
496 );
497 let streamer = LoggingLister::new(self.info.clone(), self.logger.clone(), path, v);
498 (rp, streamer)
499 })
500 .inspect_err(|err| {
501 self.logger.log(
502 &self.info,
503 Operation::List,
504 &[("path", path)],
505 "failed",
506 Some(err),
507 );
508 })
509 }
510
511 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
512 self.logger.log(
513 &self.info,
514 Operation::Presign,
515 &[("path", path)],
516 "started",
517 None,
518 );
519
520 self.inner
521 .presign(path, args)
522 .await
523 .inspect(|_| {
524 self.logger.log(
525 &self.info,
526 Operation::Presign,
527 &[("path", path)],
528 "finished",
529 None,
530 );
531 })
532 .inspect_err(|err| {
533 self.logger.log(
534 &self.info,
535 Operation::Presign,
536 &[("path", path)],
537 "failed",
538 Some(err),
539 );
540 })
541 }
542}
543
544pub struct LoggingReader<R, I: LoggingInterceptor> {
546 info: Arc<AccessorInfo>,
547 logger: I,
548 path: String,
549
550 read: u64,
551 inner: R,
552}
553
554impl<R, I: LoggingInterceptor> LoggingReader<R, I> {
555 fn new(info: Arc<AccessorInfo>, logger: I, path: &str, reader: R) -> Self {
556 Self {
557 info,
558 logger,
559 path: path.to_string(),
560
561 read: 0,
562 inner: reader,
563 }
564 }
565}
566
567impl<R: oio::Read, I: LoggingInterceptor> oio::Read for LoggingReader<R, I> {
568 async fn read(&mut self) -> Result<Buffer> {
569 self.logger.log(
570 &self.info,
571 Operation::Read,
572 &[("path", &self.path), ("read", &self.read.to_string())],
573 "started",
574 None,
575 );
576
577 match self.inner.read().await {
578 Ok(bs) => {
579 self.read += bs.len() as u64;
580 self.logger.log(
581 &self.info,
582 Operation::Read,
583 &[
584 ("path", &self.path),
585 ("read", &self.read.to_string()),
586 ("size", &bs.len().to_string()),
587 ],
588 if bs.is_empty() {
589 "finished"
590 } else {
591 "succeeded"
592 },
593 None,
594 );
595 Ok(bs)
596 }
597 Err(err) => {
598 self.logger.log(
599 &self.info,
600 Operation::Read,
601 &[("path", &self.path), ("read", &self.read.to_string())],
602 "failed",
603 Some(&err),
604 );
605 Err(err)
606 }
607 }
608 }
609}
610
611pub struct LoggingWriter<W, I> {
612 info: Arc<AccessorInfo>,
613 logger: I,
614 path: String,
615
616 written: u64,
617 inner: W,
618}
619
620impl<W, I> LoggingWriter<W, I> {
621 fn new(info: Arc<AccessorInfo>, logger: I, path: &str, writer: W) -> Self {
622 Self {
623 info,
624 logger,
625 path: path.to_string(),
626
627 written: 0,
628 inner: writer,
629 }
630 }
631}
632
633impl<W: oio::Write, I: LoggingInterceptor> oio::Write for LoggingWriter<W, I> {
634 async fn write(&mut self, bs: Buffer) -> Result<()> {
635 let size = bs.len();
636
637 self.logger.log(
638 &self.info,
639 Operation::Write,
640 &[
641 ("path", &self.path),
642 ("written", &self.written.to_string()),
643 ("size", &size.to_string()),
644 ],
645 "started",
646 None,
647 );
648
649 match self.inner.write(bs).await {
650 Ok(_) => {
651 self.written += size as u64;
652 self.logger.log(
653 &self.info,
654 Operation::Write,
655 &[
656 ("path", &self.path),
657 ("written", &self.written.to_string()),
658 ("size", &size.to_string()),
659 ],
660 "succeeded",
661 None,
662 );
663 Ok(())
664 }
665 Err(err) => {
666 self.logger.log(
667 &self.info,
668 Operation::Write,
669 &[
670 ("path", &self.path),
671 ("written", &self.written.to_string()),
672 ("size", &size.to_string()),
673 ],
674 "failed",
675 Some(&err),
676 );
677 Err(err)
678 }
679 }
680 }
681
682 async fn abort(&mut self) -> Result<()> {
683 self.logger.log(
684 &self.info,
685 Operation::Write,
686 &[("path", &self.path), ("written", &self.written.to_string())],
687 "started",
688 None,
689 );
690
691 match self.inner.abort().await {
692 Ok(_) => {
693 self.logger.log(
694 &self.info,
695 Operation::Write,
696 &[("path", &self.path), ("written", &self.written.to_string())],
697 "succeeded",
698 None,
699 );
700 Ok(())
701 }
702 Err(err) => {
703 self.logger.log(
704 &self.info,
705 Operation::Write,
706 &[("path", &self.path), ("written", &self.written.to_string())],
707 "failed",
708 Some(&err),
709 );
710 Err(err)
711 }
712 }
713 }
714
715 async fn close(&mut self) -> Result<Metadata> {
716 self.logger.log(
717 &self.info,
718 Operation::Write,
719 &[("path", &self.path), ("written", &self.written.to_string())],
720 "started",
721 None,
722 );
723
724 match self.inner.close().await {
725 Ok(meta) => {
726 self.logger.log(
727 &self.info,
728 Operation::Write,
729 &[("path", &self.path), ("written", &self.written.to_string())],
730 "succeeded",
731 None,
732 );
733 Ok(meta)
734 }
735 Err(err) => {
736 self.logger.log(
737 &self.info,
738 Operation::Write,
739 &[("path", &self.path), ("written", &self.written.to_string())],
740 "failed",
741 Some(&err),
742 );
743 Err(err)
744 }
745 }
746 }
747}
748
749pub struct LoggingLister<P, I: LoggingInterceptor> {
750 info: Arc<AccessorInfo>,
751 logger: I,
752 path: String,
753
754 listed: usize,
755 inner: P,
756}
757
758impl<P, I: LoggingInterceptor> LoggingLister<P, I> {
759 fn new(info: Arc<AccessorInfo>, logger: I, path: &str, inner: P) -> Self {
760 Self {
761 info,
762 logger,
763 path: path.to_string(),
764
765 listed: 0,
766 inner,
767 }
768 }
769}
770
771impl<P: oio::List, I: LoggingInterceptor> oio::List for LoggingLister<P, I> {
772 async fn next(&mut self) -> Result<Option<oio::Entry>> {
773 self.logger.log(
774 &self.info,
775 Operation::List,
776 &[("path", &self.path), ("listed", &self.listed.to_string())],
777 "started",
778 None,
779 );
780
781 let res = self.inner.next().await;
782
783 match &res {
784 Ok(Some(de)) => {
785 self.listed += 1;
786 self.logger.log(
787 &self.info,
788 Operation::List,
789 &[
790 ("path", &self.path),
791 ("listed", &self.listed.to_string()),
792 ("entry", de.path()),
793 ],
794 "succeeded",
795 None,
796 );
797 }
798 Ok(None) => {
799 self.logger.log(
800 &self.info,
801 Operation::List,
802 &[("path", &self.path), ("listed", &self.listed.to_string())],
803 "finished",
804 None,
805 );
806 }
807 Err(err) => {
808 self.logger.log(
809 &self.info,
810 Operation::List,
811 &[("path", &self.path), ("listed", &self.listed.to_string())],
812 "failed",
813 Some(err),
814 );
815 }
816 };
817
818 res
819 }
820}
821
822pub struct LoggingDeleter<D, I: LoggingInterceptor> {
823 info: Arc<AccessorInfo>,
824 logger: I,
825
826 queued: usize,
827 deleted: usize,
828 inner: D,
829}
830
831impl<D, I: LoggingInterceptor> LoggingDeleter<D, I> {
832 fn new(info: Arc<AccessorInfo>, logger: I, inner: D) -> Self {
833 Self {
834 info,
835 logger,
836
837 queued: 0,
838 deleted: 0,
839 inner,
840 }
841 }
842}
843
844impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for LoggingDeleter<D, I> {
845 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
846 let version = args
847 .version()
848 .map(|v| v.to_string())
849 .unwrap_or_else(|| "<latest>".to_string());
850
851 self.logger.log(
852 &self.info,
853 Operation::Delete,
854 &[("path", path), ("version", &version)],
855 "started",
856 None,
857 );
858
859 let res = self.inner.delete(path, args);
860
861 match &res {
862 Ok(_) => {
863 self.queued += 1;
864 self.logger.log(
865 &self.info,
866 Operation::Delete,
867 &[
868 ("path", path),
869 ("version", &version),
870 ("queued", &self.queued.to_string()),
871 ("deleted", &self.deleted.to_string()),
872 ],
873 "succeeded",
874 None,
875 );
876 }
877 Err(err) => {
878 self.logger.log(
879 &self.info,
880 Operation::Delete,
881 &[
882 ("path", path),
883 ("version", &version),
884 ("queued", &self.queued.to_string()),
885 ("deleted", &self.deleted.to_string()),
886 ],
887 "failed",
888 Some(err),
889 );
890 }
891 };
892
893 res
894 }
895
896 async fn flush(&mut self) -> Result<usize> {
897 self.logger.log(
898 &self.info,
899 Operation::Delete,
900 &[
901 ("queued", &self.queued.to_string()),
902 ("deleted", &self.deleted.to_string()),
903 ],
904 "started",
905 None,
906 );
907
908 let res = self.inner.flush().await;
909
910 match &res {
911 Ok(flushed) => {
912 self.queued -= flushed;
913 self.deleted += flushed;
914 self.logger.log(
915 &self.info,
916 Operation::Delete,
917 &[
918 ("queued", &self.queued.to_string()),
919 ("deleted", &self.deleted.to_string()),
920 ],
921 "succeeded",
922 None,
923 );
924 }
925 Err(err) => {
926 self.logger.log(
927 &self.info,
928 Operation::Delete,
929 &[
930 ("queued", &self.queued.to_string()),
931 ("deleted", &self.deleted.to_string()),
932 ],
933 "failed",
934 Some(err),
935 );
936 }
937 };
938
939 res
940 }
941}