opendal/layers/
fastrace.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::future::Future;
20use std::sync::Arc;
21
22use fastrace::prelude::*;
23
24use crate::raw::*;
25use crate::*;
26
/// Add [fastrace](https://docs.rs/fastrace/) tracing spans to every operation.
///
/// # Examples
///
/// ## Basic Setup
///
/// ```no_run
/// # use opendal::layers::FastraceLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
///
/// # fn main() -> Result<()> {
/// let _ = Operator::new(services::Memory::default())?
///     .layer(FastraceLayer)
///     .finish();
/// Ok(())
/// # }
/// ```
///
/// ## Real usage
///
/// ```no_run
/// # use anyhow::Result;
/// # use fastrace::prelude::*;
/// # use opendal::layers::FastraceLayer;
/// # use opendal::services;
/// # use opendal::Operator;
///
/// # fn main() -> Result<()> {
/// let reporter =
///     fastrace_jaeger::JaegerReporter::new("127.0.0.1:6831".parse()?, "opendal").unwrap();
/// fastrace::set_reporter(reporter, fastrace::collector::Config::default());
///
/// {
///     let root = Span::root("op", SpanContext::random());
///     let runtime = tokio::runtime::Runtime::new()?;
///     runtime.block_on(
///         async {
///             let _ = dotenvy::dotenv();
///             let op = Operator::new(services::Memory::default())?
///                 .layer(FastraceLayer)
///                 .finish();
///             op.write("test", "0".repeat(16 * 1024 * 1024).into_bytes())
///                 .await?;
///             op.stat("test").await?;
///             op.read("test").await?;
///             Ok::<(), opendal::Error>(())
///         }
///         .in_span(Span::enter_with_parent("test", &root)),
///     )?;
/// }
///
/// fastrace::flush();
///
/// Ok(())
/// # }
/// ```
///
/// # Output
///
/// OpenDAL uses [`fastrace`](https://docs.rs/fastrace/latest/fastrace/) for tracing internally.
///
/// To enable fastrace output, initialize one of the reporters that `fastrace` supports.
///
/// For example:
///
/// ```no_run
/// # use anyhow::Result;
///
/// # fn main() -> Result<()> {
/// let reporter =
///     fastrace_jaeger::JaegerReporter::new("127.0.0.1:6831".parse()?, "opendal").unwrap();
/// fastrace::set_reporter(reporter, fastrace::collector::Config::default());
/// Ok(())
/// # }
/// ```
///
/// For real-world usage, please take a look at [`fastrace-datadog`](https://crates.io/crates/fastrace-datadog) or [`fastrace-jaeger`](https://crates.io/crates/fastrace-jaeger).
// The layer is a stateless unit struct, so the common traits are derived to
// let callers print, copy and default-construct it like any other value.
#[derive(Debug, Clone, Copy, Default)]
pub struct FastraceLayer;
107
108impl<A: Access> Layer<A> for FastraceLayer {
109    type LayeredAccess = FastraceAccessor<A>;
110
111    fn layer(&self, inner: A) -> Self::LayeredAccess {
112        FastraceAccessor { inner }
113    }
114}
115
/// Accessor produced by [`FastraceLayer`]: holds the wrapped accessor that
/// every operation is forwarded to.
#[derive(Debug)]
pub struct FastraceAccessor<A> {
    /// The underlying accessor that performs the actual work.
    inner: A,
}
120
121impl<A: Access> LayeredAccess for FastraceAccessor<A> {
122    type Inner = A;
123    type Reader = FastraceWrapper<A::Reader>;
124    type BlockingReader = FastraceWrapper<A::BlockingReader>;
125    type Writer = FastraceWrapper<A::Writer>;
126    type BlockingWriter = FastraceWrapper<A::BlockingWriter>;
127    type Lister = FastraceWrapper<A::Lister>;
128    type BlockingLister = FastraceWrapper<A::BlockingLister>;
129    type Deleter = FastraceWrapper<A::Deleter>;
130    type BlockingDeleter = FastraceWrapper<A::BlockingDeleter>;
131
132    fn inner(&self) -> &Self::Inner {
133        &self.inner
134    }
135
136    #[trace]
137    fn info(&self) -> Arc<AccessorInfo> {
138        self.inner.info()
139    }
140
141    #[trace(enter_on_poll = true)]
142    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
143        self.inner.create_dir(path, args).await
144    }
145
146    #[trace(enter_on_poll = true)]
147    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
148        self.inner.read(path, args).await.map(|(rp, r)| {
149            (
150                rp,
151                FastraceWrapper::new(
152                    Span::enter_with_local_parent(Operation::Read.into_static()),
153                    r,
154                ),
155            )
156        })
157    }
158
159    #[trace(enter_on_poll = true)]
160    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
161        self.inner.write(path, args).await.map(|(rp, r)| {
162            (
163                rp,
164                FastraceWrapper::new(
165                    Span::enter_with_local_parent(Operation::Write.into_static()),
166                    r,
167                ),
168            )
169        })
170    }
171
172    #[trace(enter_on_poll = true)]
173    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
174        self.inner().copy(from, to, args).await
175    }
176
177    #[trace(enter_on_poll = true)]
178    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
179        self.inner().rename(from, to, args).await
180    }
181
182    #[trace(enter_on_poll = true)]
183    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
184        self.inner.stat(path, args).await
185    }
186
187    #[trace(enter_on_poll = true)]
188    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
189        self.inner.delete().await.map(|(rp, r)| {
190            (
191                rp,
192                FastraceWrapper::new(
193                    Span::enter_with_local_parent(Operation::Delete.into_static()),
194                    r,
195                ),
196            )
197        })
198    }
199
200    #[trace(enter_on_poll = true)]
201    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
202        self.inner.list(path, args).await.map(|(rp, s)| {
203            (
204                rp,
205                FastraceWrapper::new(
206                    Span::enter_with_local_parent(Operation::List.into_static()),
207                    s,
208                ),
209            )
210        })
211    }
212
213    #[trace(enter_on_poll = true)]
214    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
215        self.inner.presign(path, args).await
216    }
217
218    #[trace]
219    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
220        self.inner.blocking_create_dir(path, args)
221    }
222
223    #[trace]
224    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
225        self.inner.blocking_read(path, args).map(|(rp, r)| {
226            (
227                rp,
228                FastraceWrapper::new(
229                    Span::enter_with_local_parent(Operation::Read.into_static()),
230                    r,
231                ),
232            )
233        })
234    }
235
236    #[trace]
237    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
238        self.inner.blocking_write(path, args).map(|(rp, r)| {
239            (
240                rp,
241                FastraceWrapper::new(
242                    Span::enter_with_local_parent(Operation::Write.into_static()),
243                    r,
244                ),
245            )
246        })
247    }
248
249    #[trace]
250    fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
251        self.inner().blocking_copy(from, to, args)
252    }
253
254    #[trace]
255    fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
256        self.inner().blocking_rename(from, to, args)
257    }
258
259    #[trace]
260    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
261        self.inner.blocking_stat(path, args)
262    }
263
264    #[trace]
265    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
266        self.inner.blocking_delete().map(|(rp, r)| {
267            (
268                rp,
269                FastraceWrapper::new(
270                    Span::enter_with_local_parent(Operation::Delete.into_static()),
271                    r,
272                ),
273            )
274        })
275    }
276
277    #[trace]
278    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
279        self.inner.blocking_list(path, args).map(|(rp, it)| {
280            (
281                rp,
282                FastraceWrapper::new(
283                    Span::enter_with_local_parent(Operation::List.into_static()),
284                    it,
285                ),
286            )
287        })
288    }
289}
290
291pub struct FastraceWrapper<R> {
292    span: Span,
293    inner: R,
294}
295
296impl<R> FastraceWrapper<R> {
297    fn new(span: Span, inner: R) -> Self {
298        Self { span, inner }
299    }
300}
301
302impl<R: oio::Read> oio::Read for FastraceWrapper<R> {
303    #[trace(enter_on_poll = true)]
304    async fn read(&mut self) -> Result<Buffer> {
305        self.inner.read().await
306    }
307}
308
309impl<R: oio::BlockingRead> oio::BlockingRead for FastraceWrapper<R> {
310    fn read(&mut self) -> Result<Buffer> {
311        let _g = self.span.set_local_parent();
312        let _span = LocalSpan::enter_with_local_parent(Operation::Read.into_static());
313        self.inner.read()
314    }
315}
316
317impl<R: oio::Write> oio::Write for FastraceWrapper<R> {
318    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
319        let _g = self.span.set_local_parent();
320        let _span = LocalSpan::enter_with_local_parent(Operation::Write.into_static());
321        self.inner.write(bs)
322    }
323
324    fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
325        let _g = self.span.set_local_parent();
326        let _span = LocalSpan::enter_with_local_parent(Operation::Write.into_static());
327        self.inner.abort()
328    }
329
330    fn close(&mut self) -> impl Future<Output = Result<Metadata>> + MaybeSend {
331        let _g = self.span.set_local_parent();
332        let _span = LocalSpan::enter_with_local_parent(Operation::Write.into_static());
333        self.inner.close()
334    }
335}
336
337impl<R: oio::BlockingWrite> oio::BlockingWrite for FastraceWrapper<R> {
338    fn write(&mut self, bs: Buffer) -> Result<()> {
339        let _g = self.span.set_local_parent();
340        let _span = LocalSpan::enter_with_local_parent(Operation::Write.into_static());
341        self.inner.write(bs)
342    }
343
344    fn close(&mut self) -> Result<Metadata> {
345        let _g = self.span.set_local_parent();
346        let _span = LocalSpan::enter_with_local_parent(Operation::Write.into_static());
347        self.inner.close()
348    }
349}
350
351impl<R: oio::List> oio::List for FastraceWrapper<R> {
352    #[trace(enter_on_poll = true)]
353    async fn next(&mut self) -> Result<Option<oio::Entry>> {
354        self.inner.next().await
355    }
356}
357
358impl<R: oio::BlockingList> oio::BlockingList for FastraceWrapper<R> {
359    fn next(&mut self) -> Result<Option<oio::Entry>> {
360        let _g = self.span.set_local_parent();
361        let _span = LocalSpan::enter_with_local_parent(Operation::List.into_static());
362        self.inner.next()
363    }
364}
365
366impl<R: oio::Delete> oio::Delete for FastraceWrapper<R> {
367    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
368        let _g = self.span.set_local_parent();
369        let _span = LocalSpan::enter_with_local_parent(Operation::Delete.into_static());
370        self.inner.delete(path, args)
371    }
372
373    #[trace(enter_on_poll = true)]
374    async fn flush(&mut self) -> Result<usize> {
375        self.inner.flush().await
376    }
377}
378
379impl<R: oio::BlockingDelete> oio::BlockingDelete for FastraceWrapper<R> {
380    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
381        let _g = self.span.set_local_parent();
382        let _span = LocalSpan::enter_with_local_parent(Operation::Delete.into_static());
383        self.inner.delete(path, args)
384    }
385
386    fn flush(&mut self) -> Result<usize> {
387        let _g = self.span.set_local_parent();
388        let _span = LocalSpan::enter_with_local_parent(Operation::Delete.into_static());
389        self.inner.flush()
390    }
391}