opendal/layers/
logging.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Display;
20use std::sync::Arc;
21
22use log::log;
23use log::Level;
24
25use crate::raw::*;
26use crate::*;
27
28/// Add [log](https://docs.rs/log/) for every operation.
29///
30/// # Logging
31///
32/// - OpenDAL will log in structural way.
33/// - Every operation will start with a `started` log entry.
34/// - Every operation will finish with the following status:
35///   - `succeeded`: the operation is successful, but might have more to take.
36///   - `finished`: the whole operation is finished.
37///   - `failed`: the operation returns an unexpected error.
38/// - The default log level while expected error happened is `Warn`.
39/// - The default log level while unexpected failure happened is `Error`.
40///
41/// # Examples
42///
43/// ```no_run
44/// # use opendal::layers::LoggingLayer;
45/// # use opendal::services;
46/// # use opendal::Operator;
47/// # use opendal::Result;
48/// # use opendal::Scheme;
49///
50/// # fn main() -> Result<()> {
51/// let _ = Operator::new(services::Memory::default())?
52///     .layer(LoggingLayer::default())
53///     .finish();
54/// Ok(())
55/// # }
56/// ```
57///
58/// # Output
59///
60/// OpenDAL is using [`log`](https://docs.rs/log/latest/log/) for logging internally.
61///
62/// To enable logging output, please set `RUST_LOG`:
63///
64/// ```shell
65/// RUST_LOG=debug ./app
66/// ```
67///
68/// To config logging output, please refer to [Configure Logging](https://rust-lang-nursery.github.io/rust-cookbook/development_tools/debugging/config_log.html):
69///
70/// ```shell
71/// RUST_LOG="info,opendal::services=debug" ./app
72/// ```
73///
74/// # Logging Interceptor
75///
76/// You can implement your own logging interceptor to customize the logging behavior.
77///
78/// ```no_run
79/// # use opendal::layers::LoggingInterceptor;
80/// # use opendal::layers::LoggingLayer;
81/// # use opendal::raw;
82/// # use opendal::services;
83/// # use opendal::Error;
84/// # use opendal::Operator;
85/// # use opendal::Result;
86/// # use opendal::Scheme;
87///
88/// #[derive(Debug, Clone)]
89/// struct MyLoggingInterceptor;
90///
91/// impl LoggingInterceptor for MyLoggingInterceptor {
92///     fn log(
93///         &self,
94///         info: &raw::AccessorInfo,
95///         operation: raw::Operation,
96///         context: &[(&str, &str)],
97///         message: &str,
98///         err: Option<&Error>,
99///     ) {
100///         // log something
101///     }
102/// }
103///
104/// # fn main() -> Result<()> {
105/// let _ = Operator::new(services::Memory::default())?
106///     .layer(LoggingLayer::new(MyLoggingInterceptor))
107///     .finish();
108/// Ok(())
109/// # }
110/// ```
111#[derive(Debug)]
112pub struct LoggingLayer<I = DefaultLoggingInterceptor> {
113    logger: I,
114}
115
116impl Default for LoggingLayer {
117    fn default() -> Self {
118        Self {
119            logger: DefaultLoggingInterceptor,
120        }
121    }
122}
123
124impl LoggingLayer {
125    /// Create the layer with specific logging interceptor.
126    pub fn new<I: LoggingInterceptor>(logger: I) -> LoggingLayer<I> {
127        LoggingLayer { logger }
128    }
129}
130
131impl<A: Access, I: LoggingInterceptor> Layer<A> for LoggingLayer<I> {
132    type LayeredAccess = LoggingAccessor<A, I>;
133
134    fn layer(&self, inner: A) -> Self::LayeredAccess {
135        let info = inner.info();
136        LoggingAccessor {
137            inner,
138
139            info,
140            logger: self.logger.clone(),
141        }
142    }
143}
144
145/// LoggingInterceptor is used to intercept the log.
146pub trait LoggingInterceptor: Debug + Clone + Send + Sync + Unpin + 'static {
147    /// Everytime there is a log, this function will be called.
148    ///
149    /// # Inputs
150    ///
151    /// - info: The service's access info.
152    /// - operation: The operation to log.
153    /// - context: Additional context of the log like path, etc.
154    /// - message: The log message.
155    /// - err: The error to log.
156    ///
157    /// # Note
158    ///
159    /// Users should avoid calling resource-intensive operations such as I/O or network
160    /// functions here, especially anything that takes longer than 10ms. Otherwise, Opendal
161    /// could perform unexpectedly slow.
162    fn log(
163        &self,
164        info: &AccessorInfo,
165        operation: Operation,
166        context: &[(&str, &str)],
167        message: &str,
168        err: Option<&Error>,
169    );
170}
171
172/// The DefaultLoggingInterceptor will log the message by the standard logging macro.
173#[derive(Debug, Copy, Clone, Default)]
174pub struct DefaultLoggingInterceptor;
175
176impl LoggingInterceptor for DefaultLoggingInterceptor {
177    #[inline]
178    fn log(
179        &self,
180        info: &AccessorInfo,
181        operation: Operation,
182        context: &[(&str, &str)],
183        message: &str,
184        err: Option<&Error>,
185    ) {
186        if let Some(err) = err {
187            // Print error if it's unexpected, otherwise in warn.
188            let lvl = if err.kind() == ErrorKind::Unexpected {
189                Level::Error
190            } else {
191                Level::Warn
192            };
193
194            log!(
195                target: LOGGING_TARGET,
196                lvl,
197                "service={} name={}{}: {operation} {message} {}",
198                info.scheme(),
199                info.name(),
200                LoggingContext(context),
201                // Print error message with debug output while unexpected happened.
202                //
203                // It's super sad that we can't bind `format_args!()` here.
204                // See: https://github.com/rust-lang/rust/issues/92698
205                if err.kind() != ErrorKind::Unexpected {
206                   format!("{err}")
207                } else {
208                   format!("{err:?}")
209                }
210            );
211        }
212
213        log!(
214            target: LOGGING_TARGET,
215            Level::Debug,
216            "service={} name={}{}: {operation} {message}",
217            info.scheme(),
218            info.name(),
219            LoggingContext(context),
220        );
221    }
222}
223
224struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);
225
226impl Display for LoggingContext<'_> {
227    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228        for (k, v) in self.0.iter() {
229            write!(f, " {}={}", k, v)?;
230        }
231        Ok(())
232    }
233}
234
235#[derive(Clone, Debug)]
236pub struct LoggingAccessor<A: Access, I: LoggingInterceptor> {
237    inner: A,
238
239    info: Arc<AccessorInfo>,
240    logger: I,
241}
242
243static LOGGING_TARGET: &str = "opendal::services";
244
245impl<A: Access, I: LoggingInterceptor> LayeredAccess for LoggingAccessor<A, I> {
246    type Inner = A;
247    type Reader = LoggingReader<A::Reader, I>;
248    type BlockingReader = LoggingReader<A::BlockingReader, I>;
249    type Writer = LoggingWriter<A::Writer, I>;
250    type BlockingWriter = LoggingWriter<A::BlockingWriter, I>;
251    type Lister = LoggingLister<A::Lister, I>;
252    type BlockingLister = LoggingLister<A::BlockingLister, I>;
253    type Deleter = LoggingDeleter<A::Deleter, I>;
254    type BlockingDeleter = LoggingDeleter<A::BlockingDeleter, I>;
255
256    fn inner(&self) -> &Self::Inner {
257        &self.inner
258    }
259
260    fn info(&self) -> Arc<AccessorInfo> {
261        self.info.clone()
262    }
263
264    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
265        self.logger.log(
266            &self.info,
267            Operation::CreateDir,
268            &[("path", path)],
269            "started",
270            None,
271        );
272
273        self.inner
274            .create_dir(path, args)
275            .await
276            .inspect(|_| {
277                self.logger.log(
278                    &self.info,
279                    Operation::CreateDir,
280                    &[("path", path)],
281                    "finished",
282                    None,
283                );
284            })
285            .inspect_err(|err| {
286                self.logger.log(
287                    &self.info,
288                    Operation::CreateDir,
289                    &[("path", path)],
290                    "failed",
291                    Some(err),
292                );
293            })
294    }
295
296    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
297        self.logger.log(
298            &self.info,
299            Operation::Read,
300            &[("path", path)],
301            "started",
302            None,
303        );
304
305        self.inner
306            .read(path, args)
307            .await
308            .map(|(rp, r)| {
309                self.logger.log(
310                    &self.info,
311                    Operation::Read,
312                    &[("path", path)],
313                    "created reader",
314                    None,
315                );
316                (
317                    rp,
318                    LoggingReader::new(self.info.clone(), self.logger.clone(), path, r),
319                )
320            })
321            .inspect_err(|err| {
322                self.logger.log(
323                    &self.info,
324                    Operation::Read,
325                    &[("path", path)],
326                    "failed",
327                    Some(err),
328                );
329            })
330    }
331
332    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
333        self.logger.log(
334            &self.info,
335            Operation::Write,
336            &[("path", path)],
337            "started",
338            None,
339        );
340
341        self.inner
342            .write(path, args)
343            .await
344            .map(|(rp, w)| {
345                self.logger.log(
346                    &self.info,
347                    Operation::Write,
348                    &[("path", path)],
349                    "created writer",
350                    None,
351                );
352                let w = LoggingWriter::new(self.info.clone(), self.logger.clone(), path, w);
353                (rp, w)
354            })
355            .inspect_err(|err| {
356                self.logger.log(
357                    &self.info,
358                    Operation::Write,
359                    &[("path", path)],
360                    "failed",
361                    Some(err),
362                );
363            })
364    }
365
366    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
367        self.logger.log(
368            &self.info,
369            Operation::Copy,
370            &[("from", from), ("to", to)],
371            "started",
372            None,
373        );
374
375        self.inner
376            .copy(from, to, args)
377            .await
378            .inspect(|_| {
379                self.logger.log(
380                    &self.info,
381                    Operation::Copy,
382                    &[("from", from), ("to", to)],
383                    "finished",
384                    None,
385                );
386            })
387            .inspect_err(|err| {
388                self.logger.log(
389                    &self.info,
390                    Operation::Copy,
391                    &[("from", from), ("to", to)],
392                    "failed",
393                    Some(err),
394                );
395            })
396    }
397
398    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
399        self.logger.log(
400            &self.info,
401            Operation::Rename,
402            &[("from", from), ("to", to)],
403            "started",
404            None,
405        );
406
407        self.inner
408            .rename(from, to, args)
409            .await
410            .inspect(|_| {
411                self.logger.log(
412                    &self.info,
413                    Operation::Rename,
414                    &[("from", from), ("to", to)],
415                    "finished",
416                    None,
417                );
418            })
419            .inspect_err(|err| {
420                self.logger.log(
421                    &self.info,
422                    Operation::Rename,
423                    &[("from", from), ("to", to)],
424                    "failed",
425                    Some(err),
426                );
427            })
428    }
429
430    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
431        self.logger.log(
432            &self.info,
433            Operation::Stat,
434            &[("path", path)],
435            "started",
436            None,
437        );
438
439        self.inner
440            .stat(path, args)
441            .await
442            .inspect(|_| {
443                self.logger.log(
444                    &self.info,
445                    Operation::Stat,
446                    &[("path", path)],
447                    "finished",
448                    None,
449                );
450            })
451            .inspect_err(|err| {
452                self.logger.log(
453                    &self.info,
454                    Operation::Stat,
455                    &[("path", path)],
456                    "failed",
457                    Some(err),
458                );
459            })
460    }
461
462    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
463        self.logger
464            .log(&self.info, Operation::Delete, &[], "started", None);
465
466        self.inner
467            .delete()
468            .await
469            .map(|(rp, d)| {
470                self.logger
471                    .log(&self.info, Operation::Delete, &[], "finished", None);
472                let d = LoggingDeleter::new(self.info.clone(), self.logger.clone(), d);
473                (rp, d)
474            })
475            .inspect_err(|err| {
476                self.logger
477                    .log(&self.info, Operation::Delete, &[], "failed", Some(err));
478            })
479    }
480
481    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
482        self.logger.log(
483            &self.info,
484            Operation::List,
485            &[("path", path)],
486            "started",
487            None,
488        );
489
490        self.inner
491            .list(path, args)
492            .await
493            .map(|(rp, v)| {
494                self.logger.log(
495                    &self.info,
496                    Operation::List,
497                    &[("path", path)],
498                    "created lister",
499                    None,
500                );
501                let streamer = LoggingLister::new(self.info.clone(), self.logger.clone(), path, v);
502                (rp, streamer)
503            })
504            .inspect_err(|err| {
505                self.logger.log(
506                    &self.info,
507                    Operation::List,
508                    &[("path", path)],
509                    "failed",
510                    Some(err),
511                );
512            })
513    }
514
515    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
516        self.logger.log(
517            &self.info,
518            Operation::Presign,
519            &[("path", path)],
520            "started",
521            None,
522        );
523
524        self.inner
525            .presign(path, args)
526            .await
527            .inspect(|_| {
528                self.logger.log(
529                    &self.info,
530                    Operation::Presign,
531                    &[("path", path)],
532                    "finished",
533                    None,
534                );
535            })
536            .inspect_err(|err| {
537                self.logger.log(
538                    &self.info,
539                    Operation::Presign,
540                    &[("path", path)],
541                    "failed",
542                    Some(err),
543                );
544            })
545    }
546
547    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
548        self.logger.log(
549            &self.info,
550            Operation::CreateDir,
551            &[("path", path)],
552            "started",
553            None,
554        );
555
556        self.inner
557            .blocking_create_dir(path, args)
558            .inspect(|_| {
559                self.logger.log(
560                    &self.info,
561                    Operation::CreateDir,
562                    &[("path", path)],
563                    "finished",
564                    None,
565                );
566            })
567            .inspect_err(|err| {
568                self.logger.log(
569                    &self.info,
570                    Operation::CreateDir,
571                    &[("path", path)],
572                    "failed",
573                    Some(err),
574                );
575            })
576    }
577
578    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
579        self.logger.log(
580            &self.info,
581            Operation::Read,
582            &[("path", path)],
583            "started",
584            None,
585        );
586
587        self.inner
588            .blocking_read(path, args.clone())
589            .map(|(rp, r)| {
590                self.logger.log(
591                    &self.info,
592                    Operation::Read,
593                    &[("path", path)],
594                    "created reader",
595                    None,
596                );
597                let r = LoggingReader::new(self.info.clone(), self.logger.clone(), path, r);
598                (rp, r)
599            })
600            .inspect_err(|err| {
601                self.logger.log(
602                    &self.info,
603                    Operation::Read,
604                    &[("path", path)],
605                    "failed",
606                    Some(err),
607                );
608            })
609    }
610
611    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
612        self.logger.log(
613            &self.info,
614            Operation::Write,
615            &[("path", path)],
616            "started",
617            None,
618        );
619
620        self.inner
621            .blocking_write(path, args)
622            .map(|(rp, w)| {
623                self.logger.log(
624                    &self.info,
625                    Operation::Write,
626                    &[("path", path)],
627                    "created writer",
628                    None,
629                );
630                let w = LoggingWriter::new(self.info.clone(), self.logger.clone(), path, w);
631                (rp, w)
632            })
633            .inspect_err(|err| {
634                self.logger.log(
635                    &self.info,
636                    Operation::Write,
637                    &[("path", path)],
638                    "failed",
639                    Some(err),
640                );
641            })
642    }
643
644    fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
645        self.logger.log(
646            &self.info,
647            Operation::Copy,
648            &[("from", from), ("to", to)],
649            "started",
650            None,
651        );
652
653        self.inner
654            .blocking_copy(from, to, args)
655            .inspect(|_| {
656                self.logger.log(
657                    &self.info,
658                    Operation::Copy,
659                    &[("from", from), ("to", to)],
660                    "finished",
661                    None,
662                );
663            })
664            .inspect_err(|err| {
665                self.logger.log(
666                    &self.info,
667                    Operation::Copy,
668                    &[("from", from), ("to", to)],
669                    "",
670                    Some(err),
671                );
672            })
673    }
674
675    fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
676        self.logger.log(
677            &self.info,
678            Operation::Rename,
679            &[("from", from), ("to", to)],
680            "started",
681            None,
682        );
683
684        self.inner
685            .blocking_rename(from, to, args)
686            .inspect(|_| {
687                self.logger.log(
688                    &self.info,
689                    Operation::Rename,
690                    &[("from", from), ("to", to)],
691                    "finished",
692                    None,
693                );
694            })
695            .inspect_err(|err| {
696                self.logger.log(
697                    &self.info,
698                    Operation::Rename,
699                    &[("from", from), ("to", to)],
700                    "failed",
701                    Some(err),
702                );
703            })
704    }
705
706    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
707        self.logger.log(
708            &self.info,
709            Operation::Stat,
710            &[("path", path)],
711            "started",
712            None,
713        );
714
715        self.inner
716            .blocking_stat(path, args)
717            .inspect(|_| {
718                self.logger.log(
719                    &self.info,
720                    Operation::Stat,
721                    &[("path", path)],
722                    "finished",
723                    None,
724                );
725            })
726            .inspect_err(|err| {
727                self.logger.log(
728                    &self.info,
729                    Operation::Stat,
730                    &[("path", path)],
731                    "failed",
732                    Some(err),
733                );
734            })
735    }
736
737    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
738        self.logger
739            .log(&self.info, Operation::Delete, &[], "started", None);
740
741        self.inner
742            .blocking_delete()
743            .map(|(rp, d)| {
744                self.logger
745                    .log(&self.info, Operation::Delete, &[], "finished", None);
746                let d = LoggingDeleter::new(self.info.clone(), self.logger.clone(), d);
747                (rp, d)
748            })
749            .inspect_err(|err| {
750                self.logger
751                    .log(&self.info, Operation::Delete, &[], "failed", Some(err));
752            })
753    }
754
755    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
756        self.logger.log(
757            &self.info,
758            Operation::List,
759            &[("path", path)],
760            "started",
761            None,
762        );
763
764        self.inner
765            .blocking_list(path, args)
766            .map(|(rp, v)| {
767                self.logger.log(
768                    &self.info,
769                    Operation::List,
770                    &[("path", path)],
771                    "created lister",
772                    None,
773                );
774                let li = LoggingLister::new(self.info.clone(), self.logger.clone(), path, v);
775                (rp, li)
776            })
777            .inspect_err(|err| {
778                self.logger.log(
779                    &self.info,
780                    Operation::List,
781                    &[("path", path)],
782                    "",
783                    Some(err),
784                );
785            })
786    }
787}
788
789/// `LoggingReader` is a wrapper of `BytesReader`, with logging functionality.
790pub struct LoggingReader<R, I: LoggingInterceptor> {
791    info: Arc<AccessorInfo>,
792    logger: I,
793    path: String,
794
795    read: u64,
796    inner: R,
797}
798
799impl<R, I: LoggingInterceptor> LoggingReader<R, I> {
800    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, reader: R) -> Self {
801        Self {
802            info,
803            logger,
804            path: path.to_string(),
805
806            read: 0,
807            inner: reader,
808        }
809    }
810}
811
812impl<R: oio::Read, I: LoggingInterceptor> oio::Read for LoggingReader<R, I> {
813    async fn read(&mut self) -> Result<Buffer> {
814        self.logger.log(
815            &self.info,
816            Operation::Read,
817            &[("path", &self.path), ("read", &self.read.to_string())],
818            "started",
819            None,
820        );
821
822        match self.inner.read().await {
823            Ok(bs) => {
824                self.read += bs.len() as u64;
825                self.logger.log(
826                    &self.info,
827                    Operation::Read,
828                    &[
829                        ("path", &self.path),
830                        ("read", &self.read.to_string()),
831                        ("size", &bs.len().to_string()),
832                    ],
833                    if bs.is_empty() {
834                        "finished"
835                    } else {
836                        "succeeded"
837                    },
838                    None,
839                );
840                Ok(bs)
841            }
842            Err(err) => {
843                self.logger.log(
844                    &self.info,
845                    Operation::Read,
846                    &[("path", &self.path), ("read", &self.read.to_string())],
847                    "failed",
848                    Some(&err),
849                );
850                Err(err)
851            }
852        }
853    }
854}
855
856impl<R: oio::BlockingRead, I: LoggingInterceptor> oio::BlockingRead for LoggingReader<R, I> {
857    fn read(&mut self) -> Result<Buffer> {
858        self.logger.log(
859            &self.info,
860            Operation::Read,
861            &[("path", &self.path), ("read", &self.read.to_string())],
862            "started",
863            None,
864        );
865
866        match self.inner.read() {
867            Ok(bs) => {
868                self.read += bs.len() as u64;
869                self.logger.log(
870                    &self.info,
871                    Operation::Read,
872                    &[
873                        ("path", &self.path),
874                        ("read", &self.read.to_string()),
875                        ("size", &bs.len().to_string()),
876                    ],
877                    if bs.is_empty() {
878                        "finished"
879                    } else {
880                        "succeeded"
881                    },
882                    None,
883                );
884                Ok(bs)
885            }
886            Err(err) => {
887                self.logger.log(
888                    &self.info,
889                    Operation::Read,
890                    &[("path", &self.path), ("read", &self.read.to_string())],
891                    "failed",
892                    Some(&err),
893                );
894                Err(err)
895            }
896        }
897    }
898}
899
900pub struct LoggingWriter<W, I> {
901    info: Arc<AccessorInfo>,
902    logger: I,
903    path: String,
904
905    written: u64,
906    inner: W,
907}
908
909impl<W, I> LoggingWriter<W, I> {
910    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, writer: W) -> Self {
911        Self {
912            info,
913            logger,
914            path: path.to_string(),
915
916            written: 0,
917            inner: writer,
918        }
919    }
920}
921
922impl<W: oio::Write, I: LoggingInterceptor> oio::Write for LoggingWriter<W, I> {
923    async fn write(&mut self, bs: Buffer) -> Result<()> {
924        let size = bs.len();
925
926        self.logger.log(
927            &self.info,
928            Operation::Write,
929            &[
930                ("path", &self.path),
931                ("written", &self.written.to_string()),
932                ("size", &size.to_string()),
933            ],
934            "started",
935            None,
936        );
937
938        match self.inner.write(bs).await {
939            Ok(_) => {
940                self.written += size as u64;
941                self.logger.log(
942                    &self.info,
943                    Operation::Write,
944                    &[
945                        ("path", &self.path),
946                        ("written", &self.written.to_string()),
947                        ("size", &size.to_string()),
948                    ],
949                    "succeeded",
950                    None,
951                );
952                Ok(())
953            }
954            Err(err) => {
955                self.logger.log(
956                    &self.info,
957                    Operation::Write,
958                    &[
959                        ("path", &self.path),
960                        ("written", &self.written.to_string()),
961                        ("size", &size.to_string()),
962                    ],
963                    "failed",
964                    Some(&err),
965                );
966                Err(err)
967            }
968        }
969    }
970
971    async fn abort(&mut self) -> Result<()> {
972        self.logger.log(
973            &self.info,
974            Operation::Write,
975            &[("path", &self.path), ("written", &self.written.to_string())],
976            "started",
977            None,
978        );
979
980        match self.inner.abort().await {
981            Ok(_) => {
982                self.logger.log(
983                    &self.info,
984                    Operation::Write,
985                    &[("path", &self.path), ("written", &self.written.to_string())],
986                    "succeeded",
987                    None,
988                );
989                Ok(())
990            }
991            Err(err) => {
992                self.logger.log(
993                    &self.info,
994                    Operation::Write,
995                    &[("path", &self.path), ("written", &self.written.to_string())],
996                    "failed",
997                    Some(&err),
998                );
999                Err(err)
1000            }
1001        }
1002    }
1003
1004    async fn close(&mut self) -> Result<Metadata> {
1005        self.logger.log(
1006            &self.info,
1007            Operation::Write,
1008            &[("path", &self.path), ("written", &self.written.to_string())],
1009            "started",
1010            None,
1011        );
1012
1013        match self.inner.close().await {
1014            Ok(meta) => {
1015                self.logger.log(
1016                    &self.info,
1017                    Operation::Write,
1018                    &[("path", &self.path), ("written", &self.written.to_string())],
1019                    "succeeded",
1020                    None,
1021                );
1022                Ok(meta)
1023            }
1024            Err(err) => {
1025                self.logger.log(
1026                    &self.info,
1027                    Operation::Write,
1028                    &[("path", &self.path), ("written", &self.written.to_string())],
1029                    "failed",
1030                    Some(&err),
1031                );
1032                Err(err)
1033            }
1034        }
1035    }
1036}
1037
1038impl<W: oio::BlockingWrite, I: LoggingInterceptor> oio::BlockingWrite for LoggingWriter<W, I> {
1039    fn write(&mut self, bs: Buffer) -> Result<()> {
1040        let size = bs.len();
1041
1042        self.logger.log(
1043            &self.info,
1044            Operation::Write,
1045            &[
1046                ("path", &self.path),
1047                ("written", &self.written.to_string()),
1048                ("size", &size.to_string()),
1049            ],
1050            "started",
1051            None,
1052        );
1053
1054        match self.inner.write(bs) {
1055            Ok(_) => {
1056                self.logger.log(
1057                    &self.info,
1058                    Operation::Write,
1059                    &[
1060                        ("path", &self.path),
1061                        ("written", &self.written.to_string()),
1062                        ("size", &size.to_string()),
1063                    ],
1064                    "succeeded",
1065                    None,
1066                );
1067                Ok(())
1068            }
1069            Err(err) => {
1070                self.logger.log(
1071                    &self.info,
1072                    Operation::Write,
1073                    &[
1074                        ("path", &self.path),
1075                        ("written", &self.written.to_string()),
1076                        ("size", &size.to_string()),
1077                    ],
1078                    "failed",
1079                    Some(&err),
1080                );
1081                Err(err)
1082            }
1083        }
1084    }
1085
1086    fn close(&mut self) -> Result<Metadata> {
1087        self.logger.log(
1088            &self.info,
1089            Operation::Write,
1090            &[("path", &self.path), ("written", &self.written.to_string())],
1091            "started",
1092            None,
1093        );
1094
1095        match self.inner.close() {
1096            Ok(meta) => {
1097                self.logger.log(
1098                    &self.info,
1099                    Operation::Write,
1100                    &[("path", &self.path), ("written", &self.written.to_string())],
1101                    "succeeded",
1102                    None,
1103                );
1104                Ok(meta)
1105            }
1106            Err(err) => {
1107                self.logger.log(
1108                    &self.info,
1109                    Operation::Write,
1110                    &[("path", &self.path), ("written", &self.written.to_string())],
1111                    "failed",
1112                    Some(&err),
1113                );
1114                Err(err)
1115            }
1116        }
1117    }
1118}
1119
1120pub struct LoggingLister<P, I: LoggingInterceptor> {
1121    info: Arc<AccessorInfo>,
1122    logger: I,
1123    path: String,
1124
1125    listed: usize,
1126    inner: P,
1127}
1128
1129impl<P, I: LoggingInterceptor> LoggingLister<P, I> {
1130    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, inner: P) -> Self {
1131        Self {
1132            info,
1133            logger,
1134            path: path.to_string(),
1135
1136            listed: 0,
1137            inner,
1138        }
1139    }
1140}
1141
1142impl<P: oio::List, I: LoggingInterceptor> oio::List for LoggingLister<P, I> {
1143    async fn next(&mut self) -> Result<Option<oio::Entry>> {
1144        self.logger.log(
1145            &self.info,
1146            Operation::List,
1147            &[("path", &self.path), ("listed", &self.listed.to_string())],
1148            "started",
1149            None,
1150        );
1151
1152        let res = self.inner.next().await;
1153
1154        match &res {
1155            Ok(Some(de)) => {
1156                self.listed += 1;
1157                self.logger.log(
1158                    &self.info,
1159                    Operation::List,
1160                    &[
1161                        ("path", &self.path),
1162                        ("listed", &self.listed.to_string()),
1163                        ("entry", de.path()),
1164                    ],
1165                    "succeeded",
1166                    None,
1167                );
1168            }
1169            Ok(None) => {
1170                self.logger.log(
1171                    &self.info,
1172                    Operation::List,
1173                    &[("path", &self.path), ("listed", &self.listed.to_string())],
1174                    "finished",
1175                    None,
1176                );
1177            }
1178            Err(err) => {
1179                self.logger.log(
1180                    &self.info,
1181                    Operation::List,
1182                    &[("path", &self.path), ("listed", &self.listed.to_string())],
1183                    "failed",
1184                    Some(err),
1185                );
1186            }
1187        };
1188
1189        res
1190    }
1191}
1192
1193impl<P: oio::BlockingList, I: LoggingInterceptor> oio::BlockingList for LoggingLister<P, I> {
1194    fn next(&mut self) -> Result<Option<oio::Entry>> {
1195        self.logger.log(
1196            &self.info,
1197            Operation::List,
1198            &[("path", &self.path), ("listed", &self.listed.to_string())],
1199            "started",
1200            None,
1201        );
1202
1203        let res = self.inner.next();
1204        match &res {
1205            Ok(Some(de)) => {
1206                self.listed += 1;
1207                self.logger.log(
1208                    &self.info,
1209                    Operation::List,
1210                    &[
1211                        ("path", &self.path),
1212                        ("listed", &self.listed.to_string()),
1213                        ("entry", de.path()),
1214                    ],
1215                    "succeeded",
1216                    None,
1217                );
1218            }
1219            Ok(None) => {
1220                self.logger.log(
1221                    &self.info,
1222                    Operation::List,
1223                    &[("path", &self.path), ("listed", &self.listed.to_string())],
1224                    "finished",
1225                    None,
1226                );
1227            }
1228            Err(err) => {
1229                self.logger.log(
1230                    &self.info,
1231                    Operation::List,
1232                    &[("path", &self.path), ("listed", &self.listed.to_string())],
1233                    "failed",
1234                    Some(err),
1235                );
1236            }
1237        };
1238
1239        res
1240    }
1241}
1242
1243pub struct LoggingDeleter<D, I: LoggingInterceptor> {
1244    info: Arc<AccessorInfo>,
1245    logger: I,
1246
1247    queued: usize,
1248    deleted: usize,
1249    inner: D,
1250}
1251
1252impl<D, I: LoggingInterceptor> LoggingDeleter<D, I> {
1253    fn new(info: Arc<AccessorInfo>, logger: I, inner: D) -> Self {
1254        Self {
1255            info,
1256            logger,
1257
1258            queued: 0,
1259            deleted: 0,
1260            inner,
1261        }
1262    }
1263}
1264
1265impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for LoggingDeleter<D, I> {
1266    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
1267        let version = args
1268            .version()
1269            .map(|v| v.to_string())
1270            .unwrap_or_else(|| "<latest>".to_string());
1271
1272        self.logger.log(
1273            &self.info,
1274            Operation::Delete,
1275            &[("path", path), ("version", &version)],
1276            "started",
1277            None,
1278        );
1279
1280        let res = self.inner.delete(path, args);
1281
1282        match &res {
1283            Ok(_) => {
1284                self.queued += 1;
1285                self.logger.log(
1286                    &self.info,
1287                    Operation::Delete,
1288                    &[
1289                        ("path", path),
1290                        ("version", &version),
1291                        ("queued", &self.queued.to_string()),
1292                        ("deleted", &self.deleted.to_string()),
1293                    ],
1294                    "succeeded",
1295                    None,
1296                );
1297            }
1298            Err(err) => {
1299                self.logger.log(
1300                    &self.info,
1301                    Operation::Delete,
1302                    &[
1303                        ("path", path),
1304                        ("version", &version),
1305                        ("queued", &self.queued.to_string()),
1306                        ("deleted", &self.deleted.to_string()),
1307                    ],
1308                    "failed",
1309                    Some(err),
1310                );
1311            }
1312        };
1313
1314        res
1315    }
1316
1317    async fn flush(&mut self) -> Result<usize> {
1318        self.logger.log(
1319            &self.info,
1320            Operation::Delete,
1321            &[
1322                ("queued", &self.queued.to_string()),
1323                ("deleted", &self.deleted.to_string()),
1324            ],
1325            "started",
1326            None,
1327        );
1328
1329        let res = self.inner.flush().await;
1330
1331        match &res {
1332            Ok(flushed) => {
1333                self.queued -= flushed;
1334                self.deleted += flushed;
1335                self.logger.log(
1336                    &self.info,
1337                    Operation::Delete,
1338                    &[
1339                        ("queued", &self.queued.to_string()),
1340                        ("deleted", &self.deleted.to_string()),
1341                    ],
1342                    "succeeded",
1343                    None,
1344                );
1345            }
1346            Err(err) => {
1347                self.logger.log(
1348                    &self.info,
1349                    Operation::Delete,
1350                    &[
1351                        ("queued", &self.queued.to_string()),
1352                        ("deleted", &self.deleted.to_string()),
1353                    ],
1354                    "failed",
1355                    Some(err),
1356                );
1357            }
1358        };
1359
1360        res
1361    }
1362}
1363
1364impl<D: oio::BlockingDelete, I: LoggingInterceptor> oio::BlockingDelete for LoggingDeleter<D, I> {
1365    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
1366        let version = args
1367            .version()
1368            .map(|v| v.to_string())
1369            .unwrap_or_else(|| "<latest>".to_string());
1370
1371        self.logger.log(
1372            &self.info,
1373            Operation::Delete,
1374            &[("path", path), ("version", &version)],
1375            "started",
1376            None,
1377        );
1378
1379        let res = self.inner.delete(path, args);
1380
1381        match &res {
1382            Ok(_) => {
1383                self.queued += 1;
1384                self.logger.log(
1385                    &self.info,
1386                    Operation::Delete,
1387                    &[
1388                        ("path", path),
1389                        ("version", &version),
1390                        ("queued", &self.queued.to_string()),
1391                        ("deleted", &self.deleted.to_string()),
1392                    ],
1393                    "succeeded",
1394                    None,
1395                );
1396            }
1397            Err(err) => {
1398                self.logger.log(
1399                    &self.info,
1400                    Operation::Delete,
1401                    &[
1402                        ("path", path),
1403                        ("version", &version),
1404                        ("queued", &self.queued.to_string()),
1405                        ("deleted", &self.deleted.to_string()),
1406                    ],
1407                    "failed",
1408                    Some(err),
1409                );
1410            }
1411        };
1412
1413        res
1414    }
1415
1416    fn flush(&mut self) -> Result<usize> {
1417        self.logger.log(
1418            &self.info,
1419            Operation::Delete,
1420            &[
1421                ("queued", &self.queued.to_string()),
1422                ("deleted", &self.deleted.to_string()),
1423            ],
1424            "started",
1425            None,
1426        );
1427
1428        let res = self.inner.flush();
1429
1430        match &res {
1431            Ok(flushed) => {
1432                self.queued -= flushed;
1433                self.deleted += flushed;
1434                self.logger.log(
1435                    &self.info,
1436                    Operation::Delete,
1437                    &[
1438                        ("queued", &self.queued.to_string()),
1439                        ("deleted", &self.deleted.to_string()),
1440                    ],
1441                    "succeeded",
1442                    None,
1443                );
1444            }
1445            Err(err) => {
1446                self.logger.log(
1447                    &self.info,
1448                    Operation::Delete,
1449                    &[
1450                        ("queued", &self.queued.to_string()),
1451                        ("deleted", &self.deleted.to_string()),
1452                    ],
1453                    "failed",
1454                    Some(err),
1455                );
1456            }
1457        };
1458
1459        res
1460    }
1461}