1use std::fmt::Debug;
19use std::fmt::Display;
20use std::sync::Arc;
21
22use log::log;
23use log::Level;
24
25use crate::raw::*;
26use crate::*;
27
28#[derive(Debug)]
112pub struct LoggingLayer<I = DefaultLoggingInterceptor> {
113 logger: I,
114}
115
116impl Default for LoggingLayer {
117 fn default() -> Self {
118 Self {
119 logger: DefaultLoggingInterceptor,
120 }
121 }
122}
123
124impl LoggingLayer {
125 pub fn new<I: LoggingInterceptor>(logger: I) -> LoggingLayer<I> {
127 LoggingLayer { logger }
128 }
129}
130
131impl<A: Access, I: LoggingInterceptor> Layer<A> for LoggingLayer<I> {
132 type LayeredAccess = LoggingAccessor<A, I>;
133
134 fn layer(&self, inner: A) -> Self::LayeredAccess {
135 let info = inner.info();
136 LoggingAccessor {
137 inner,
138
139 info,
140 logger: self.logger.clone(),
141 }
142 }
143}
144
145pub trait LoggingInterceptor: Debug + Clone + Send + Sync + Unpin + 'static {
147 fn log(
163 &self,
164 info: &AccessorInfo,
165 operation: Operation,
166 context: &[(&str, &str)],
167 message: &str,
168 err: Option<&Error>,
169 );
170}
171
172#[derive(Debug, Copy, Clone, Default)]
174pub struct DefaultLoggingInterceptor;
175
176impl LoggingInterceptor for DefaultLoggingInterceptor {
177 #[inline]
178 fn log(
179 &self,
180 info: &AccessorInfo,
181 operation: Operation,
182 context: &[(&str, &str)],
183 message: &str,
184 err: Option<&Error>,
185 ) {
186 if let Some(err) = err {
187 let lvl = if err.kind() == ErrorKind::Unexpected {
189 Level::Error
190 } else {
191 Level::Warn
192 };
193
194 log!(
195 target: LOGGING_TARGET,
196 lvl,
197 "service={} name={}{}: {operation} {message} {}",
198 info.scheme(),
199 info.name(),
200 LoggingContext(context),
201 if err.kind() != ErrorKind::Unexpected {
206 format!("{err}")
207 } else {
208 format!("{err:?}")
209 }
210 );
211 }
212
213 log!(
214 target: LOGGING_TARGET,
215 Level::Debug,
216 "service={} name={}{}: {operation} {message}",
217 info.scheme(),
218 info.name(),
219 LoggingContext(context),
220 );
221 }
222}
223
224struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);
225
226impl Display for LoggingContext<'_> {
227 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228 for (k, v) in self.0.iter() {
229 write!(f, " {k}={v}")?;
230 }
231 Ok(())
232 }
233}
234
235#[derive(Clone, Debug)]
236pub struct LoggingAccessor<A: Access, I: LoggingInterceptor> {
237 inner: A,
238
239 info: Arc<AccessorInfo>,
240 logger: I,
241}
242
243static LOGGING_TARGET: &str = "opendal::services";
244
245impl<A: Access, I: LoggingInterceptor> LayeredAccess for LoggingAccessor<A, I> {
246 type Inner = A;
247 type Reader = LoggingReader<A::Reader, I>;
248 type Writer = LoggingWriter<A::Writer, I>;
249 type Lister = LoggingLister<A::Lister, I>;
250 type Deleter = LoggingDeleter<A::Deleter, I>;
251
252 fn inner(&self) -> &Self::Inner {
253 &self.inner
254 }
255
256 fn info(&self) -> Arc<AccessorInfo> {
257 self.info.clone()
258 }
259
260 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
261 self.logger.log(
262 &self.info,
263 Operation::CreateDir,
264 &[("path", path)],
265 "started",
266 None,
267 );
268
269 self.inner
270 .create_dir(path, args)
271 .await
272 .inspect(|_| {
273 self.logger.log(
274 &self.info,
275 Operation::CreateDir,
276 &[("path", path)],
277 "finished",
278 None,
279 );
280 })
281 .inspect_err(|err| {
282 self.logger.log(
283 &self.info,
284 Operation::CreateDir,
285 &[("path", path)],
286 "failed",
287 Some(err),
288 );
289 })
290 }
291
292 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
293 self.logger.log(
294 &self.info,
295 Operation::Read,
296 &[("path", path)],
297 "started",
298 None,
299 );
300
301 self.inner
302 .read(path, args)
303 .await
304 .map(|(rp, r)| {
305 self.logger.log(
306 &self.info,
307 Operation::Read,
308 &[("path", path)],
309 "created reader",
310 None,
311 );
312 (
313 rp,
314 LoggingReader::new(self.info.clone(), self.logger.clone(), path, r),
315 )
316 })
317 .inspect_err(|err| {
318 self.logger.log(
319 &self.info,
320 Operation::Read,
321 &[("path", path)],
322 "failed",
323 Some(err),
324 );
325 })
326 }
327
328 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
329 self.logger.log(
330 &self.info,
331 Operation::Write,
332 &[("path", path)],
333 "started",
334 None,
335 );
336
337 self.inner
338 .write(path, args)
339 .await
340 .map(|(rp, w)| {
341 self.logger.log(
342 &self.info,
343 Operation::Write,
344 &[("path", path)],
345 "created writer",
346 None,
347 );
348 let w = LoggingWriter::new(self.info.clone(), self.logger.clone(), path, w);
349 (rp, w)
350 })
351 .inspect_err(|err| {
352 self.logger.log(
353 &self.info,
354 Operation::Write,
355 &[("path", path)],
356 "failed",
357 Some(err),
358 );
359 })
360 }
361
362 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
363 self.logger.log(
364 &self.info,
365 Operation::Copy,
366 &[("from", from), ("to", to)],
367 "started",
368 None,
369 );
370
371 self.inner
372 .copy(from, to, args)
373 .await
374 .inspect(|_| {
375 self.logger.log(
376 &self.info,
377 Operation::Copy,
378 &[("from", from), ("to", to)],
379 "finished",
380 None,
381 );
382 })
383 .inspect_err(|err| {
384 self.logger.log(
385 &self.info,
386 Operation::Copy,
387 &[("from", from), ("to", to)],
388 "failed",
389 Some(err),
390 );
391 })
392 }
393
394 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
395 self.logger.log(
396 &self.info,
397 Operation::Rename,
398 &[("from", from), ("to", to)],
399 "started",
400 None,
401 );
402
403 self.inner
404 .rename(from, to, args)
405 .await
406 .inspect(|_| {
407 self.logger.log(
408 &self.info,
409 Operation::Rename,
410 &[("from", from), ("to", to)],
411 "finished",
412 None,
413 );
414 })
415 .inspect_err(|err| {
416 self.logger.log(
417 &self.info,
418 Operation::Rename,
419 &[("from", from), ("to", to)],
420 "failed",
421 Some(err),
422 );
423 })
424 }
425
426 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
427 self.logger.log(
428 &self.info,
429 Operation::Stat,
430 &[("path", path)],
431 "started",
432 None,
433 );
434
435 self.inner
436 .stat(path, args)
437 .await
438 .inspect(|_| {
439 self.logger.log(
440 &self.info,
441 Operation::Stat,
442 &[("path", path)],
443 "finished",
444 None,
445 );
446 })
447 .inspect_err(|err| {
448 self.logger.log(
449 &self.info,
450 Operation::Stat,
451 &[("path", path)],
452 "failed",
453 Some(err),
454 );
455 })
456 }
457
458 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
459 self.logger
460 .log(&self.info, Operation::Delete, &[], "started", None);
461
462 self.inner
463 .delete()
464 .await
465 .map(|(rp, d)| {
466 self.logger
467 .log(&self.info, Operation::Delete, &[], "finished", None);
468 let d = LoggingDeleter::new(self.info.clone(), self.logger.clone(), d);
469 (rp, d)
470 })
471 .inspect_err(|err| {
472 self.logger
473 .log(&self.info, Operation::Delete, &[], "failed", Some(err));
474 })
475 }
476
477 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
478 self.logger.log(
479 &self.info,
480 Operation::List,
481 &[("path", path)],
482 "started",
483 None,
484 );
485
486 self.inner
487 .list(path, args)
488 .await
489 .map(|(rp, v)| {
490 self.logger.log(
491 &self.info,
492 Operation::List,
493 &[("path", path)],
494 "created lister",
495 None,
496 );
497 let streamer = LoggingLister::new(self.info.clone(), self.logger.clone(), path, v);
498 (rp, streamer)
499 })
500 .inspect_err(|err| {
501 self.logger.log(
502 &self.info,
503 Operation::List,
504 &[("path", path)],
505 "failed",
506 Some(err),
507 );
508 })
509 }
510
511 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
512 self.logger.log(
513 &self.info,
514 Operation::Presign,
515 &[("path", path)],
516 "started",
517 None,
518 );
519
520 self.inner
521 .presign(path, args)
522 .await
523 .inspect(|_| {
524 self.logger.log(
525 &self.info,
526 Operation::Presign,
527 &[("path", path)],
528 "finished",
529 None,
530 );
531 })
532 .inspect_err(|err| {
533 self.logger.log(
534 &self.info,
535 Operation::Presign,
536 &[("path", path)],
537 "failed",
538 Some(err),
539 );
540 })
541 }
542}
543
544pub struct LoggingReader<R, I: LoggingInterceptor> {
545 info: Arc<AccessorInfo>,
546 logger: I,
547 path: String,
548
549 read: u64,
550 inner: R,
551}
552
553impl<R, I: LoggingInterceptor> LoggingReader<R, I> {
554 fn new(info: Arc<AccessorInfo>, logger: I, path: &str, reader: R) -> Self {
555 Self {
556 info,
557 logger,
558 path: path.to_string(),
559
560 read: 0,
561 inner: reader,
562 }
563 }
564}
565
566impl<R: oio::Read, I: LoggingInterceptor> oio::Read for LoggingReader<R, I> {
567 async fn read(&mut self) -> Result<Buffer> {
568 match self.inner.read().await {
569 Ok(bs) if bs.is_empty() => {
570 self.logger.log(
571 &self.info,
572 Operation::Read,
573 &[
574 ("path", &self.path),
575 ("read", &self.read.to_string()),
576 ("size", &bs.len().to_string()),
577 ],
578 "finished",
579 None,
580 );
581 Ok(bs)
582 }
583 Ok(bs) => {
584 self.read += bs.len() as u64;
585 Ok(bs)
586 }
587 Err(err) => {
588 self.logger.log(
589 &self.info,
590 Operation::Read,
591 &[("path", &self.path), ("read", &self.read.to_string())],
592 "failed",
593 Some(&err),
594 );
595 Err(err)
596 }
597 }
598 }
599}
600
601pub struct LoggingWriter<W, I> {
602 info: Arc<AccessorInfo>,
603 logger: I,
604 path: String,
605
606 written: u64,
607 inner: W,
608}
609
610impl<W, I> LoggingWriter<W, I> {
611 fn new(info: Arc<AccessorInfo>, logger: I, path: &str, writer: W) -> Self {
612 Self {
613 info,
614 logger,
615 path: path.to_string(),
616
617 written: 0,
618 inner: writer,
619 }
620 }
621}
622
623impl<W: oio::Write, I: LoggingInterceptor> oio::Write for LoggingWriter<W, I> {
624 async fn write(&mut self, bs: Buffer) -> Result<()> {
625 let size = bs.len();
626
627 match self.inner.write(bs).await {
628 Ok(_) => {
629 self.written += size as u64;
630 Ok(())
631 }
632 Err(err) => {
633 self.logger.log(
634 &self.info,
635 Operation::Write,
636 &[
637 ("path", &self.path),
638 ("written", &self.written.to_string()),
639 ("size", &size.to_string()),
640 ],
641 "failed",
642 Some(&err),
643 );
644 Err(err)
645 }
646 }
647 }
648
649 async fn abort(&mut self) -> Result<()> {
650 match self.inner.abort().await {
651 Ok(_) => {
652 self.logger.log(
653 &self.info,
654 Operation::Write,
655 &[("path", &self.path), ("written", &self.written.to_string())],
656 "abort succeeded",
657 None,
658 );
659 Ok(())
660 }
661 Err(err) => {
662 self.logger.log(
663 &self.info,
664 Operation::Write,
665 &[("path", &self.path), ("written", &self.written.to_string())],
666 "abort failed",
667 Some(&err),
668 );
669 Err(err)
670 }
671 }
672 }
673
674 async fn close(&mut self) -> Result<Metadata> {
675 match self.inner.close().await {
676 Ok(meta) => {
677 self.logger.log(
678 &self.info,
679 Operation::Write,
680 &[("path", &self.path), ("written", &self.written.to_string())],
681 "close succeeded",
682 None,
683 );
684 Ok(meta)
685 }
686 Err(err) => {
687 self.logger.log(
688 &self.info,
689 Operation::Write,
690 &[("path", &self.path), ("written", &self.written.to_string())],
691 "close failed",
692 Some(&err),
693 );
694 Err(err)
695 }
696 }
697 }
698}
699
700pub struct LoggingLister<P, I: LoggingInterceptor> {
701 info: Arc<AccessorInfo>,
702 logger: I,
703 path: String,
704
705 listed: usize,
706 inner: P,
707}
708
709impl<P, I: LoggingInterceptor> LoggingLister<P, I> {
710 fn new(info: Arc<AccessorInfo>, logger: I, path: &str, inner: P) -> Self {
711 Self {
712 info,
713 logger,
714 path: path.to_string(),
715
716 listed: 0,
717 inner,
718 }
719 }
720}
721
722impl<P: oio::List, I: LoggingInterceptor> oio::List for LoggingLister<P, I> {
723 async fn next(&mut self) -> Result<Option<oio::Entry>> {
724 let res = self.inner.next().await;
725
726 match &res {
727 Ok(Some(_)) => {
728 self.listed += 1;
729 }
730 Ok(None) => {
731 self.logger.log(
732 &self.info,
733 Operation::List,
734 &[("path", &self.path), ("listed", &self.listed.to_string())],
735 "finished",
736 None,
737 );
738 }
739 Err(err) => {
740 self.logger.log(
741 &self.info,
742 Operation::List,
743 &[("path", &self.path), ("listed", &self.listed.to_string())],
744 "failed",
745 Some(err),
746 );
747 }
748 };
749
750 res
751 }
752}
753
754pub struct LoggingDeleter<D, I: LoggingInterceptor> {
755 info: Arc<AccessorInfo>,
756 logger: I,
757
758 queued: usize,
759 deleted: usize,
760 inner: D,
761}
762
763impl<D, I: LoggingInterceptor> LoggingDeleter<D, I> {
764 fn new(info: Arc<AccessorInfo>, logger: I, inner: D) -> Self {
765 Self {
766 info,
767 logger,
768
769 queued: 0,
770 deleted: 0,
771 inner,
772 }
773 }
774}
775
776impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for LoggingDeleter<D, I> {
777 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
778 let version = args
779 .version()
780 .map(|v| v.to_string())
781 .unwrap_or_else(|| "<latest>".to_string());
782
783 let res = self.inner.delete(path, args);
784
785 match &res {
786 Ok(_) => {
787 self.queued += 1;
788 }
789 Err(err) => {
790 self.logger.log(
791 &self.info,
792 Operation::Delete,
793 &[
794 ("path", path),
795 ("version", &version),
796 ("queued", &self.queued.to_string()),
797 ("deleted", &self.deleted.to_string()),
798 ],
799 "failed",
800 Some(err),
801 );
802 }
803 };
804
805 res
806 }
807
808 async fn flush(&mut self) -> Result<usize> {
809 let res = self.inner.flush().await;
810
811 match &res {
812 Ok(flushed) => {
813 self.queued -= flushed;
814 self.deleted += flushed;
815 self.logger.log(
816 &self.info,
817 Operation::Delete,
818 &[
819 ("queued", &self.queued.to_string()),
820 ("deleted", &self.deleted.to_string()),
821 ],
822 "succeeded",
823 None,
824 );
825 }
826 Err(err) => {
827 self.logger.log(
828 &self.info,
829 Operation::Delete,
830 &[
831 ("queued", &self.queued.to_string()),
832 ("deleted", &self.deleted.to_string()),
833 ],
834 "failed",
835 Some(err),
836 );
837 }
838 };
839
840 res
841 }
842}