opendal/layers/
tracing.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::future::Future;
20use std::sync::Arc;
21
22use tracing::Span;
23
24use crate::raw::*;
25use crate::*;
26
27/// Add [tracing](https://docs.rs/tracing/) for every operation.
28///
29/// # Examples
30///
31/// ## Basic Setup
32///
33/// ```no_run
34/// # use opendal::layers::TracingLayer;
35/// # use opendal::services;
36/// # use opendal::Operator;
37/// # use opendal::Result;
38///
39/// # fn main() -> Result<()> {
40/// let _ = Operator::new(services::Memory::default())?
41///     .layer(TracingLayer)
42///     .finish();
43/// Ok(())
44/// # }
45/// ```
46///
47/// ## Real usage
48///
49/// ```no_run
50/// # use anyhow::Result;
51/// # use opendal::layers::TracingLayer;
52/// # use opendal::services;
53/// # use opendal::Operator;
54/// # use opentelemetry::KeyValue;
55/// # use opentelemetry_sdk::trace;
56/// # use opentelemetry_sdk::Resource;
57/// # use tracing_subscriber::prelude::*;
58/// # use tracing_subscriber::EnvFilter;
59///
60/// # fn main() -> Result<()> {
61/// use opentelemetry::trace::TracerProvider;
62/// let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
63///     .with_simple_exporter(opentelemetry_otlp::SpanExporter::builder().with_tonic().build()?)
64///     .with_resource(Resource::builder().with_attributes(vec![
65///         KeyValue::new("service.name", "opendal_example"),
66///     ]).build())
67///     .build();
68/// let tracer = tracer_provider.tracer("opendal_tracer");
69/// let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);
70///
71/// tracing_subscriber::registry()
72///     .with(EnvFilter::from_default_env())
73///     .with(opentelemetry)
74///     .try_init()?;
75///
76/// {
77///     let runtime = tokio::runtime::Runtime::new()?;
78///     runtime.block_on(async {
79///         let root = tracing::span!(tracing::Level::INFO, "app_start", work_units = 2);
80///         let _enter = root.enter();
81///
82///         let _ = dotenvy::dotenv();
83///         let op = Operator::new(services::Memory::default())?
84///             .layer(TracingLayer)
85///             .finish();
86///
87///         op.write("test", "0".repeat(16 * 1024 * 1024).into_bytes())
88///             .await?;
89///         op.stat("test").await?;
90///         op.read("test").await?;
91///         Ok::<(), opendal::Error>(())
92///     })?;
93/// }
94///
/// // Shut down the current tracer provider.
/// // This invokes the shutdown method on all span processors,
/// // which should export any remaining spans before returning.
98/// tracer_provider.shutdown()?;
99///
100/// Ok(())
101/// # }
102/// ```
103///
104/// # Output
105///
106/// OpenDAL is using [`tracing`](https://docs.rs/tracing/latest/tracing/) for tracing internally.
107///
108/// To enable tracing output, please init one of the subscribers that `tracing` supports.
109///
110/// For example:
111///
112/// ```no_run
113/// # use tracing::dispatcher;
114/// # use tracing::Event;
115/// # use tracing::Metadata;
116/// # use tracing::span::Attributes;
117/// # use tracing::span::Id;
118/// # use tracing::span::Record;
119/// # use tracing::subscriber::Subscriber;
120///
121/// # pub struct FooSubscriber;
122/// # impl Subscriber for FooSubscriber {
123/// #   fn enabled(&self, _: &Metadata) -> bool { false }
124/// #   fn new_span(&self, _: &Attributes) -> Id { Id::from_u64(0) }
125/// #   fn record(&self, _: &Id, _: &Record) {}
126/// #   fn record_follows_from(&self, _: &Id, _: &Id) {}
127/// #   fn event(&self, _: &Event) {}
128/// #   fn enter(&self, _: &Id) {}
129/// #   fn exit(&self, _: &Id) {}
130/// # }
131/// # impl FooSubscriber { fn new() -> Self { FooSubscriber } }
132///
133/// let my_subscriber = FooSubscriber::new();
134/// tracing::subscriber::set_global_default(my_subscriber).expect("setting tracing default failed");
135/// ```
136///
137/// For real-world usage, please take a look at [`tracing-opentelemetry`](https://crates.io/crates/tracing-opentelemetry).
// The layer is a stateless unit struct, so these derives are free and let
// callers log, copy, or default-construct it.
#[derive(Debug, Clone, Copy, Default)]
pub struct TracingLayer;
139
140impl<A: Access> Layer<A> for TracingLayer {
141    type LayeredAccess = TracingAccessor<A>;
142
143    fn layer(&self, inner: A) -> Self::LayeredAccess {
144        TracingAccessor { inner }
145    }
146}
147
/// Accessor produced by [`TracingLayer`]: holds the wrapped accessor and
/// forwards operations to it.
#[derive(Debug)]
pub struct TracingAccessor<A> {
    /// The wrapped accessor that every operation is delegated to.
    inner: A,
}
152
153impl<A: Access> LayeredAccess for TracingAccessor<A> {
154    type Inner = A;
155    type Reader = TracingWrapper<A::Reader>;
156    type Writer = TracingWrapper<A::Writer>;
157    type Lister = TracingWrapper<A::Lister>;
158    type Deleter = TracingWrapper<A::Deleter>;
159    type BlockingReader = TracingWrapper<A::BlockingReader>;
160    type BlockingWriter = TracingWrapper<A::BlockingWriter>;
161    type BlockingLister = TracingWrapper<A::BlockingLister>;
162    type BlockingDeleter = TracingWrapper<A::BlockingDeleter>;
163
164    fn inner(&self) -> &Self::Inner {
165        &self.inner
166    }
167
168    #[tracing::instrument(level = "debug")]
169    fn info(&self) -> Arc<AccessorInfo> {
170        self.inner.info()
171    }
172
173    #[tracing::instrument(level = "debug", skip(self))]
174    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
175        self.inner.create_dir(path, args).await
176    }
177
178    #[tracing::instrument(level = "debug", skip(self))]
179    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
180        self.inner
181            .read(path, args)
182            .await
183            .map(|(rp, r)| (rp, TracingWrapper::new(Span::current(), r)))
184    }
185
186    #[tracing::instrument(level = "debug", skip(self))]
187    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
188        self.inner
189            .write(path, args)
190            .await
191            .map(|(rp, r)| (rp, TracingWrapper::new(Span::current(), r)))
192    }
193
194    #[tracing::instrument(level = "debug", skip(self))]
195    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
196        self.inner().copy(from, to, args).await
197    }
198
199    #[tracing::instrument(level = "debug", skip(self))]
200    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
201        self.inner().rename(from, to, args).await
202    }
203
204    #[tracing::instrument(level = "debug", skip(self))]
205    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
206        self.inner.stat(path, args).await
207    }
208
209    #[tracing::instrument(level = "debug", skip(self))]
210    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
211        self.inner
212            .delete()
213            .await
214            .map(|(rp, r)| (rp, TracingWrapper::new(Span::current(), r)))
215    }
216
217    #[tracing::instrument(level = "debug", skip(self))]
218    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
219        self.inner
220            .list(path, args)
221            .await
222            .map(|(rp, s)| (rp, TracingWrapper::new(Span::current(), s)))
223    }
224
225    #[tracing::instrument(level = "debug", skip(self))]
226    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
227        self.inner.presign(path, args).await
228    }
229
230    #[tracing::instrument(level = "debug", skip(self))]
231    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
232        self.inner.blocking_create_dir(path, args)
233    }
234
235    #[tracing::instrument(level = "debug", skip(self))]
236    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
237        self.inner
238            .blocking_read(path, args)
239            .map(|(rp, r)| (rp, TracingWrapper::new(Span::current(), r)))
240    }
241
242    #[tracing::instrument(level = "debug", skip(self))]
243    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
244        self.inner
245            .blocking_write(path, args)
246            .map(|(rp, r)| (rp, TracingWrapper::new(Span::current(), r)))
247    }
248
249    #[tracing::instrument(level = "debug", skip(self))]
250    fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
251        self.inner().blocking_copy(from, to, args)
252    }
253
254    #[tracing::instrument(level = "debug", skip(self))]
255    fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
256        self.inner().blocking_rename(from, to, args)
257    }
258
259    #[tracing::instrument(level = "debug", skip(self))]
260    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
261        self.inner.blocking_stat(path, args)
262    }
263
264    #[tracing::instrument(level = "debug", skip(self))]
265    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
266        self.inner
267            .blocking_delete()
268            .map(|(rp, r)| (rp, TracingWrapper::new(Span::current(), r)))
269    }
270
271    #[tracing::instrument(level = "debug", skip(self))]
272    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
273        self.inner
274            .blocking_list(path, args)
275            .map(|(rp, it)| (rp, TracingWrapper::new(Span::current(), it)))
276    }
277}
278
279pub struct TracingWrapper<R> {
280    span: Span,
281    inner: R,
282}
283
284impl<R> TracingWrapper<R> {
285    fn new(span: Span, inner: R) -> Self {
286        Self { span, inner }
287    }
288}
289
290impl<R: oio::Read> oio::Read for TracingWrapper<R> {
291    #[tracing::instrument(
292        parent = &self.span,
293        level = "trace",
294        skip_all)]
295    async fn read(&mut self) -> Result<Buffer> {
296        self.inner.read().await
297    }
298}
299
300impl<R: oio::BlockingRead> oio::BlockingRead for TracingWrapper<R> {
301    #[tracing::instrument(
302        parent = &self.span,
303        level = "trace",
304        skip_all)]
305    fn read(&mut self) -> Result<Buffer> {
306        self.inner.read()
307    }
308}
309
310impl<R: oio::Write> oio::Write for TracingWrapper<R> {
311    #[tracing::instrument(
312        parent = &self.span,
313        level = "trace",
314        skip_all)]
315    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
316        self.inner.write(bs)
317    }
318
319    #[tracing::instrument(
320        parent = &self.span,
321        level = "trace",
322        skip_all)]
323    fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
324        self.inner.abort()
325    }
326
327    #[tracing::instrument(
328        parent = &self.span,
329        level = "trace",
330        skip_all)]
331    fn close(&mut self) -> impl Future<Output = Result<Metadata>> + MaybeSend {
332        self.inner.close()
333    }
334}
335
336impl<R: oio::BlockingWrite> oio::BlockingWrite for TracingWrapper<R> {
337    #[tracing::instrument(
338        parent = &self.span,
339        level = "trace",
340        skip_all)]
341    fn write(&mut self, bs: Buffer) -> Result<()> {
342        self.inner.write(bs)
343    }
344
345    #[tracing::instrument(
346        parent = &self.span,
347        level = "trace",
348        skip_all)]
349    fn close(&mut self) -> Result<Metadata> {
350        self.inner.close()
351    }
352}
353
354impl<R: oio::List> oio::List for TracingWrapper<R> {
355    #[tracing::instrument(parent = &self.span, level = "debug", skip_all)]
356    async fn next(&mut self) -> Result<Option<oio::Entry>> {
357        self.inner.next().await
358    }
359}
360
361impl<R: oio::BlockingList> oio::BlockingList for TracingWrapper<R> {
362    #[tracing::instrument(parent = &self.span, level = "debug", skip_all)]
363    fn next(&mut self) -> Result<Option<oio::Entry>> {
364        self.inner.next()
365    }
366}
367
368impl<R: oio::Delete> oio::Delete for TracingWrapper<R> {
369    #[tracing::instrument(parent = &self.span, level = "debug", skip_all)]
370    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
371        self.inner.delete(path, args)
372    }
373
374    #[tracing::instrument(parent = &self.span, level = "debug", skip_all)]
375    async fn flush(&mut self) -> Result<usize> {
376        self.inner.flush().await
377    }
378}
379
380impl<R: oio::BlockingDelete> oio::BlockingDelete for TracingWrapper<R> {
381    #[tracing::instrument(parent = &self.span, level = "debug", skip_all)]
382    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
383        self.inner.delete(path, args)
384    }
385
386    #[tracing::instrument(parent = &self.span, level = "debug", skip_all)]
387    fn flush(&mut self) -> Result<usize> {
388        self.inner.flush()
389    }
390}