opendal/layers/
dtrace.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ffi::CString;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21
22use bytes::Buf;
23use probe::probe_lazy;
24
25use crate::raw::Access;
26use crate::raw::*;
27use crate::*;
28
/// Support User Statically-Defined Tracing (aka USDT) on Linux.
///
/// This layer is an experimental feature; enable it with `features = ["layers-dtrace"]` in Cargo.toml.
///
/// For now we have the following probes:
34///
35/// ### For Accessor
36///
/// 1. ${operation}_start, arguments: path
///     1. create_dir
///     2. read
///     3. write
///     4. stat
///     5. list
///     6. presign
///     7. blocking_create_dir
///     8. blocking_read
///     9. blocking_write
///     10. blocking_stat
///     11. blocking_list
/// 2. ${operation}_end, arguments: path
///     1. create_dir
///     2. read
///     3. write
///     4. stat
///     5. list
///     6. presign
///     7. blocking_create_dir
///     8. blocking_read
///     9. blocking_write
///     10. blocking_stat
///     11. blocking_list
///
/// `delete` and `blocking_delete` take no path and are forwarded to the inner
/// accessor without emitting any probe.
65///
66/// ### For Reader
67///
68/// 1. reader_read_start, arguments: path
69/// 2. reader_read_ok, arguments: path, length
70/// 3. reader_read_error, arguments: path
71///
72/// ### For BlockingReader
73///
74/// 1. blocking_reader_read_start, arguments: path
75/// 2. blocking_reader_read_ok, arguments: path, length
76/// 3. blocking_reader_read_error, arguments: path
77///
78/// ### For Writer
79///
80/// 1. writer_write_start, arguments: path
81/// 2. writer_write_ok, arguments: path, length
82/// 3. writer_write_error, arguments: path
83/// 4. writer_abort_start, arguments: path
84/// 5. writer_abort_ok, arguments: path
85/// 6. writer_abort_error, arguments: path
86/// 7. writer_close_start, arguments: path
87/// 8. writer_close_ok, arguments: path
88/// 9. writer_close_error, arguments: path
89///
90/// ### For BlockingWriter
91///
92/// 1. blocking_writer_write_start, arguments: path
93/// 2. blocking_writer_write_ok, arguments: path, length
94/// 3. blocking_writer_write_error, arguments: path
95/// 4. blocking_writer_close_start, arguments: path
96/// 5. blocking_writer_close_ok, arguments: path
97/// 6. blocking_writer_close_error, arguments: path
98///
99/// Example:
100///
101/// ```no_run
102/// # use opendal::layers::DtraceLayer;
103/// # use opendal::services;
104/// # use opendal::Operator;
105/// # use opendal::Result;
106///
107/// # #[tokio::main]
108/// # async fn main() -> Result<()> {
109/// // `Accessor` provides the low level APIs, we will use `Operator` normally.
110/// let op: Operator = Operator::new(services::Fs::default().root("/tmp"))?
111///     .layer(DtraceLayer::default())
112///     .finish();
113///
114/// let path = "/tmp/test.txt";
115/// for _ in 1..100000 {
116///     let bs = vec![0; 64 * 1024 * 1024];
117///     op.write(path, bs).await?;
118///     op.read(path).await?;
119/// }
120/// Ok(())
121/// # }
122/// ```
123///
124/// Then you can use `readelf -n target/debug/examples/dtrace` to see the probes:
125///
126/// ```text
127/// Displaying notes found in: .note.stapsdt
128///   Owner                Data size        Description
129///   stapsdt              0x00000039       NT_STAPSDT (SystemTap probe descriptors)
130///     Provider: opendal
131///     Name: create_dir_start
132///     Location: 0x00000000000f8f05, Base: 0x0000000000000000, Semaphore: 0x00000000003649f8
133///     Arguments: -8@%rax
134///   stapsdt              0x00000037       NT_STAPSDT (SystemTap probe descriptors)
135///     Provider: opendal
136///     Name: create_dir_end
137///     Location: 0x00000000000f9284, Base: 0x0000000000000000, Semaphore: 0x00000000003649fa
138///     Arguments: -8@%rax
139///   stapsdt              0x0000003c       NT_STAPSDT (SystemTap probe descriptors)
140///     Provider: opendal
141///     Name: blocking_list_start
142///     Location: 0x00000000000f9487, Base: 0x0000000000000000, Semaphore: 0x0000000000364a28
143///     Arguments: -8@%rax
144///   stapsdt              0x0000003a       NT_STAPSDT (SystemTap probe descriptors)
145///     Provider: opendal
146///     Name: blocking_list_end
147///     Location: 0x00000000000f9546, Base: 0x0000000000000000, Semaphore: 0x0000000000364a2a
148///     Arguments: -8@%rax
149///   stapsdt              0x0000003c       NT_STAPSDT (SystemTap probe descriptors)
150/// ```
// Stateless marker type: all tracing behaviour lives in the accessor this layer produces.
#[derive(Clone, Debug, Default)]
pub struct DtraceLayer {}
153
154impl<A: Access> Layer<A> for DtraceLayer {
155    type LayeredAccess = DTraceAccessor<A>;
156    fn layer(&self, inner: A) -> Self::LayeredAccess {
157        DTraceAccessor { inner }
158    }
159}
160
161#[derive(Clone)]
162pub struct DTraceAccessor<A: Access> {
163    inner: A,
164}
165
166impl<A: Access> Debug for DTraceAccessor<A> {
167    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
168        f.debug_struct("DTraceAccessor")
169            .field("inner", &self.inner)
170            .finish_non_exhaustive()
171    }
172}
173
174impl<A: Access> LayeredAccess for DTraceAccessor<A> {
175    type Inner = A;
176    type Reader = DtraceLayerWrapper<A::Reader>;
177    type BlockingReader = DtraceLayerWrapper<A::BlockingReader>;
178    type Writer = DtraceLayerWrapper<A::Writer>;
179    type BlockingWriter = DtraceLayerWrapper<A::BlockingWriter>;
180    type Lister = A::Lister;
181    type BlockingLister = A::BlockingLister;
182    type Deleter = A::Deleter;
183    type BlockingDeleter = A::BlockingDeleter;
184
185    fn inner(&self) -> &Self::Inner {
186        &self.inner
187    }
188
189    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
190        let c_path = CString::new(path).unwrap();
191        probe_lazy!(opendal, create_dir_start, c_path.as_ptr());
192        let result = self.inner.create_dir(path, args).await;
193        probe_lazy!(opendal, create_dir_end, c_path.as_ptr());
194        result
195    }
196
197    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
198        let c_path = CString::new(path).unwrap();
199        probe_lazy!(opendal, read_start, c_path.as_ptr());
200        let result = self
201            .inner
202            .read(path, args)
203            .await
204            .map(|(rp, r)| (rp, DtraceLayerWrapper::new(r, &path.to_string())));
205        probe_lazy!(opendal, read_end, c_path.as_ptr());
206        result
207    }
208
209    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
210        let c_path = CString::new(path).unwrap();
211        probe_lazy!(opendal, write_start, c_path.as_ptr());
212        let result = self
213            .inner
214            .write(path, args)
215            .await
216            .map(|(rp, r)| (rp, DtraceLayerWrapper::new(r, &path.to_string())));
217
218        probe_lazy!(opendal, write_end, c_path.as_ptr());
219        result
220    }
221
222    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
223        let c_path = CString::new(path).unwrap();
224        probe_lazy!(opendal, stat_start, c_path.as_ptr());
225        let result = self.inner.stat(path, args).await;
226        probe_lazy!(opendal, stat_end, c_path.as_ptr());
227        result
228    }
229
230    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
231        self.inner.delete().await
232    }
233
234    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
235        let c_path = CString::new(path).unwrap();
236        probe_lazy!(opendal, list_start, c_path.as_ptr());
237        let result = self.inner.list(path, args).await;
238        probe_lazy!(opendal, list_end, c_path.as_ptr());
239        result
240    }
241
242    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
243        let c_path = CString::new(path).unwrap();
244        probe_lazy!(opendal, presign_start, c_path.as_ptr());
245        let result = self.inner.presign(path, args).await;
246        probe_lazy!(opendal, presign_end, c_path.as_ptr());
247        result
248    }
249
250    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
251        let c_path = CString::new(path).unwrap();
252        probe_lazy!(opendal, blocking_create_dir_start, c_path.as_ptr());
253        let result = self.inner.blocking_create_dir(path, args);
254        probe_lazy!(opendal, blocking_create_dir_end, c_path.as_ptr());
255        result
256    }
257
258    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
259        let c_path = CString::new(path).unwrap();
260        probe_lazy!(opendal, blocking_read_start, c_path.as_ptr());
261        let result = self
262            .inner
263            .blocking_read(path, args)
264            .map(|(rp, r)| (rp, DtraceLayerWrapper::new(r, &path.to_string())));
265        probe_lazy!(opendal, blocking_read_end, c_path.as_ptr());
266        result
267    }
268
269    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
270        let c_path = CString::new(path).unwrap();
271        probe_lazy!(opendal, blocking_write_start, c_path.as_ptr());
272        let result = self
273            .inner
274            .blocking_write(path, args)
275            .map(|(rp, r)| (rp, DtraceLayerWrapper::new(r, &path.to_string())));
276        probe_lazy!(opendal, blocking_write_end, c_path.as_ptr());
277        result
278    }
279
280    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
281        let c_path = CString::new(path).unwrap();
282        probe_lazy!(opendal, blocking_stat_start, c_path.as_ptr());
283        let result = self.inner.blocking_stat(path, args);
284        probe_lazy!(opendal, blocking_stat_end, c_path.as_ptr());
285        result
286    }
287
288    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
289        self.inner.blocking_delete()
290    }
291
292    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
293        let c_path = CString::new(path).unwrap();
294        probe_lazy!(opendal, blocking_list_start, c_path.as_ptr());
295        let result = self.inner.blocking_list(path, args);
296        probe_lazy!(opendal, blocking_list_end, c_path.as_ptr());
297        result
298    }
299}
300
/// Pairs a reader or writer with the path it was opened for, so the `oio` trait
/// implementations on this wrapper can pass that path to their probes.
pub struct DtraceLayerWrapper<R> {
    /// The wrapped reader or writer.
    inner: R,
    /// Path the wrapped object was opened with.
    path: String,
}

