opendal/layers/
await_tree.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use await_tree::InstrumentAwait;
19use futures::Future;
20
21use crate::raw::*;
22use crate::*;
23
24/// Add an Instrument await-tree for actor-based applications to the underlying services.
25///
26/// # AwaitTree
27///
28/// await-tree allows developers to dump this execution tree at runtime,
29/// with the span of each Future annotated by instrument_await.
30/// Read more about [await-tree](https://docs.rs/await-tree/latest/await_tree/)
31///
32/// # Examples
33///
34/// ```no_run
35/// # use opendal::layers::AwaitTreeLayer;
36/// # use opendal::services;
37/// # use opendal::Operator;
38/// # use opendal::Result;
39/// # use opendal::Scheme;
40///
41/// # fn main() -> Result<()> {
42/// let _ = Operator::new(services::Memory::default())?
43///     .layer(AwaitTreeLayer::new())
44///     .finish();
45/// Ok(())
46/// # }
47/// ```
48#[derive(Clone, Default)]
49pub struct AwaitTreeLayer {}
50
51impl AwaitTreeLayer {
52    /// Create a new `AwaitTreeLayer`.
53    pub fn new() -> Self {
54        Self {}
55    }
56}
57
58impl<A: Access> Layer<A> for AwaitTreeLayer {
59    type LayeredAccess = AwaitTreeAccessor<A>;
60
61    fn layer(&self, accessor: A) -> Self::LayeredAccess {
62        AwaitTreeAccessor { inner: accessor }
63    }
64}
65
66#[derive(Debug, Clone)]
67pub struct AwaitTreeAccessor<A: Access> {
68    inner: A,
69}
70
71impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
72    type Inner = A;
73    type Reader = AwaitTreeWrapper<A::Reader>;
74    type BlockingReader = AwaitTreeWrapper<A::BlockingReader>;
75    type Writer = AwaitTreeWrapper<A::Writer>;
76    type BlockingWriter = AwaitTreeWrapper<A::BlockingWriter>;
77    type Lister = AwaitTreeWrapper<A::Lister>;
78    type BlockingLister = AwaitTreeWrapper<A::BlockingLister>;
79    type Deleter = AwaitTreeWrapper<A::Deleter>;
80    type BlockingDeleter = AwaitTreeWrapper<A::BlockingDeleter>;
81
82    fn inner(&self) -> &Self::Inner {
83        &self.inner
84    }
85
86    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
87        self.inner
88            .read(path, args)
89            .instrument_await(format!("opendal::{}", Operation::Read))
90            .await
91            .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
92    }
93
94    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
95        self.inner
96            .write(path, args)
97            .instrument_await(format!("opendal::{}", Operation::Write))
98            .await
99            .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
100    }
101
102    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
103        self.inner()
104            .copy(from, to, args)
105            .instrument_await(format!("opendal::{}", Operation::Copy))
106            .await
107    }
108
109    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
110        self.inner()
111            .rename(from, to, args)
112            .instrument_await(format!("opendal::{}", Operation::Rename))
113            .await
114    }
115
116    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
117        self.inner
118            .stat(path, args)
119            .instrument_await(format!("opendal::{}", Operation::Stat))
120            .await
121    }
122
123    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
124        self.inner
125            .delete()
126            .instrument_await(format!("opendal::{}", Operation::Delete))
127            .await
128            .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
129    }
130
131    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
132        self.inner
133            .list(path, args)
134            .instrument_await(format!("opendal::{}", Operation::List))
135            .await
136            .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
137    }
138
139    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
140        self.inner
141            .presign(path, args)
142            .instrument_await(format!("opendal::{}", Operation::Presign))
143            .await
144    }
145
146    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
147        self.inner
148            .blocking_read(path, args)
149            .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
150    }
151
152    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
153        self.inner
154            .blocking_write(path, args)
155            .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
156    }
157
158    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
159        self.inner
160            .blocking_list(path, args)
161            .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
162    }
163
164    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
165        self.inner
166            .blocking_delete()
167            .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
168    }
169}
170
171pub struct AwaitTreeWrapper<R> {
172    inner: R,
173}
174
175impl<R> AwaitTreeWrapper<R> {
176    fn new(inner: R) -> Self {
177        Self { inner }
178    }
179}
180
181impl<R: oio::Read> oio::Read for AwaitTreeWrapper<R> {
182    async fn read(&mut self) -> Result<Buffer> {
183        self.inner
184            .read()
185            .instrument_await(format!("opendal::{}", Operation::Read))
186            .await
187    }
188}
189
190impl<R: oio::BlockingRead> oio::BlockingRead for AwaitTreeWrapper<R> {
191    fn read(&mut self) -> Result<Buffer> {
192        self.inner.read()
193    }
194}
195
196impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
197    fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
198        self.inner
199            .write(bs)
200            .instrument_await(format!("opendal::{}", Operation::Write.into_static()))
201    }
202
203    fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
204        self.inner
205            .abort()
206            .instrument_await(format!("opendal::{}", Operation::Write.into_static()))
207    }
208
209    fn close(&mut self) -> impl Future<Output = Result<Metadata>> + MaybeSend {
210        self.inner
211            .close()
212            .instrument_await(format!("opendal::{}", Operation::Write.into_static()))
213    }
214}
215
216impl<R: oio::BlockingWrite> oio::BlockingWrite for AwaitTreeWrapper<R> {
217    fn write(&mut self, bs: Buffer) -> Result<()> {
218        self.inner.write(bs)
219    }
220
221    fn close(&mut self) -> Result<Metadata> {
222        self.inner.close()
223    }
224}
225
226impl<R: oio::List> oio::List for AwaitTreeWrapper<R> {
227    async fn next(&mut self) -> Result<Option<oio::Entry>> {
228        self.inner
229            .next()
230            .instrument_await(format!("opendal::{}", Operation::List))
231            .await
232    }
233}
234
235impl<R: oio::BlockingList> oio::BlockingList for AwaitTreeWrapper<R> {
236    fn next(&mut self) -> Result<Option<oio::Entry>> {
237        self.inner.next()
238    }
239}
240
241impl<R: oio::Delete> oio::Delete for AwaitTreeWrapper<R> {
242    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
243        self.inner.delete(path, args)
244    }
245
246    async fn flush(&mut self) -> Result<usize> {
247        self.inner
248            .flush()
249            .instrument_await(format!("opendal::{}", Operation::Delete))
250            .await
251    }
252}
253
254impl<R: oio::BlockingDelete> oio::BlockingDelete for AwaitTreeWrapper<R> {
255    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
256        self.inner.delete(path, args)
257    }
258
259    fn flush(&mut self) -> Result<usize> {
260        self.inner.flush()
261    }
262}