1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use crate::raw::*;
23use crate::*;
24
25pub struct ErrorContextLayer;
44
45impl<A: Access> Layer<A> for ErrorContextLayer {
46 type LayeredAccess = ErrorContextAccessor<A>;
47
48 fn layer(&self, inner: A) -> Self::LayeredAccess {
49 let info = inner.info();
50 ErrorContextAccessor { info, inner }
51 }
52}
53
54pub struct ErrorContextAccessor<A: Access> {
56 info: Arc<AccessorInfo>,
57 inner: A,
58}
59
60impl<A: Access> Debug for ErrorContextAccessor<A> {
61 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
62 self.inner.fmt(f)
63 }
64}
65
66impl<A: Access> LayeredAccess for ErrorContextAccessor<A> {
67 type Inner = A;
68 type Reader = ErrorContextWrapper<A::Reader>;
69 type Writer = ErrorContextWrapper<A::Writer>;
70 type Lister = ErrorContextWrapper<A::Lister>;
71 type Deleter = ErrorContextWrapper<A::Deleter>;
72
73 fn inner(&self) -> &Self::Inner {
74 &self.inner
75 }
76
77 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
78 self.inner.create_dir(path, args).await.map_err(|err| {
79 err.with_operation(Operation::CreateDir)
80 .with_context("service", self.info.scheme())
81 .with_context("path", path)
82 })
83 }
84
85 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
86 let range = args.range();
87 self.inner
88 .read(path, args)
89 .await
90 .map(|(rp, r)| {
91 (
92 rp,
93 ErrorContextWrapper::new(self.info.scheme(), path.to_string(), r)
94 .with_range(range),
95 )
96 })
97 .map_err(|err| {
98 err.with_operation(Operation::Read)
99 .with_context("service", self.info.scheme())
100 .with_context("path", path)
101 .with_context("range", range.to_string())
102 })
103 }
104
105 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
106 self.inner
107 .write(path, args)
108 .await
109 .map(|(rp, w)| {
110 (
111 rp,
112 ErrorContextWrapper::new(self.info.scheme(), path.to_string(), w),
113 )
114 })
115 .map_err(|err| {
116 err.with_operation(Operation::Write)
117 .with_context("service", self.info.scheme())
118 .with_context("path", path)
119 })
120 }
121
122 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
123 self.inner.copy(from, to, args).await.map_err(|err| {
124 err.with_operation(Operation::Copy)
125 .with_context("service", self.info.scheme())
126 .with_context("from", from)
127 .with_context("to", to)
128 })
129 }
130
131 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
132 self.inner.rename(from, to, args).await.map_err(|err| {
133 err.with_operation(Operation::Rename)
134 .with_context("service", self.info.scheme())
135 .with_context("from", from)
136 .with_context("to", to)
137 })
138 }
139
140 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
141 self.inner.stat(path, args).await.map_err(|err| {
142 err.with_operation(Operation::Stat)
143 .with_context("service", self.info.scheme())
144 .with_context("path", path)
145 })
146 }
147
148 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
149 self.inner
150 .delete()
151 .await
152 .map(|(rp, w)| {
153 (
154 rp,
155 ErrorContextWrapper::new(self.info.scheme(), "".to_string(), w),
156 )
157 })
158 .map_err(|err| {
159 err.with_operation(Operation::Delete)
160 .with_context("service", self.info.scheme())
161 })
162 }
163
164 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
165 self.inner
166 .list(path, args)
167 .await
168 .map(|(rp, p)| {
169 (
170 rp,
171 ErrorContextWrapper::new(self.info.scheme(), path.to_string(), p),
172 )
173 })
174 .map_err(|err| {
175 err.with_operation(Operation::List)
176 .with_context("service", self.info.scheme())
177 .with_context("path", path)
178 })
179 }
180
181 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
182 self.inner.presign(path, args).await.map_err(|err| {
183 err.with_operation(Operation::Presign)
184 .with_context("service", self.info.scheme())
185 .with_context("path", path)
186 })
187 }
188}
189
190pub struct ErrorContextWrapper<T> {
191 scheme: Scheme,
192 path: String,
193 inner: T,
194 range: BytesRange,
195 processed: u64,
196}
197
198impl<T> ErrorContextWrapper<T> {
199 fn new(scheme: Scheme, path: String, inner: T) -> Self {
200 Self {
201 scheme,
202 path,
203 inner,
204 range: BytesRange::default(),
205 processed: 0,
206 }
207 }
208
209 fn with_range(mut self, range: BytesRange) -> Self {
210 self.range = range;
211 self
212 }
213}
214
215impl<T: oio::Read> oio::Read for ErrorContextWrapper<T> {
216 async fn read(&mut self) -> Result<Buffer> {
217 self.inner
218 .read()
219 .await
220 .inspect(|bs| {
221 self.processed += bs.len() as u64;
222 })
223 .map_err(|err| {
224 err.with_operation(Operation::Read)
225 .with_context("service", self.scheme)
226 .with_context("path", &self.path)
227 .with_context("range", self.range.to_string())
228 .with_context("read", self.processed.to_string())
229 })
230 }
231}
232
233impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
234 async fn write(&mut self, bs: Buffer) -> Result<()> {
235 let size = bs.len();
236 self.inner
237 .write(bs)
238 .await
239 .map(|_| {
240 self.processed += size as u64;
241 })
242 .map_err(|err| {
243 err.with_operation(Operation::Write)
244 .with_context("service", self.scheme)
245 .with_context("path", &self.path)
246 .with_context("size", size.to_string())
247 .with_context("written", self.processed.to_string())
248 })
249 }
250
251 async fn close(&mut self) -> Result<Metadata> {
252 self.inner.close().await.map_err(|err| {
253 err.with_operation(Operation::Write)
254 .with_context("service", self.scheme)
255 .with_context("path", &self.path)
256 .with_context("written", self.processed.to_string())
257 })
258 }
259
260 async fn abort(&mut self) -> Result<()> {
261 self.inner.abort().await.map_err(|err| {
262 err.with_operation(Operation::Write)
263 .with_context("service", self.scheme)
264 .with_context("path", &self.path)
265 .with_context("processed", self.processed.to_string())
266 })
267 }
268}
269
270impl<T: oio::List> oio::List for ErrorContextWrapper<T> {
271 async fn next(&mut self) -> Result<Option<oio::Entry>> {
272 self.inner
273 .next()
274 .await
275 .inspect(|bs| {
276 self.processed += bs.is_some() as u64;
277 })
278 .map_err(|err| {
279 err.with_operation(Operation::List)
280 .with_context("service", self.scheme)
281 .with_context("path", &self.path)
282 .with_context("listed", self.processed.to_string())
283 })
284 }
285}
286
287impl<T: oio::Delete> oio::Delete for ErrorContextWrapper<T> {
288 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
289 self.inner.delete(path, args).map_err(|err| {
290 err.with_operation(Operation::Delete)
291 .with_context("service", self.scheme)
292 .with_context("path", path)
293 .with_context("deleted", self.processed.to_string())
294 })
295 }
296
297 async fn flush(&mut self) -> Result<usize> {
298 self.inner
299 .flush()
300 .await
301 .inspect(|&n| {
302 self.processed += n as u64;
303 })
304 .map_err(|err| {
305 err.with_operation(Operation::Delete)
306 .with_context("service", self.scheme)
307 .with_context("deleted", self.processed.to_string())
308 })
309 }
310}