opendal/layers/
await_tree.rs1use await_tree::InstrumentAwait;
19use futures::Future;
20
21use crate::raw::*;
22use crate::*;
23
24#[derive(Clone, Default)]
49pub struct AwaitTreeLayer {}
50
51impl AwaitTreeLayer {
52 pub fn new() -> Self {
54 Self {}
55 }
56}
57
58impl<A: Access> Layer<A> for AwaitTreeLayer {
59 type LayeredAccess = AwaitTreeAccessor<A>;
60
61 fn layer(&self, accessor: A) -> Self::LayeredAccess {
62 AwaitTreeAccessor { inner: accessor }
63 }
64}
65
66#[derive(Debug, Clone)]
67pub struct AwaitTreeAccessor<A: Access> {
68 inner: A,
69}
70
71impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
72 type Inner = A;
73 type Reader = AwaitTreeWrapper<A::Reader>;
74 type Writer = AwaitTreeWrapper<A::Writer>;
75 type Lister = AwaitTreeWrapper<A::Lister>;
76 type Deleter = AwaitTreeWrapper<A::Deleter>;
77
78 fn inner(&self) -> &Self::Inner {
79 &self.inner
80 }
81
82 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
83 self.inner
84 .read(path, args)
85 .instrument_await(format!("opendal::{}", Operation::Read))
86 .await
87 .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
88 }
89
90 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
91 self.inner
92 .write(path, args)
93 .instrument_await(format!("opendal::{}", Operation::Write))
94 .await
95 .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
96 }
97
98 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
99 self.inner()
100 .copy(from, to, args)
101 .instrument_await(format!("opendal::{}", Operation::Copy))
102 .await
103 }
104
105 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
106 self.inner()
107 .rename(from, to, args)
108 .instrument_await(format!("opendal::{}", Operation::Rename))
109 .await
110 }
111
112 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
113 self.inner
114 .stat(path, args)
115 .instrument_await(format!("opendal::{}", Operation::Stat))
116 .await
117 }
118
119 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
120 self.inner
121 .delete()
122 .instrument_await(format!("opendal::{}", Operation::Delete))
123 .await
124 .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
125 }
126
127 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
128 self.inner
129 .list(path, args)
130 .instrument_await(format!("opendal::{}", Operation::List))
131 .await
132 .map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
133 }
134
135 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
136 self.inner
137 .presign(path, args)
138 .instrument_await(format!("opendal::{}", Operation::Presign))
139 .await
140 }
141}
142
143pub struct AwaitTreeWrapper<R> {
144 inner: R,
145}
146
147impl<R> AwaitTreeWrapper<R> {
148 fn new(inner: R) -> Self {
149 Self { inner }
150 }
151}
152
153impl<R: oio::Read> oio::Read for AwaitTreeWrapper<R> {
154 async fn read(&mut self) -> Result<Buffer> {
155 self.inner
156 .read()
157 .instrument_await(format!("opendal::{}", Operation::Read))
158 .await
159 }
160}
161
162impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
163 fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
164 self.inner
165 .write(bs)
166 .instrument_await(format!("opendal::{}", Operation::Write.into_static()))
167 }
168
169 fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
170 self.inner
171 .abort()
172 .instrument_await(format!("opendal::{}", Operation::Write.into_static()))
173 }
174
175 fn close(&mut self) -> impl Future<Output = Result<Metadata>> + MaybeSend {
176 self.inner
177 .close()
178 .instrument_await(format!("opendal::{}", Operation::Write.into_static()))
179 }
180}
181
182impl<R: oio::List> oio::List for AwaitTreeWrapper<R> {
183 async fn next(&mut self) -> Result<Option<oio::Entry>> {
184 self.inner
185 .next()
186 .instrument_await(format!("opendal::{}", Operation::List))
187 .await
188 }
189}
190
191impl<R: oio::Delete> oio::Delete for AwaitTreeWrapper<R> {
192 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
193 self.inner.delete(path, args)
194 }
195
196 async fn flush(&mut self) -> Result<usize> {
197 self.inner
198 .flush()
199 .instrument_await(format!("opendal::{}", Operation::Delete))
200 .await
201 }
202}