opendal/layers/
await_tree.rs1use await_tree::InstrumentAwait;
19use futures::Future;
20
21use crate::raw::*;
22use crate::*;
23
24#[derive(Clone, Default)]
49pub struct AwaitTreeLayer {}
50
51impl AwaitTreeLayer {
52 pub fn new() -> Self {
54 Self {}
55 }
56}
57
58impl<A: Access> Layer<A> for AwaitTreeLayer {
59 type LayeredAccess = AwaitTreeAccessor<A>;
60
61 fn layer(&self, accessor: A) -> Self::LayeredAccess {
62 AwaitTreeAccessor { inner: accessor }
63 }
64}
65
66#[derive(Debug, Clone)]
67pub struct AwaitTreeAccessor<A: Access> {
68 inner: A,
69}
70
71impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
72 type Inner = A;
73 type Reader = AwaitTreeWrapper<A::Reader>;
74 type BlockingReader = AwaitTreeWrapper<A::BlockingReader>;
75 type Writer = AwaitTreeWrapper<A::Writer>;
76 type BlockingWriter = AwaitTreeWrapper<A::BlockingWriter>;
77 type Lister = AwaitTreeWrapper<A::Lister>;
78 type BlockingLister = AwaitTreeWrapper<A::BlockingLister>;
79 type Deleter = AwaitTreeWrapper<A::Deleter>;
80 type BlockingDeleter = AwaitTreeWrapper<A::BlockingDeleter>;
81
82 fn inner(&self) -> &Self::Inner {
83 &self.inner
84 }
85
86 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
87 self.inner
88 .read(path, args)
89 .instrument_await(format!("opendal::{}", Operation::Read))
90 .await
91 .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
92 }
93
94 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
95 self.inner
96 .write(path, args)
97 .instrument_await(format!("opendal::{}", Operation::Write))
98 .await
99 .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
100 }
101
102 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
103 self.inner()
104 .copy(from, to, args)
105 .instrument_await(format!("opendal::{}", Operation::Copy))
106 .await
107 }
108
109 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
110 self.inner()
111 .rename(from, to, args)
112 .instrument_await(format!("opendal::{}", Operation::Rename))
113 .await
114 }
115
116 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
117 self.inner
118 .stat(path, args)
119 .instrument_await(format!("opendal::{}", Operation::Stat))
120 .await
121 }
122
123 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
124 self.inner
125 .delete()
126 .instrument_await(format!("opendal::{}", Operation::Delete))
127 .await
128 .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
129 }
130
131 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
132 self.inner
133 .list(path, args)
134 .instrument_await(format!("opendal::{}", Operation::List))
135 .await
136 .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
137 }
138
139 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
140 self.inner
141 .presign(path, args)
142 .instrument_await(format!("opendal::{}", Operation::Presign))
143 .await
144 }
145
146 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
147 self.inner
148 .blocking_read(path, args)
149 .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
150 }
151
152 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
153 self.inner
154 .blocking_write(path, args)
155 .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
156 }
157
158 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
159 self.inner
160 .blocking_list(path, args)
161 .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
162 }
163
164 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
165 self.inner
166 .blocking_delete()
167 .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
168 }
169}
170
171pub struct AwaitTreeWrapper<R> {
172 inner: R,
173}
174
175impl<R> AwaitTreeWrapper<R> {
176 fn new(inner: R) -> Self {
177 Self { inner }
178 }
179}
180
181impl<R: oio::Read> oio::Read for AwaitTreeWrapper<R> {
182 async fn read(&mut self) -> Result<Buffer> {
183 self.inner
184 .read()
185 .instrument_await(format!("opendal::{}", Operation::Read))
186 .await
187 }
188}
189
190impl<R: oio::BlockingRead> oio::BlockingRead for AwaitTreeWrapper<R> {
191 fn read(&mut self) -> Result<Buffer> {
192 self.inner.read()
193 }
194}
195
196impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
197 fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
198 self.inner
199 .write(bs)
200 .instrument_await(format!("opendal::{}", Operation::Write.into_static()))
201 }
202
203 fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
204 self.inner
205 .abort()
206 .instrument_await(format!("opendal::{}", Operation::Write.into_static()))
207 }
208
209 fn close(&mut self) -> impl Future<Output = Result<Metadata>> + MaybeSend {
210 self.inner
211 .close()
212 .instrument_await(format!("opendal::{}", Operation::Write.into_static()))
213 }
214}
215
216impl<R: oio::BlockingWrite> oio::BlockingWrite for AwaitTreeWrapper<R> {
217 fn write(&mut self, bs: Buffer) -> Result<()> {
218 self.inner.write(bs)
219 }
220
221 fn close(&mut self) -> Result<Metadata> {
222 self.inner.close()
223 }
224}
225
226impl<R: oio::List> oio::List for AwaitTreeWrapper<R> {
227 async fn next(&mut self) -> Result<Option<oio::Entry>> {
228 self.inner
229 .next()
230 .instrument_await(format!("opendal::{}", Operation::List))
231 .await
232 }
233}
234
235impl<R: oio::BlockingList> oio::BlockingList for AwaitTreeWrapper<R> {
236 fn next(&mut self) -> Result<Option<oio::Entry>> {
237 self.inner.next()
238 }
239}
240
241impl<R: oio::Delete> oio::Delete for AwaitTreeWrapper<R> {
242 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
243 self.inner.delete(path, args)
244 }
245
246 async fn flush(&mut self) -> Result<usize> {
247 self.inner
248 .flush()
249 .instrument_await(format!("opendal::{}", Operation::Delete))
250 .await
251 }
252}
253
254impl<R: oio::BlockingDelete> oio::BlockingDelete for AwaitTreeWrapper<R> {
255 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
256 self.inner.delete(path, args)
257 }
258
259 fn flush(&mut self) -> Result<usize> {
260 self.inner.flush()
261 }
262}