1use std::future::Future;
19use std::sync::Arc;
20
21use opentelemetry::global;
22use opentelemetry::global::BoxedSpan;
23use opentelemetry::trace::FutureExt as TraceFutureExt;
24use opentelemetry::trace::Span;
25use opentelemetry::trace::TraceContextExt;
26use opentelemetry::trace::Tracer;
27use opentelemetry::Context as TraceContext;
28use opentelemetry::KeyValue;
29
30use crate::raw::*;
31use crate::*;
32
/// Layer that wraps an accessor in an [`OtelTraceAccessor`], which reports
/// operations as spans through the global OpenTelemetry tracer named
/// `"opendal"`.
///
/// The layer carries no state, so it is cheap to copy and can be built with
/// either `OtelTraceLayer` or `OtelTraceLayer::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct OtelTraceLayer;
53
54impl<A: Access> Layer<A> for OtelTraceLayer {
55 type LayeredAccess = OtelTraceAccessor<A>;
56
57 fn layer(&self, inner: A) -> Self::LayeredAccess {
58 OtelTraceAccessor { inner }
59 }
60}
61
/// Accessor produced by `OtelTraceLayer`.
///
/// Holds the wrapped accessor and forwards every operation to it.
#[derive(Debug)]
pub struct OtelTraceAccessor<A> {
    /// The accessor that actually performs each operation.
    inner: A,
}
66
67impl<A: Access> LayeredAccess for OtelTraceAccessor<A> {
68 type Inner = A;
69 type Reader = OtelTraceWrapper<A::Reader>;
70 type BlockingReader = OtelTraceWrapper<A::BlockingReader>;
71 type Writer = OtelTraceWrapper<A::Writer>;
72 type BlockingWriter = OtelTraceWrapper<A::BlockingWriter>;
73 type Lister = OtelTraceWrapper<A::Lister>;
74 type BlockingLister = OtelTraceWrapper<A::BlockingLister>;
75 type Deleter = A::Deleter;
76 type BlockingDeleter = A::BlockingDeleter;
77
78 fn inner(&self) -> &Self::Inner {
79 &self.inner
80 }
81
82 fn info(&self) -> Arc<AccessorInfo> {
83 let tracer = global::tracer("opendal");
84 tracer.in_span("info", |_cx| self.inner.info())
85 }
86
87 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
88 let tracer = global::tracer("opendal");
89 let mut span = tracer.start("create");
90 span.set_attribute(KeyValue::new("path", path.to_string()));
91 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
92 let cx = TraceContext::current_with_span(span);
93 self.inner.create_dir(path, args).with_context(cx).await
94 }
95
96 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
97 let tracer = global::tracer("opendal");
98 let mut span = tracer.start("read");
99 span.set_attribute(KeyValue::new("path", path.to_string()));
100 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
101 self.inner
102 .read(path, args)
103 .await
104 .map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r)))
105 }
106
107 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
108 let tracer = global::tracer("opendal");
109 let mut span = tracer.start("write");
110 span.set_attribute(KeyValue::new("path", path.to_string()));
111 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
112 self.inner
113 .write(path, args)
114 .await
115 .map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r)))
116 }
117
118 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
119 let tracer = global::tracer("opendal");
120 let mut span = tracer.start("copy");
121 span.set_attribute(KeyValue::new("from", from.to_string()));
122 span.set_attribute(KeyValue::new("to", to.to_string()));
123 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
124 let cx = TraceContext::current_with_span(span);
125 self.inner().copy(from, to, args).with_context(cx).await
126 }
127
128 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
129 let tracer = global::tracer("opendal");
130 let mut span = tracer.start("rename");
131 span.set_attribute(KeyValue::new("from", from.to_string()));
132 span.set_attribute(KeyValue::new("to", to.to_string()));
133 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
134 let cx = TraceContext::current_with_span(span);
135 self.inner().rename(from, to, args).with_context(cx).await
136 }
137
138 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
139 let tracer = global::tracer("opendal");
140 let mut span = tracer.start("stat");
141 span.set_attribute(KeyValue::new("path", path.to_string()));
142 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
143 let cx = TraceContext::current_with_span(span);
144 self.inner().stat(path, args).with_context(cx).await
145 }
146
147 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
148 self.inner().delete().await
149 }
150
151 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
152 let tracer = global::tracer("opendal");
153 let mut span = tracer.start("list");
154 span.set_attribute(KeyValue::new("path", path.to_string()));
155 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
156 self.inner
157 .list(path, args)
158 .await
159 .map(|(rp, s)| (rp, OtelTraceWrapper::new(span, s)))
160 }
161
162 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
163 let tracer = global::tracer("opendal");
164 let mut span = tracer.start("presign");
165 span.set_attribute(KeyValue::new("path", path.to_string()));
166 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
167 let cx = TraceContext::current_with_span(span);
168 self.inner().presign(path, args).with_context(cx).await
169 }
170
171 fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
172 let tracer = global::tracer("opendal");
173 tracer.in_span("blocking_create_dir", |cx| {
174 let span = cx.span(); span.set_attribute(KeyValue::new("path", path.to_string()));
176 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
177 self.inner().blocking_create_dir(path, args)
178 })
179 }
180
181 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
182 let tracer = global::tracer("opendal");
183 let mut span = tracer.start("blocking_read");
184 span.set_attribute(KeyValue::new("path", path.to_string()));
185 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
186 self.inner
187 .blocking_read(path, args)
188 .map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r)))
189 }
190
191 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
192 let tracer = global::tracer("opendal");
193 let mut span = tracer.start("blocking_write");
194 span.set_attribute(KeyValue::new("path", path.to_string()));
195 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
196 self.inner
197 .blocking_write(path, args)
198 .map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r)))
199 }
200
201 fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
202 let tracer = global::tracer("opendal");
203 tracer.in_span("blocking_copy", |cx| {
204 let span = cx.span();
205 span.set_attribute(KeyValue::new("from", from.to_string()));
206 span.set_attribute(KeyValue::new("to", to.to_string()));
207 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
208 self.inner().blocking_copy(from, to, args)
209 })
210 }
211
212 fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
213 let tracer = global::tracer("opendal");
214 tracer.in_span("blocking_rename", |cx| {
215 let span = cx.span();
216 span.set_attribute(KeyValue::new("from", from.to_string()));
217 span.set_attribute(KeyValue::new("to", to.to_string()));
218 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
219 self.inner().blocking_rename(from, to, args)
220 })
221 }
222
223 fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
224 let tracer = global::tracer("opendal");
225 tracer.in_span("blocking_stat", |cx| {
226 let span = cx.span();
227 span.set_attribute(KeyValue::new("path", path.to_string()));
228 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
229 self.inner().blocking_stat(path, args)
230 })
231 }
232
233 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
234 self.inner().blocking_delete()
235 }
236
237 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
238 let tracer = global::tracer("opendal");
239 let mut span = tracer.start("blocking_list");
240 span.set_attribute(KeyValue::new("path", path.to_string()));
241 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
242 self.inner
243 .blocking_list(path, args)
244 .map(|(rp, it)| (rp, OtelTraceWrapper::new(span, it)))
245 }
246}
247
248pub struct OtelTraceWrapper<R> {
249 _span: BoxedSpan,
250 inner: R,
251}
252
253impl<R> OtelTraceWrapper<R> {
254 fn new(_span: BoxedSpan, inner: R) -> Self {
255 Self { _span, inner }
256 }
257}
258
259impl<R: oio::Read> oio::Read for OtelTraceWrapper<R> {
260 async fn read(&mut self) -> Result<Buffer> {
261 self.inner.read().await
262 }
263}
264
265impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> {
266 fn read(&mut self) -> Result<Buffer> {
267 self.inner.read()
268 }
269}
270
271impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
272 fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
273 self.inner.write(bs)
274 }
275
276 fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
277 self.inner.abort()
278 }
279
280 fn close(&mut self) -> impl Future<Output = Result<Metadata>> + MaybeSend {
281 self.inner.close()
282 }
283}
284
285impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
286 fn write(&mut self, bs: Buffer) -> Result<()> {
287 self.inner.write(bs)
288 }
289
290 fn close(&mut self) -> Result<Metadata> {
291 self.inner.close()
292 }
293}
294
295impl<R: oio::List> oio::List for OtelTraceWrapper<R> {
296 async fn next(&mut self) -> Result<Option<oio::Entry>> {
297 self.inner.next().await
298 }
299}
300
301impl<R: oio::BlockingList> oio::BlockingList for OtelTraceWrapper<R> {
302 fn next(&mut self) -> Result<Option<oio::Entry>> {
303 self.inner.next()
304 }
305}