opendal/layers/
oteltrace.rs1use std::future::Future;
19use std::sync::Arc;
20
21use opentelemetry::global;
22use opentelemetry::global::BoxedSpan;
23use opentelemetry::trace::FutureExt as TraceFutureExt;
24use opentelemetry::trace::Span;
25use opentelemetry::trace::TraceContextExt;
26use opentelemetry::trace::Tracer;
27use opentelemetry::Context as TraceContext;
28use opentelemetry::KeyValue;
29
30use crate::raw::*;
31use crate::*;
32
33pub struct OtelTraceLayer;
53
54impl<A: Access> Layer<A> for OtelTraceLayer {
55 type LayeredAccess = OtelTraceAccessor<A>;
56
57 fn layer(&self, inner: A) -> Self::LayeredAccess {
58 OtelTraceAccessor { inner }
59 }
60}
61
62#[derive(Debug)]
63pub struct OtelTraceAccessor<A> {
64 inner: A,
65}
66
67impl<A: Access> LayeredAccess for OtelTraceAccessor<A> {
68 type Inner = A;
69 type Reader = OtelTraceWrapper<A::Reader>;
70 type Writer = OtelTraceWrapper<A::Writer>;
71 type Lister = OtelTraceWrapper<A::Lister>;
72 type Deleter = A::Deleter;
73
74 fn inner(&self) -> &Self::Inner {
75 &self.inner
76 }
77
78 fn info(&self) -> Arc<AccessorInfo> {
79 let tracer = global::tracer("opendal");
80 tracer.in_span("info", |_cx| self.inner.info())
81 }
82
83 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
84 let tracer = global::tracer("opendal");
85 let mut span = tracer.start("create");
86 span.set_attribute(KeyValue::new("path", path.to_string()));
87 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
88 let cx = TraceContext::current_with_span(span);
89 self.inner.create_dir(path, args).with_context(cx).await
90 }
91
92 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
93 let tracer = global::tracer("opendal");
94 let mut span = tracer.start("read");
95 span.set_attribute(KeyValue::new("path", path.to_string()));
96 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
97 self.inner
98 .read(path, args)
99 .await
100 .map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r)))
101 }
102
103 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
104 let tracer = global::tracer("opendal");
105 let mut span = tracer.start("write");
106 span.set_attribute(KeyValue::new("path", path.to_string()));
107 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
108 self.inner
109 .write(path, args)
110 .await
111 .map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r)))
112 }
113
114 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
115 let tracer = global::tracer("opendal");
116 let mut span = tracer.start("copy");
117 span.set_attribute(KeyValue::new("from", from.to_string()));
118 span.set_attribute(KeyValue::new("to", to.to_string()));
119 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
120 let cx = TraceContext::current_with_span(span);
121 self.inner().copy(from, to, args).with_context(cx).await
122 }
123
124 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
125 let tracer = global::tracer("opendal");
126 let mut span = tracer.start("rename");
127 span.set_attribute(KeyValue::new("from", from.to_string()));
128 span.set_attribute(KeyValue::new("to", to.to_string()));
129 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
130 let cx = TraceContext::current_with_span(span);
131 self.inner().rename(from, to, args).with_context(cx).await
132 }
133
134 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
135 let tracer = global::tracer("opendal");
136 let mut span = tracer.start("stat");
137 span.set_attribute(KeyValue::new("path", path.to_string()));
138 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
139 let cx = TraceContext::current_with_span(span);
140 self.inner().stat(path, args).with_context(cx).await
141 }
142
143 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
144 self.inner().delete().await
145 }
146
147 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
148 let tracer = global::tracer("opendal");
149 let mut span = tracer.start("list");
150 span.set_attribute(KeyValue::new("path", path.to_string()));
151 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
152 self.inner
153 .list(path, args)
154 .await
155 .map(|(rp, s)| (rp, OtelTraceWrapper::new(span, s)))
156 }
157
158 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
159 let tracer = global::tracer("opendal");
160 let mut span = tracer.start("presign");
161 span.set_attribute(KeyValue::new("path", path.to_string()));
162 span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
163 let cx = TraceContext::current_with_span(span);
164 self.inner().presign(path, args).with_context(cx).await
165 }
166}
167
168pub struct OtelTraceWrapper<R> {
169 _span: BoxedSpan,
170 inner: R,
171}
172
173impl<R> OtelTraceWrapper<R> {
174 fn new(_span: BoxedSpan, inner: R) -> Self {
175 Self { _span, inner }
176 }
177}
178
179impl<R: oio::Read> oio::Read for OtelTraceWrapper<R> {
180 async fn read(&mut self) -> Result<Buffer> {
181 self.inner.read().await
182 }
183}
184
185impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
186 fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
187 self.inner.write(bs)
188 }
189
190 fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
191 self.inner.abort()
192 }
193
194 fn close(&mut self) -> impl Future<Output = Result<Metadata>> + MaybeSend {
195 self.inner.close()
196 }
197}
198
199impl<R: oio::List> oio::List for OtelTraceWrapper<R> {
200 async fn next(&mut self) -> Result<Option<oio::Entry>> {
201 self.inner.next().await
202 }
203}