opendal/layers/
oteltrace.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::future::Future;
19use std::sync::Arc;
20
21use opentelemetry::global;
22use opentelemetry::global::BoxedSpan;
23use opentelemetry::trace::FutureExt as TraceFutureExt;
24use opentelemetry::trace::Span;
25use opentelemetry::trace::TraceContextExt;
26use opentelemetry::trace::Tracer;
27use opentelemetry::Context as TraceContext;
28use opentelemetry::KeyValue;
29
30use crate::raw::*;
31use crate::*;
32
/// Add [opentelemetry::trace](https://docs.rs/opentelemetry/latest/opentelemetry/trace/index.html) for every operation.
///
/// The layer itself carries no configuration: it is a unit struct, and every
/// span is created through the global tracer registered under the name
/// `"opendal"`.
///
/// # Examples
///
/// ## Basic Setup
///
/// ```no_run
/// # use opendal::layers::OtelTraceLayer;
/// # use opendal::services;
/// # use opendal::Operator;
/// # use opendal::Result;
/// #
/// # fn main() -> Result<()> {
/// let _ = Operator::new(services::Memory::default())?
///     .layer(OtelTraceLayer)
///     .finish();
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug, Default)]
pub struct OtelTraceLayer;
53
54impl<A: Access> Layer<A> for OtelTraceLayer {
55    type LayeredAccess = OtelTraceAccessor<A>;
56
57    fn layer(&self, inner: A) -> Self::LayeredAccess {
58        OtelTraceAccessor { inner }
59    }
60}
61
/// Accessor produced by `OtelTraceLayer`: it owns the wrapped accessor and
/// forwards every operation to it.
#[derive(Debug)]
pub struct OtelTraceAccessor<A> {
    /// The wrapped accessor that actually performs the operations.
    inner: A,
}
66
67impl<A: Access> LayeredAccess for OtelTraceAccessor<A> {
68    type Inner = A;
69    type Reader = OtelTraceWrapper<A::Reader>;
70    type BlockingReader = OtelTraceWrapper<A::BlockingReader>;
71    type Writer = OtelTraceWrapper<A::Writer>;
72    type BlockingWriter = OtelTraceWrapper<A::BlockingWriter>;
73    type Lister = OtelTraceWrapper<A::Lister>;
74    type BlockingLister = OtelTraceWrapper<A::BlockingLister>;
75    type Deleter = A::Deleter;
76    type BlockingDeleter = A::BlockingDeleter;
77
78    fn inner(&self) -> &Self::Inner {
79        &self.inner
80    }
81
82    fn info(&self) -> Arc<AccessorInfo> {
83        let tracer = global::tracer("opendal");
84        tracer.in_span("info", |_cx| self.inner.info())
85    }
86
87    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
88        let tracer = global::tracer("opendal");
89        let mut span = tracer.start("create");
90        span.set_attribute(KeyValue::new("path", path.to_string()));
91        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
92        let cx = TraceContext::current_with_span(span);
93        self.inner.create_dir(path, args).with_context(cx).await
94    }
95
96    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
97        let tracer = global::tracer("opendal");
98        let mut span = tracer.start("read");
99        span.set_attribute(KeyValue::new("path", path.to_string()));
100        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
101        self.inner
102            .read(path, args)
103            .await
104            .map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r)))
105    }
106
107    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
108        let tracer = global::tracer("opendal");
109        let mut span = tracer.start("write");
110        span.set_attribute(KeyValue::new("path", path.to_string()));
111        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
112        self.inner
113            .write(path, args)
114            .await
115            .map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r)))
116    }
117
118    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
119        let tracer = global::tracer("opendal");
120        let mut span = tracer.start("copy");
121        span.set_attribute(KeyValue::new("from", from.to_string()));
122        span.set_attribute(KeyValue::new("to", to.to_string()));
123        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
124        let cx = TraceContext::current_with_span(span);
125        self.inner().copy(from, to, args).with_context(cx).await
126    }
127
128    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
129        let tracer = global::tracer("opendal");
130        let mut span = tracer.start("rename");
131        span.set_attribute(KeyValue::new("from", from.to_string()));
132        span.set_attribute(KeyValue::new("to", to.to_string()));
133        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
134        let cx = TraceContext::current_with_span(span);
135        self.inner().rename(from, to, args).with_context(cx).await
136    }
137
138    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
139        let tracer = global::tracer("opendal");
140        let mut span = tracer.start("stat");
141        span.set_attribute(KeyValue::new("path", path.to_string()));
142        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
143        let cx = TraceContext::current_with_span(span);
144        self.inner().stat(path, args).with_context(cx).await
145    }
146
147    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
148        self.inner().delete().await
149    }
150
151    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
152        let tracer = global::tracer("opendal");
153        let mut span = tracer.start("list");
154        span.set_attribute(KeyValue::new("path", path.to_string()));
155        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
156        self.inner
157            .list(path, args)
158            .await
159            .map(|(rp, s)| (rp, OtelTraceWrapper::new(span, s)))
160    }
161
162    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
163        let tracer = global::tracer("opendal");
164        let mut span = tracer.start("presign");
165        span.set_attribute(KeyValue::new("path", path.to_string()));
166        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
167        let cx = TraceContext::current_with_span(span);
168        self.inner().presign(path, args).with_context(cx).await
169    }
170
171    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
172        let tracer = global::tracer("opendal");
173        tracer.in_span("blocking_create_dir", |cx| {
174            let span = cx.span(); // let mut span = cx.();
175            span.set_attribute(KeyValue::new("path", path.to_string()));
176            span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
177            self.inner().blocking_create_dir(path, args)
178        })
179    }
180
181    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
182        let tracer = global::tracer("opendal");
183        let mut span = tracer.start("blocking_read");
184        span.set_attribute(KeyValue::new("path", path.to_string()));
185        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
186        self.inner
187            .blocking_read(path, args)
188            .map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r)))
189    }
190
191    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
192        let tracer = global::tracer("opendal");
193        let mut span = tracer.start("blocking_write");
194        span.set_attribute(KeyValue::new("path", path.to_string()));
195        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
196        self.inner
197            .blocking_write(path, args)
198            .map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r)))
199    }
200
201    fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
202        let tracer = global::tracer("opendal");
203        tracer.in_span("blocking_copy", |cx| {
204            let span = cx.span();
205            span.set_attribute(KeyValue::new("from", from.to_string()));
206            span.set_attribute(KeyValue::new("to", to.to_string()));
207            span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
208            self.inner().blocking_copy(from, to, args)
209        })
210    }
211
212    fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
213        let tracer = global::tracer("opendal");
214        tracer.in_span("blocking_rename", |cx| {
215            let span = cx.span();
216            span.set_attribute(KeyValue::new("from", from.to_string()));
217            span.set_attribute(KeyValue::new("to", to.to_string()));
218            span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
219            self.inner().blocking_rename(from, to, args)
220        })
221    }
222
223    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
224        let tracer = global::tracer("opendal");
225        tracer.in_span("blocking_stat", |cx| {
226            let span = cx.span();
227            span.set_attribute(KeyValue::new("path", path.to_string()));
228            span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
229            self.inner().blocking_stat(path, args)
230        })
231    }
232
233    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
234        self.inner().blocking_delete()
235    }
236
237    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
238        let tracer = global::tracer("opendal");
239        let mut span = tracer.start("blocking_list");
240        span.set_attribute(KeyValue::new("path", path.to_string()));
241        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
242        self.inner
243            .blocking_list(path, args)
244            .map(|(rp, it)| (rp, OtelTraceWrapper::new(span, it)))
245    }
246}
247
248pub struct OtelTraceWrapper<R> {
249    _span: BoxedSpan,
250    inner: R,
251}
252
253impl<R> OtelTraceWrapper<R> {
254    fn new(_span: BoxedSpan, inner: R) -> Self {
255        Self { _span, inner }
256    }
257}
258
259impl<R: oio::Read> oio::Read for OtelTraceWrapper<R> {
260    async fn read(&mut self) -> Result<Buffer> {
261        self.inner.read().await
262    }
263}
264
265impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> {
266    fn read(&mut self) -> Result<Buffer> {
267        self.inner.read()
268    }
269}
270
271impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
272    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
273        self.inner.write(bs)
274    }
275
276    fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
277        self.inner.abort()
278    }
279
280    fn close(&mut self) -> impl Future<Output = Result<Metadata>> + MaybeSend {
281        self.inner.close()
282    }
283}
284
285impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
286    fn write(&mut self, bs: Buffer) -> Result<()> {
287        self.inner.write(bs)
288    }
289
290    fn close(&mut self) -> Result<Metadata> {
291        self.inner.close()
292    }
293}
294
295impl<R: oio::List> oio::List for OtelTraceWrapper<R> {
296    async fn next(&mut self) -> Result<Option<oio::Entry>> {
297        self.inner.next().await
298    }
299}
300
301impl<R: oio::BlockingList> oio::BlockingList for OtelTraceWrapper<R> {
302    fn next(&mut self) -> Result<Option<oio::Entry>> {
303        self.inner.next()
304    }
305}