opendal/layers/
logging.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Display;
20use std::sync::Arc;
21
22use log::log;
23use log::Level;
24
25use crate::raw::*;
26use crate::*;
27
28/// Add [log](https://docs.rs/log/) for every operation.
29///
30/// # Logging
31///
32/// - OpenDAL will log in structural way.
33/// - Every operation will start with a `started` log entry.
34/// - Every operation will finish with the following status:
35///   - `succeeded`: the operation is successful, but might have more to take.
36///   - `finished`: the whole operation is finished.
37///   - `failed`: the operation returns an unexpected error.
38/// - The default log level while expected error happened is `Warn`.
39/// - The default log level while unexpected failure happened is `Error`.
40///
41/// # Examples
42///
43/// ```no_run
44/// # use opendal::layers::LoggingLayer;
45/// # use opendal::services;
46/// # use opendal::Operator;
47/// # use opendal::Result;
48/// # use opendal::Scheme;
49///
50/// # fn main() -> Result<()> {
51/// let _ = Operator::new(services::Memory::default())?
52///     .layer(LoggingLayer::default())
53///     .finish();
54/// Ok(())
55/// # }
56/// ```
57///
58/// # Output
59///
60/// OpenDAL is using [`log`](https://docs.rs/log/latest/log/) for logging internally.
61///
62/// To enable logging output, please set `RUST_LOG`:
63///
64/// ```shell
65/// RUST_LOG=debug ./app
66/// ```
67///
68/// To config logging output, please refer to [Configure Logging](https://rust-lang-nursery.github.io/rust-cookbook/development_tools/debugging/config_log.html):
69///
70/// ```shell
71/// RUST_LOG="info,opendal::services=debug" ./app
72/// ```
73///
74/// # Logging Interceptor
75///
76/// You can implement your own logging interceptor to customize the logging behavior.
77///
78/// ```no_run
79/// # use opendal::layers::LoggingInterceptor;
80/// # use opendal::layers::LoggingLayer;
81/// # use opendal::raw;
82/// # use opendal::services;
83/// # use opendal::Error;
84/// # use opendal::Operator;
85/// # use opendal::Result;
86/// # use opendal::Scheme;
87///
88/// #[derive(Debug, Clone)]
89/// struct MyLoggingInterceptor;
90///
91/// impl LoggingInterceptor for MyLoggingInterceptor {
92///     fn log(
93///         &self,
94///         info: &raw::AccessorInfo,
95///         operation: raw::Operation,
96///         context: &[(&str, &str)],
97///         message: &str,
98///         err: Option<&Error>,
99///     ) {
100///         // log something
101///     }
102/// }
103///
104/// # fn main() -> Result<()> {
105/// let _ = Operator::new(services::Memory::default())?
106///     .layer(LoggingLayer::new(MyLoggingInterceptor))
107///     .finish();
108/// Ok(())
109/// # }
110/// ```
111#[derive(Debug)]
112pub struct LoggingLayer<I = DefaultLoggingInterceptor> {
113    logger: I,
114}
115
116impl Default for LoggingLayer {
117    fn default() -> Self {
118        Self {
119            logger: DefaultLoggingInterceptor,
120        }
121    }
122}
123
124impl LoggingLayer {
125    /// Create the layer with specific logging interceptor.
126    pub fn new<I: LoggingInterceptor>(logger: I) -> LoggingLayer<I> {
127        LoggingLayer { logger }
128    }
129}
130
131impl<A: Access, I: LoggingInterceptor> Layer<A> for LoggingLayer<I> {
132    type LayeredAccess = LoggingAccessor<A, I>;
133
134    fn layer(&self, inner: A) -> Self::LayeredAccess {
135        let info = inner.info();
136        LoggingAccessor {
137            inner,
138
139            info,
140            logger: self.logger.clone(),
141        }
142    }
143}
144
145/// LoggingInterceptor is used to intercept the log.
146pub trait LoggingInterceptor: Debug + Clone + Send + Sync + Unpin + 'static {
147    /// Everytime there is a log, this function will be called.
148    ///
149    /// # Inputs
150    ///
151    /// - info: The service's access info.
152    /// - operation: The operation to log.
153    /// - context: Additional context of the log like path, etc.
154    /// - message: The log message.
155    /// - err: The error to log.
156    ///
157    /// # Note
158    ///
159    /// Users should avoid calling resource-intensive operations such as I/O or network
160    /// functions here, especially anything that takes longer than 10ms. Otherwise, Opendal
161    /// could perform unexpectedly slow.
162    fn log(
163        &self,
164        info: &AccessorInfo,
165        operation: Operation,
166        context: &[(&str, &str)],
167        message: &str,
168        err: Option<&Error>,
169    );
170}
171
172/// The DefaultLoggingInterceptor will log the message by the standard logging macro.
173#[derive(Debug, Copy, Clone, Default)]
174pub struct DefaultLoggingInterceptor;
175
176impl LoggingInterceptor for DefaultLoggingInterceptor {
177    #[inline]
178    fn log(
179        &self,
180        info: &AccessorInfo,
181        operation: Operation,
182        context: &[(&str, &str)],
183        message: &str,
184        err: Option<&Error>,
185    ) {
186        if let Some(err) = err {
187            // Print error if it's unexpected, otherwise in warn.
188            let lvl = if err.kind() == ErrorKind::Unexpected {
189                Level::Error
190            } else {
191                Level::Warn
192            };
193
194            log!(
195                target: LOGGING_TARGET,
196                lvl,
197                "service={} name={}{}: {operation} {message} {}",
198                info.scheme(),
199                info.name(),
200                LoggingContext(context),
201                // Print error message with debug output while unexpected happened.
202                //
203                // It's super sad that we can't bind `format_args!()` here.
204                // See: https://github.com/rust-lang/rust/issues/92698
205                if err.kind() != ErrorKind::Unexpected {
206                   format!("{err}")
207                } else {
208                   format!("{err:?}")
209                }
210            );
211        }
212
213        log!(
214            target: LOGGING_TARGET,
215            Level::Debug,
216            "service={} name={}{}: {operation} {message}",
217            info.scheme(),
218            info.name(),
219            LoggingContext(context),
220        );
221    }
222}
223
224struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);
225
226impl Display for LoggingContext<'_> {
227    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228        for (k, v) in self.0.iter() {
229            write!(f, " {}={}", k, v)?;
230        }
231        Ok(())
232    }
233}
234
235#[derive(Clone, Debug)]
236pub struct LoggingAccessor<A: Access, I: LoggingInterceptor> {
237    inner: A,
238
239    info: Arc<AccessorInfo>,
240    logger: I,
241}
242
243static LOGGING_TARGET: &str = "opendal::services";
244
245impl<A: Access, I: LoggingInterceptor> LayeredAccess for LoggingAccessor<A, I> {
246    type Inner = A;
247    type Reader = LoggingReader<A::Reader, I>;
248    type Writer = LoggingWriter<A::Writer, I>;
249    type Lister = LoggingLister<A::Lister, I>;
250    type Deleter = LoggingDeleter<A::Deleter, I>;
251
252    fn inner(&self) -> &Self::Inner {
253        &self.inner
254    }
255
256    fn info(&self) -> Arc<AccessorInfo> {
257        self.info.clone()
258    }
259
260    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
261        self.logger.log(
262            &self.info,
263            Operation::CreateDir,
264            &[("path", path)],
265            "started",
266            None,
267        );
268
269        self.inner
270            .create_dir(path, args)
271            .await
272            .inspect(|_| {
273                self.logger.log(
274                    &self.info,
275                    Operation::CreateDir,
276                    &[("path", path)],
277                    "finished",
278                    None,
279                );
280            })
281            .inspect_err(|err| {
282                self.logger.log(
283                    &self.info,
284                    Operation::CreateDir,
285                    &[("path", path)],
286                    "failed",
287                    Some(err),
288                );
289            })
290    }
291
292    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
293        self.logger.log(
294            &self.info,
295            Operation::Read,
296            &[("path", path)],
297            "started",
298            None,
299        );
300
301        self.inner
302            .read(path, args)
303            .await
304            .map(|(rp, r)| {
305                self.logger.log(
306                    &self.info,
307                    Operation::Read,
308                    &[("path", path)],
309                    "created reader",
310                    None,
311                );
312                (
313                    rp,
314                    LoggingReader::new(self.info.clone(), self.logger.clone(), path, r),
315                )
316            })
317            .inspect_err(|err| {
318                self.logger.log(
319                    &self.info,
320                    Operation::Read,
321                    &[("path", path)],
322                    "failed",
323                    Some(err),
324                );
325            })
326    }
327
328    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
329        self.logger.log(
330            &self.info,
331            Operation::Write,
332            &[("path", path)],
333            "started",
334            None,
335        );
336
337        self.inner
338            .write(path, args)
339            .await
340            .map(|(rp, w)| {
341                self.logger.log(
342                    &self.info,
343                    Operation::Write,
344                    &[("path", path)],
345                    "created writer",
346                    None,
347                );
348                let w = LoggingWriter::new(self.info.clone(), self.logger.clone(), path, w);
349                (rp, w)
350            })
351            .inspect_err(|err| {
352                self.logger.log(
353                    &self.info,
354                    Operation::Write,
355                    &[("path", path)],
356                    "failed",
357                    Some(err),
358                );
359            })
360    }
361
362    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
363        self.logger.log(
364            &self.info,
365            Operation::Copy,
366            &[("from", from), ("to", to)],
367            "started",
368            None,
369        );
370
371        self.inner
372            .copy(from, to, args)
373            .await
374            .inspect(|_| {
375                self.logger.log(
376                    &self.info,
377                    Operation::Copy,
378                    &[("from", from), ("to", to)],
379                    "finished",
380                    None,
381                );
382            })
383            .inspect_err(|err| {
384                self.logger.log(
385                    &self.info,
386                    Operation::Copy,
387                    &[("from", from), ("to", to)],
388                    "failed",
389                    Some(err),
390                );
391            })
392    }
393
394    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
395        self.logger.log(
396            &self.info,
397            Operation::Rename,
398            &[("from", from), ("to", to)],
399            "started",
400            None,
401        );
402
403        self.inner
404            .rename(from, to, args)
405            .await
406            .inspect(|_| {
407                self.logger.log(
408                    &self.info,
409                    Operation::Rename,
410                    &[("from", from), ("to", to)],
411                    "finished",
412                    None,
413                );
414            })
415            .inspect_err(|err| {
416                self.logger.log(
417                    &self.info,
418                    Operation::Rename,
419                    &[("from", from), ("to", to)],
420                    "failed",
421                    Some(err),
422                );
423            })
424    }
425
426    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
427        self.logger.log(
428            &self.info,
429            Operation::Stat,
430            &[("path", path)],
431            "started",
432            None,
433        );
434
435        self.inner
436            .stat(path, args)
437            .await
438            .inspect(|_| {
439                self.logger.log(
440                    &self.info,
441                    Operation::Stat,
442                    &[("path", path)],
443                    "finished",
444                    None,
445                );
446            })
447            .inspect_err(|err| {
448                self.logger.log(
449                    &self.info,
450                    Operation::Stat,
451                    &[("path", path)],
452                    "failed",
453                    Some(err),
454                );
455            })
456    }
457
458    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
459        self.logger
460            .log(&self.info, Operation::Delete, &[], "started", None);
461
462        self.inner
463            .delete()
464            .await
465            .map(|(rp, d)| {
466                self.logger
467                    .log(&self.info, Operation::Delete, &[], "finished", None);
468                let d = LoggingDeleter::new(self.info.clone(), self.logger.clone(), d);
469                (rp, d)
470            })
471            .inspect_err(|err| {
472                self.logger
473                    .log(&self.info, Operation::Delete, &[], "failed", Some(err));
474            })
475    }
476
477    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
478        self.logger.log(
479            &self.info,
480            Operation::List,
481            &[("path", path)],
482            "started",
483            None,
484        );
485
486        self.inner
487            .list(path, args)
488            .await
489            .map(|(rp, v)| {
490                self.logger.log(
491                    &self.info,
492                    Operation::List,
493                    &[("path", path)],
494                    "created lister",
495                    None,
496                );
497                let streamer = LoggingLister::new(self.info.clone(), self.logger.clone(), path, v);
498                (rp, streamer)
499            })
500            .inspect_err(|err| {
501                self.logger.log(
502                    &self.info,
503                    Operation::List,
504                    &[("path", path)],
505                    "failed",
506                    Some(err),
507                );
508            })
509    }
510
511    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
512        self.logger.log(
513            &self.info,
514            Operation::Presign,
515            &[("path", path)],
516            "started",
517            None,
518        );
519
520        self.inner
521            .presign(path, args)
522            .await
523            .inspect(|_| {
524                self.logger.log(
525                    &self.info,
526                    Operation::Presign,
527                    &[("path", path)],
528                    "finished",
529                    None,
530                );
531            })
532            .inspect_err(|err| {
533                self.logger.log(
534                    &self.info,
535                    Operation::Presign,
536                    &[("path", path)],
537                    "failed",
538                    Some(err),
539                );
540            })
541    }
542}
543
544/// `LoggingReader` is a wrapper of `BytesReader`, with logging functionality.
545pub struct LoggingReader<R, I: LoggingInterceptor> {
546    info: Arc<AccessorInfo>,
547    logger: I,
548    path: String,
549
550    read: u64,
551    inner: R,
552}
553
554impl<R, I: LoggingInterceptor> LoggingReader<R, I> {
555    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, reader: R) -> Self {
556        Self {
557            info,
558            logger,
559            path: path.to_string(),
560
561            read: 0,
562            inner: reader,
563        }
564    }
565}
566
567impl<R: oio::Read, I: LoggingInterceptor> oio::Read for LoggingReader<R, I> {
568    async fn read(&mut self) -> Result<Buffer> {
569        self.logger.log(
570            &self.info,
571            Operation::Read,
572            &[("path", &self.path), ("read", &self.read.to_string())],
573            "started",
574            None,
575        );
576
577        match self.inner.read().await {
578            Ok(bs) => {
579                self.read += bs.len() as u64;
580                self.logger.log(
581                    &self.info,
582                    Operation::Read,
583                    &[
584                        ("path", &self.path),
585                        ("read", &self.read.to_string()),
586                        ("size", &bs.len().to_string()),
587                    ],
588                    if bs.is_empty() {
589                        "finished"
590                    } else {
591                        "succeeded"
592                    },
593                    None,
594                );
595                Ok(bs)
596            }
597            Err(err) => {
598                self.logger.log(
599                    &self.info,
600                    Operation::Read,
601                    &[("path", &self.path), ("read", &self.read.to_string())],
602                    "failed",
603                    Some(&err),
604                );
605                Err(err)
606            }
607        }
608    }
609}
610
611pub struct LoggingWriter<W, I> {
612    info: Arc<AccessorInfo>,
613    logger: I,
614    path: String,
615
616    written: u64,
617    inner: W,
618}
619
620impl<W, I> LoggingWriter<W, I> {
621    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, writer: W) -> Self {
622        Self {
623            info,
624            logger,
625            path: path.to_string(),
626
627            written: 0,
628            inner: writer,
629        }
630    }
631}
632
633impl<W: oio::Write, I: LoggingInterceptor> oio::Write for LoggingWriter<W, I> {
634    async fn write(&mut self, bs: Buffer) -> Result<()> {
635        let size = bs.len();
636
637        self.logger.log(
638            &self.info,
639            Operation::Write,
640            &[
641                ("path", &self.path),
642                ("written", &self.written.to_string()),
643                ("size", &size.to_string()),
644            ],
645            "started",
646            None,
647        );
648
649        match self.inner.write(bs).await {
650            Ok(_) => {
651                self.written += size as u64;
652                self.logger.log(
653                    &self.info,
654                    Operation::Write,
655                    &[
656                        ("path", &self.path),
657                        ("written", &self.written.to_string()),
658                        ("size", &size.to_string()),
659                    ],
660                    "succeeded",
661                    None,
662                );
663                Ok(())
664            }
665            Err(err) => {
666                self.logger.log(
667                    &self.info,
668                    Operation::Write,
669                    &[
670                        ("path", &self.path),
671                        ("written", &self.written.to_string()),
672                        ("size", &size.to_string()),
673                    ],
674                    "failed",
675                    Some(&err),
676                );
677                Err(err)
678            }
679        }
680    }
681
682    async fn abort(&mut self) -> Result<()> {
683        self.logger.log(
684            &self.info,
685            Operation::Write,
686            &[("path", &self.path), ("written", &self.written.to_string())],
687            "started",
688            None,
689        );
690
691        match self.inner.abort().await {
692            Ok(_) => {
693                self.logger.log(
694                    &self.info,
695                    Operation::Write,
696                    &[("path", &self.path), ("written", &self.written.to_string())],
697                    "succeeded",
698                    None,
699                );
700                Ok(())
701            }
702            Err(err) => {
703                self.logger.log(
704                    &self.info,
705                    Operation::Write,
706                    &[("path", &self.path), ("written", &self.written.to_string())],
707                    "failed",
708                    Some(&err),
709                );
710                Err(err)
711            }
712        }
713    }
714
715    async fn close(&mut self) -> Result<Metadata> {
716        self.logger.log(
717            &self.info,
718            Operation::Write,
719            &[("path", &self.path), ("written", &self.written.to_string())],
720            "started",
721            None,
722        );
723
724        match self.inner.close().await {
725            Ok(meta) => {
726                self.logger.log(
727                    &self.info,
728                    Operation::Write,
729                    &[("path", &self.path), ("written", &self.written.to_string())],
730                    "succeeded",
731                    None,
732                );
733                Ok(meta)
734            }
735            Err(err) => {
736                self.logger.log(
737                    &self.info,
738                    Operation::Write,
739                    &[("path", &self.path), ("written", &self.written.to_string())],
740                    "failed",
741                    Some(&err),
742                );
743                Err(err)
744            }
745        }
746    }
747}
748
749pub struct LoggingLister<P, I: LoggingInterceptor> {
750    info: Arc<AccessorInfo>,
751    logger: I,
752    path: String,
753
754    listed: usize,
755    inner: P,
756}
757
758impl<P, I: LoggingInterceptor> LoggingLister<P, I> {
759    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, inner: P) -> Self {
760        Self {
761            info,
762            logger,
763            path: path.to_string(),
764
765            listed: 0,
766            inner,
767        }
768    }
769}
770
771impl<P: oio::List, I: LoggingInterceptor> oio::List for LoggingLister<P, I> {
772    async fn next(&mut self) -> Result<Option<oio::Entry>> {
773        self.logger.log(
774            &self.info,
775            Operation::List,
776            &[("path", &self.path), ("listed", &self.listed.to_string())],
777            "started",
778            None,
779        );
780
781        let res = self.inner.next().await;
782
783        match &res {
784            Ok(Some(de)) => {
785                self.listed += 1;
786                self.logger.log(
787                    &self.info,
788                    Operation::List,
789                    &[
790                        ("path", &self.path),
791                        ("listed", &self.listed.to_string()),
792                        ("entry", de.path()),
793                    ],
794                    "succeeded",
795                    None,
796                );
797            }
798            Ok(None) => {
799                self.logger.log(
800                    &self.info,
801                    Operation::List,
802                    &[("path", &self.path), ("listed", &self.listed.to_string())],
803                    "finished",
804                    None,
805                );
806            }
807            Err(err) => {
808                self.logger.log(
809                    &self.info,
810                    Operation::List,
811                    &[("path", &self.path), ("listed", &self.listed.to_string())],
812                    "failed",
813                    Some(err),
814                );
815            }
816        };
817
818        res
819    }
820}
821
822pub struct LoggingDeleter<D, I: LoggingInterceptor> {
823    info: Arc<AccessorInfo>,
824    logger: I,
825
826    queued: usize,
827    deleted: usize,
828    inner: D,
829}
830
831impl<D, I: LoggingInterceptor> LoggingDeleter<D, I> {
832    fn new(info: Arc<AccessorInfo>, logger: I, inner: D) -> Self {
833        Self {
834            info,
835            logger,
836
837            queued: 0,
838            deleted: 0,
839            inner,
840        }
841    }
842}
843
844impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for LoggingDeleter<D, I> {
845    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
846        let version = args
847            .version()
848            .map(|v| v.to_string())
849            .unwrap_or_else(|| "<latest>".to_string());
850
851        self.logger.log(
852            &self.info,
853            Operation::Delete,
854            &[("path", path), ("version", &version)],
855            "started",
856            None,
857        );
858
859        let res = self.inner.delete(path, args);
860
861        match &res {
862            Ok(_) => {
863                self.queued += 1;
864                self.logger.log(
865                    &self.info,
866                    Operation::Delete,
867                    &[
868                        ("path", path),
869                        ("version", &version),
870                        ("queued", &self.queued.to_string()),
871                        ("deleted", &self.deleted.to_string()),
872                    ],
873                    "succeeded",
874                    None,
875                );
876            }
877            Err(err) => {
878                self.logger.log(
879                    &self.info,
880                    Operation::Delete,
881                    &[
882                        ("path", path),
883                        ("version", &version),
884                        ("queued", &self.queued.to_string()),
885                        ("deleted", &self.deleted.to_string()),
886                    ],
887                    "failed",
888                    Some(err),
889                );
890            }
891        };
892
893        res
894    }
895
896    async fn flush(&mut self) -> Result<usize> {
897        self.logger.log(
898            &self.info,
899            Operation::Delete,
900            &[
901                ("queued", &self.queued.to_string()),
902                ("deleted", &self.deleted.to_string()),
903            ],
904            "started",
905            None,
906        );
907
908        let res = self.inner.flush().await;
909
910        match &res {
911            Ok(flushed) => {
912                self.queued -= flushed;
913                self.deleted += flushed;
914                self.logger.log(
915                    &self.info,
916                    Operation::Delete,
917                    &[
918                        ("queued", &self.queued.to_string()),
919                        ("deleted", &self.deleted.to_string()),
920                    ],
921                    "succeeded",
922                    None,
923                );
924            }
925            Err(err) => {
926                self.logger.log(
927                    &self.info,
928                    Operation::Delete,
929                    &[
930                        ("queued", &self.queued.to_string()),
931                        ("deleted", &self.deleted.to_string()),
932                    ],
933                    "failed",
934                    Some(err),
935                );
936            }
937        };
938
939        res
940    }
941}