opendal/layers/
tracing.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::pin::Pin;
20use std::task::{Context, Poll};
21
22use futures::Stream;
23use futures::StreamExt;
24use tracing::{span, Level, Span};
25
26use crate::raw::*;
27use crate::*;
28
29/// Add [tracing](https://docs.rs/tracing/) for every operation.
30///
31/// # Examples
32///
33/// ## Basic Setup
34///
35/// ```no_run
36/// # use opendal::layers::TracingLayer;
37/// # use opendal::services;
38/// # use opendal::Operator;
39/// # use opendal::Result;
40///
41/// # fn main() -> Result<()> {
42/// let _ = Operator::new(services::Memory::default())?
43///     .layer(TracingLayer)
44///     .finish();
45/// Ok(())
46/// # }
47/// ```
48///
49/// ## Real usage
50///
51/// ```no_run
52/// # use anyhow::Result;
53/// # use opendal::layers::TracingLayer;
54/// # use opendal::services;
55/// # use opendal::Operator;
56/// # use opentelemetry::KeyValue;
57/// # use opentelemetry_sdk::trace;
58/// # use opentelemetry_sdk::Resource;
59/// # use tracing_subscriber::prelude::*;
60/// # use tracing_subscriber::EnvFilter;
61///
62/// # fn main() -> Result<()> {
63/// use opentelemetry::trace::TracerProvider;
64/// let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
65///     .with_simple_exporter(opentelemetry_otlp::SpanExporter::builder().with_tonic().build()?)
66///     .with_resource(Resource::builder().with_attributes(vec![
67///         KeyValue::new("service.name", "opendal_example"),
68///     ]).build())
69///     .build();
70/// let tracer = tracer_provider.tracer("opendal_tracer");
71/// let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);
72///
73/// tracing_subscriber::registry()
74///     .with(EnvFilter::from_default_env())
75///     .with(opentelemetry)
76///     .try_init()?;
77///
78/// {
79///     let runtime = tokio::runtime::Runtime::new()?;
80///     runtime.block_on(async {
81///         let root = tracing::span!(tracing::Level::INFO, "app_start", work_units = 2);
82///         let _enter = root.enter();
83///
84///         let _ = dotenvy::dotenv();
85///         let op = Operator::new(services::Memory::default())?
86///             .layer(TracingLayer)
87///             .finish();
88///
89///         op.write("test", "0".repeat(16 * 1024 * 1024).into_bytes())
90///             .await?;
91///         op.stat("test").await?;
92///         op.read("test").await?;
93///         Ok::<(), opendal::Error>(())
94///     })?;
95/// }
96///
97/// // Shut down the current tracer provider.
98/// // This will invoke the shutdown method on all span processors.
99/// // span processors should export remaining spans before return.
100/// tracer_provider.shutdown()?;
101///
102/// Ok(())
103/// # }
104/// ```
105///
106/// # Output
107///
108/// OpenDAL is using [`tracing`](https://docs.rs/tracing/latest/tracing/) for tracing internally.
109///
110/// To enable tracing output, please init one of the subscribers that `tracing` supports.
111///
112/// For example:
113///
114/// ```no_run
115/// # use tracing::dispatcher;
116/// # use tracing::Event;
117/// # use tracing::Metadata;
118/// # use tracing::span::Attributes;
119/// # use tracing::span::Id;
120/// # use tracing::span::Record;
121/// # use tracing::subscriber::Subscriber;
122///
123/// # pub struct FooSubscriber;
124/// # impl Subscriber for FooSubscriber {
125/// #   fn enabled(&self, _: &Metadata) -> bool { false }
126/// #   fn new_span(&self, _: &Attributes) -> Id { Id::from_u64(0) }
127/// #   fn record(&self, _: &Id, _: &Record) {}
128/// #   fn record_follows_from(&self, _: &Id, _: &Id) {}
129/// #   fn event(&self, _: &Event) {}
130/// #   fn enter(&self, _: &Id) {}
131/// #   fn exit(&self, _: &Id) {}
132/// # }
133/// # impl FooSubscriber { fn new() -> Self { FooSubscriber } }
134///
135/// let my_subscriber = FooSubscriber::new();
136/// tracing::subscriber::set_global_default(my_subscriber).expect("setting tracing default failed");
137/// ```
138///
139/// For real-world usage, please take a look at [`tracing-opentelemetry`](https://crates.io/crates/tracing-opentelemetry).
#[derive(Debug, Clone, Copy, Default)]
pub struct TracingLayer;
141
142impl<A: Access> Layer<A> for TracingLayer {
143    type LayeredAccess = TracingAccessor<A>;
144
145    fn layer(&self, inner: A) -> Self::LayeredAccess {
146        let info = inner.info();
147
148        // Update http client with metrics http fetcher.
149        info.update_http_client(|client| {
150            HttpClient::with(TracingHttpFetcher {
151                inner: client.into_inner(),
152            })
153        });
154
155        TracingAccessor { inner }
156    }
157}
158
159pub struct TracingHttpFetcher {
160    inner: HttpFetcher,
161}
162
163impl HttpFetch for TracingHttpFetcher {
164    async fn fetch(&self, req: http::Request<Buffer>) -> Result<http::Response<HttpBody>> {
165        let span = span!(Level::DEBUG, "http::fetch", ?req);
166
167        let resp = {
168            let _enter = span.enter();
169            self.inner.fetch(req).await?
170        };
171
172        let (parts, body) = resp.into_parts();
173        let body = body.map_inner(|s| Box::new(TracingStream { inner: s, span }));
174        Ok(http::Response::from_parts(parts, body))
175    }
176}
177
178pub struct TracingStream<S> {
179    inner: S,
180    span: Span,
181}
182
183impl<S> Stream for TracingStream<S>
184where
185    S: Stream<Item = Result<Buffer>> + Unpin + 'static,
186{
187    type Item = Result<Buffer>;
188
189    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
190        let _enter = self.span.clone().entered();
191        self.inner.poll_next_unpin(cx)
192    }
193}
194
195#[derive(Debug)]
196pub struct TracingAccessor<A> {
197    inner: A,
198}
199
200impl<A: Access> LayeredAccess for TracingAccessor<A> {
201    type Inner = A;
202    type Reader = TracingWrapper<A::Reader>;
203    type Writer = TracingWrapper<A::Writer>;
204    type Lister = TracingWrapper<A::Lister>;
205    type Deleter = TracingWrapper<A::Deleter>;
206    type BlockingReader = TracingWrapper<A::BlockingReader>;
207    type BlockingWriter = TracingWrapper<A::BlockingWriter>;
208    type BlockingLister = TracingWrapper<A::BlockingLister>;
209    type BlockingDeleter = TracingWrapper<A::BlockingDeleter>;
210
211    fn inner(&self) -> &Self::Inner {
212        &self.inner
213    }
214
215    #[tracing::instrument(level = "debug", skip(self))]
216    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
217        self.inner.create_dir(path, args).await
218    }
219
220    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
221        let span = span!(Level::DEBUG, "read", path, ?args);
222
223        let (rp, r) = {
224            let _enter = span.enter();
225            self.inner.read(path, args).await?
226        };
227
228        Ok((rp, TracingWrapper::new(span, r)))
229    }
230
231    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
232        let span = span!(Level::DEBUG, "write", path, ?args);
233
234        let (rp, r) = {
235            let _enter = span.enter();
236            self.inner.write(path, args).await?
237        };
238
239        Ok((rp, TracingWrapper::new(span, r)))
240    }
241
242    #[tracing::instrument(level = "debug", skip(self))]
243    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
244        self.inner().copy(from, to, args).await
245    }
246
247    #[tracing::instrument(level = "debug", skip(self))]
248    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
249        self.inner().rename(from, to, args).await
250    }
251
252    #[tracing::instrument(level = "debug", skip(self))]
253    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
254        self.inner.stat(path, args).await
255    }
256
257    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
258        let span = span!(Level::DEBUG, "delete");
259
260        let (rp, r) = {
261            let _enter = span.enter();
262            self.inner.delete().await?
263        };
264
265        Ok((rp, TracingWrapper::new(span, r)))
266    }
267
268    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
269        let span = span!(Level::DEBUG, "list", path, ?args);
270
271        let (rp, r) = {
272            let _enter = span.enter();
273            self.inner.list(path, args).await?
274        };
275
276        Ok((rp, TracingWrapper::new(span, r)))
277    }
278
279    #[tracing::instrument(level = "debug", skip(self))]
280    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
281        self.inner.presign(path, args).await
282    }
283
284    #[tracing::instrument(level = "debug", skip(self))]
285    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
286        self.inner.blocking_create_dir(path, args)
287    }
288
289    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
290        let span = span!(Level::DEBUG, "read", path, ?args);
291
292        let (rp, r) = {
293            let _enter = span.enter();
294            self.inner.blocking_read(path, args)?
295        };
296
297        Ok((rp, TracingWrapper::new(span, r)))
298    }
299
300    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
301        let span = span!(Level::DEBUG, "write", path, ?args);
302
303        let (rp, r) = {
304            let _enter = span.enter();
305            self.inner.blocking_write(path, args)?
306        };
307
308        Ok((rp, TracingWrapper::new(span, r)))
309    }
310
311    #[tracing::instrument(level = "debug", skip(self))]
312    fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
313        self.inner().blocking_copy(from, to, args)
314    }
315
316    #[tracing::instrument(level = "debug", skip(self))]
317    fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
318        self.inner().blocking_rename(from, to, args)
319    }
320
321    #[tracing::instrument(level = "debug", skip(self))]
322    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
323        self.inner.blocking_stat(path, args)
324    }
325
326    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
327        let span = span!(Level::DEBUG, "delete");
328
329        let (rp, r) = {
330            let _enter = span.enter();
331            self.inner.blocking_delete()?
332        };
333
334        Ok((rp, TracingWrapper::new(span, r)))
335    }
336
337    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
338        let span = span!(Level::DEBUG, "list", path, ?args);
339
340        let (rp, r) = {
341            let _enter = span.enter();
342            self.inner.blocking_list(path, args)?
343        };
344
345        Ok((rp, TracingWrapper::new(span, r)))
346    }
347}
348
349pub struct TracingWrapper<R> {
350    span: Span,
351    inner: R,
352}
353
354impl<R> TracingWrapper<R> {
355    fn new(span: Span, inner: R) -> Self {
356        Self { span, inner }
357    }
358}
359
360impl<R: oio::Read> oio::Read for TracingWrapper<R> {
361    async fn read(&mut self) -> Result<Buffer> {
362        let _enter = self.span.enter();
363
364        self.inner.read().await
365    }
366}
367
368impl<R: oio::BlockingRead> oio::BlockingRead for TracingWrapper<R> {
369    fn read(&mut self) -> Result<Buffer> {
370        let _enter = self.span.enter();
371
372        self.inner.read()
373    }
374}
375
376impl<R: oio::Write> oio::Write for TracingWrapper<R> {
377    async fn write(&mut self, bs: Buffer) -> Result<()> {
378        let _enter = self.span.enter();
379
380        self.inner.write(bs).await
381    }
382
383    async fn abort(&mut self) -> Result<()> {
384        let _enter = self.span.enter();
385
386        self.inner.abort().await
387    }
388
389    async fn close(&mut self) -> Result<Metadata> {
390        let _enter = self.span.enter();
391
392        self.inner.close().await
393    }
394}
395
396impl<R: oio::BlockingWrite> oio::BlockingWrite for TracingWrapper<R> {
397    fn write(&mut self, bs: Buffer) -> Result<()> {
398        let _enter = self.span.enter();
399
400        self.inner.write(bs)
401    }
402
403    fn close(&mut self) -> Result<Metadata> {
404        let _enter = self.span.enter();
405
406        self.inner.close()
407    }
408}
409
410impl<R: oio::List> oio::List for TracingWrapper<R> {
411    async fn next(&mut self) -> Result<Option<oio::Entry>> {
412        let _enter = self.span.enter();
413
414        self.inner.next().await
415    }
416}
417
418impl<R: oio::BlockingList> oio::BlockingList for TracingWrapper<R> {
419    fn next(&mut self) -> Result<Option<oio::Entry>> {
420        let _enter = self.span.enter();
421
422        self.inner.next()
423    }
424}
425
426impl<R: oio::Delete> oio::Delete for TracingWrapper<R> {
427    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
428        let _enter = self.span.enter();
429
430        self.inner.delete(path, args)
431    }
432
433    async fn flush(&mut self) -> Result<usize> {
434        let _enter = self.span.enter();
435
436        self.inner.flush().await
437    }
438}
439
440impl<R: oio::BlockingDelete> oio::BlockingDelete for TracingWrapper<R> {
441    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
442        let _enter = self.span.enter();
443
444        self.inner.delete(path, args)
445    }
446
447    fn flush(&mut self) -> Result<usize> {
448        let _enter = self.span.enter();
449
450        self.inner.flush()
451    }
452}