1use std::fmt::Debug;
19use std::pin::Pin;
20use std::task::{Context, Poll};
21
22use futures::Stream;
23use futures::StreamExt;
24use tracing::{span, Level, Span};
25
26use crate::raw::*;
27use crate::*;
28
/// Layer that adds `tracing` spans to an accessor.
///
/// Applying it (see the `Layer` impl in this file) wraps the accessor in a
/// `TracingAccessor` and swaps the accessor's HTTP client for one backed by
/// `TracingHttpFetcher`.
pub struct TracingLayer;
141
142impl<A: Access> Layer<A> for TracingLayer {
143 type LayeredAccess = TracingAccessor<A>;
144
145 fn layer(&self, inner: A) -> Self::LayeredAccess {
146 let info = inner.info();
147
148 info.update_http_client(|client| {
150 HttpClient::with(TracingHttpFetcher {
151 inner: client.into_inner(),
152 })
153 });
154
155 TracingAccessor { inner }
156 }
157}
158
/// HTTP fetcher that delegates to another `HttpFetcher` and records each
/// request in a DEBUG-level `http::fetch` span (see its `HttpFetch` impl).
pub struct TracingHttpFetcher {
    /// The fetcher that actually performs the request.
    inner: HttpFetcher,
}
162
163impl HttpFetch for TracingHttpFetcher {
164 async fn fetch(&self, req: http::Request<Buffer>) -> Result<http::Response<HttpBody>> {
165 let span = span!(Level::DEBUG, "http::fetch", ?req);
166
167 let resp = {
168 let _enter = span.enter();
169 self.inner.fetch(req).await?
170 };
171
172 let (parts, body) = resp.into_parts();
173 let body = body.map_inner(|s| Box::new(TracingStream { inner: s, span }));
174 Ok(http::Response::from_parts(parts, body))
175 }
176}
177
/// Stream adapter that polls `inner` with `span` entered (see its `Stream`
/// impl).
pub struct TracingStream<S> {
    /// The wrapped stream.
    inner: S,
    /// Span entered for the duration of every poll of `inner`.
    span: Span,
}
182
183impl<S> Stream for TracingStream<S>
184where
185 S: Stream<Item = Result<Buffer>> + Unpin + 'static,
186{
187 type Item = Result<Buffer>;
188
189 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
190 let _enter = self.span.clone().entered();
191 self.inner.poll_next_unpin(cx)
192 }
193}
194
/// Accessor wrapper that forwards every operation to `inner` inside a
/// DEBUG-level `tracing` span (see its `LayeredAccess` impl).
#[derive(Debug)]
pub struct TracingAccessor<A> {
    /// The wrapped accessor.
    inner: A,
}
199
200impl<A: Access> LayeredAccess for TracingAccessor<A> {
201 type Inner = A;
202 type Reader = TracingWrapper<A::Reader>;
203 type Writer = TracingWrapper<A::Writer>;
204 type Lister = TracingWrapper<A::Lister>;
205 type Deleter = TracingWrapper<A::Deleter>;
206 type BlockingReader = TracingWrapper<A::BlockingReader>;
207 type BlockingWriter = TracingWrapper<A::BlockingWriter>;
208 type BlockingLister = TracingWrapper<A::BlockingLister>;
209 type BlockingDeleter = TracingWrapper<A::BlockingDeleter>;
210
211 fn inner(&self) -> &Self::Inner {
212 &self.inner
213 }
214
215 #[tracing::instrument(level = "debug", skip(self))]
216 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
217 self.inner.create_dir(path, args).await
218 }
219
220 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
221 let span = span!(Level::DEBUG, "read", path, ?args);
222
223 let (rp, r) = {
224 let _enter = span.enter();
225 self.inner.read(path, args).await?
226 };
227
228 Ok((rp, TracingWrapper::new(span, r)))
229 }
230
231 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
232 let span = span!(Level::DEBUG, "write", path, ?args);
233
234 let (rp, r) = {
235 let _enter = span.enter();
236 self.inner.write(path, args).await?
237 };
238
239 Ok((rp, TracingWrapper::new(span, r)))
240 }
241
242 #[tracing::instrument(level = "debug", skip(self))]
243 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
244 self.inner().copy(from, to, args).await
245 }
246
247 #[tracing::instrument(level = "debug", skip(self))]
248 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
249 self.inner().rename(from, to, args).await
250 }
251
252 #[tracing::instrument(level = "debug", skip(self))]
253 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
254 self.inner.stat(path, args).await
255 }
256
257 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
258 let span = span!(Level::DEBUG, "delete");
259
260 let (rp, r) = {
261 let _enter = span.enter();
262 self.inner.delete().await?
263 };
264
265 Ok((rp, TracingWrapper::new(span, r)))
266 }
267
268 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
269 let span = span!(Level::DEBUG, "list", path, ?args);
270
271 let (rp, r) = {
272 let _enter = span.enter();
273 self.inner.list(path, args).await?
274 };
275
276 Ok((rp, TracingWrapper::new(span, r)))
277 }
278
279 #[tracing::instrument(level = "debug", skip(self))]
280 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
281 self.inner.presign(path, args).await
282 }
283
284 #[tracing::instrument(level = "debug", skip(self))]
285 fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
286 self.inner.blocking_create_dir(path, args)
287 }
288
289 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
290 let span = span!(Level::DEBUG, "read", path, ?args);
291
292 let (rp, r) = {
293 let _enter = span.enter();
294 self.inner.blocking_read(path, args)?
295 };
296
297 Ok((rp, TracingWrapper::new(span, r)))
298 }
299
300 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
301 let span = span!(Level::DEBUG, "write", path, ?args);
302
303 let (rp, r) = {
304 let _enter = span.enter();
305 self.inner.blocking_write(path, args)?
306 };
307
308 Ok((rp, TracingWrapper::new(span, r)))
309 }
310
311 #[tracing::instrument(level = "debug", skip(self))]
312 fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
313 self.inner().blocking_copy(from, to, args)
314 }
315
316 #[tracing::instrument(level = "debug", skip(self))]
317 fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
318 self.inner().blocking_rename(from, to, args)
319 }
320
321 #[tracing::instrument(level = "debug", skip(self))]
322 fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
323 self.inner.blocking_stat(path, args)
324 }
325
326 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
327 let span = span!(Level::DEBUG, "delete");
328
329 let (rp, r) = {
330 let _enter = span.enter();
331 self.inner.blocking_delete()?
332 };
333
334 Ok((rp, TracingWrapper::new(span, r)))
335 }
336
337 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
338 let span = span!(Level::DEBUG, "list", path, ?args);
339
340 let (rp, r) = {
341 let _enter = span.enter();
342 self.inner.blocking_list(path, args)?
343 };
344
345 Ok((rp, TracingWrapper::new(span, r)))
346 }
347}
348
/// Pairs a reader, writer, lister or deleter with the span of the operation
/// that created it.
///
/// The `oio` trait impls in this file forward each call to `inner` with `span`
/// entered.
pub struct TracingWrapper<R> {
    /// Span associated with every call forwarded to `inner`.
    span: Span,
    /// The wrapped reader/writer/lister/deleter.
    inner: R,
}
353
354impl<R> TracingWrapper<R> {
355 fn new(span: Span, inner: R) -> Self {
356 Self { span, inner }
357 }
358}
359
360impl<R: oio::Read> oio::Read for TracingWrapper<R> {
361 async fn read(&mut self) -> Result<Buffer> {
362 let _enter = self.span.enter();
363
364 self.inner.read().await
365 }
366}
367
368impl<R: oio::BlockingRead> oio::BlockingRead for TracingWrapper<R> {
369 fn read(&mut self) -> Result<Buffer> {
370 let _enter = self.span.enter();
371
372 self.inner.read()
373 }
374}
375
376impl<R: oio::Write> oio::Write for TracingWrapper<R> {
377 async fn write(&mut self, bs: Buffer) -> Result<()> {
378 let _enter = self.span.enter();
379
380 self.inner.write(bs).await
381 }
382
383 async fn abort(&mut self) -> Result<()> {
384 let _enter = self.span.enter();
385
386 self.inner.abort().await
387 }
388
389 async fn close(&mut self) -> Result<Metadata> {
390 let _enter = self.span.enter();
391
392 self.inner.close().await
393 }
394}
395
396impl<R: oio::BlockingWrite> oio::BlockingWrite for TracingWrapper<R> {
397 fn write(&mut self, bs: Buffer) -> Result<()> {
398 let _enter = self.span.enter();
399
400 self.inner.write(bs)
401 }
402
403 fn close(&mut self) -> Result<Metadata> {
404 let _enter = self.span.enter();
405
406 self.inner.close()
407 }
408}
409
410impl<R: oio::List> oio::List for TracingWrapper<R> {
411 async fn next(&mut self) -> Result<Option<oio::Entry>> {
412 let _enter = self.span.enter();
413
414 self.inner.next().await
415 }
416}
417
418impl<R: oio::BlockingList> oio::BlockingList for TracingWrapper<R> {
419 fn next(&mut self) -> Result<Option<oio::Entry>> {
420 let _enter = self.span.enter();
421
422 self.inner.next()
423 }
424}
425
426impl<R: oio::Delete> oio::Delete for TracingWrapper<R> {
427 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
428 let _enter = self.span.enter();
429
430 self.inner.delete(path, args)
431 }
432
433 async fn flush(&mut self) -> Result<usize> {
434 let _enter = self.span.enter();
435
436 self.inner.flush().await
437 }
438}
439
440impl<R: oio::BlockingDelete> oio::BlockingDelete for TracingWrapper<R> {
441 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
442 let _enter = self.span.enter();
443
444 self.inner.delete(path, args)
445 }
446
447 fn flush(&mut self) -> Result<usize> {
448 let _enter = self.span.enter();
449
450 self.inner.flush()
451 }
452}