impl<R> DtraceLayerWrapper<R> {
    /// Wraps `inner` and stores an owned copy of `path`.
    ///
    /// `path` is a `&str`; callers holding a `&String` keep compiling through deref coercion.
    pub fn new(inner: R, path: &str) -> Self {
        Self {
            inner,
            path: path.to_owned(),
        }
    }
}
314
315impl<R: oio::Read> oio::Read for DtraceLayerWrapper<R> {
316    async fn read(&mut self) -> Result<Buffer> {
317        let c_path = CString::new(self.path.clone()).unwrap();
318        probe_lazy!(opendal, reader_read_start, c_path.as_ptr());
319        match self.inner.read().await {
320            Ok(bs) => {
321                probe_lazy!(opendal, reader_read_ok, c_path.as_ptr(), bs.remaining());
322                Ok(bs)
323            }
324            Err(e) => {
325                probe_lazy!(opendal, reader_read_error, c_path.as_ptr());
326                Err(e)
327            }
328        }
329    }
330}
331
332impl<R: oio::BlockingRead> oio::BlockingRead for DtraceLayerWrapper<R> {
333    fn read(&mut self) -> Result<Buffer> {
334        let c_path = CString::new(self.path.clone()).unwrap();
335        probe_lazy!(opendal, blocking_reader_read_start, c_path.as_ptr());
336        self.inner
337            .read()
338            .inspect(|bs| {
339                probe_lazy!(
340                    opendal,
341                    blocking_reader_read_ok,
342                    c_path.as_ptr(),
343                    bs.remaining()
344                );
345            })
346            .inspect_err(|_| {
347                probe_lazy!(opendal, blocking_reader_read_error, c_path.as_ptr());
348            })
349    }
350}
351
352impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
353    async fn write(&mut self, bs: Buffer) -> Result<()> {
354        let c_path = CString::new(self.path.clone()).unwrap();
355        probe_lazy!(opendal, writer_write_start, c_path.as_ptr());
356        self.inner
357            .write(bs)
358            .await
359            .map(|_| {
360                probe_lazy!(opendal, writer_write_ok, c_path.as_ptr());
361            })
362            .inspect_err(|_| {
363                probe_lazy!(opendal, writer_write_error, c_path.as_ptr());
364            })
365    }
366
367    async fn abort(&mut self) -> Result<()> {
368        let c_path = CString::new(self.path.clone()).unwrap();
369        probe_lazy!(opendal, writer_poll_abort_start, c_path.as_ptr());
370        self.inner
371            .abort()
372            .await
373            .map(|_| {
374                probe_lazy!(opendal, writer_poll_abort_ok, c_path.as_ptr());
375            })
376            .inspect_err(|_| {
377                probe_lazy!(opendal, writer_poll_abort_error, c_path.as_ptr());
378            })
379    }
380
381    async fn close(&mut self) -> Result<Metadata> {
382        let c_path = CString::new(self.path.clone()).unwrap();
383        probe_lazy!(opendal, writer_close_start, c_path.as_ptr());
384        self.inner
385            .close()
386            .await
387            .inspect(|_| {
388                probe_lazy!(opendal, writer_close_ok, c_path.as_ptr());
389            })
390            .inspect_err(|_| {
391                probe_lazy!(opendal, writer_close_error, c_path.as_ptr());
392            })
393    }
394}
395
396impl<R: oio::BlockingWrite> oio::BlockingWrite for DtraceLayerWrapper<R> {
397    fn write(&mut self, bs: Buffer) -> Result<()> {
398        let c_path = CString::new(self.path.clone()).unwrap();
399        probe_lazy!(opendal, blocking_writer_write_start, c_path.as_ptr());
400        self.inner
401            .write(bs)
402            .map(|_| {
403                probe_lazy!(opendal, blocking_writer_write_ok, c_path.as_ptr());
404            })
405            .inspect_err(|_| {
406                probe_lazy!(opendal, blocking_writer_write_error, c_path.as_ptr());
407            })
408    }
409
410    fn close(&mut self) -> Result<Metadata> {
411        let c_path = CString::new(self.path.clone()).unwrap();
412        probe_lazy!(opendal, blocking_writer_close_start, c_path.as_ptr());
413        self.inner
414            .close()
415            .inspect(|_| {
416                probe_lazy!(opendal, blocking_writer_close_ok, c_path.as_ptr());
417            })
418            .inspect_err(|_| {
419                probe_lazy!(opendal, blocking_writer_close_error, c_path.as_ptr());
420            })
421    }
422}