1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use crate::raw::*;
23use crate::*;
24
25pub struct ErrorContextLayer;
44
45impl<A: Access> Layer<A> for ErrorContextLayer {
46 type LayeredAccess = ErrorContextAccessor<A>;
47
48 fn layer(&self, inner: A) -> Self::LayeredAccess {
49 let info = inner.info();
50 ErrorContextAccessor { info, inner }
51 }
52}
53
54pub struct ErrorContextAccessor<A: Access> {
56 info: Arc<AccessorInfo>,
57 inner: A,
58}
59
60impl<A: Access> Debug for ErrorContextAccessor<A> {
61 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
62 self.inner.fmt(f)
63 }
64}
65
66impl<A: Access> LayeredAccess for ErrorContextAccessor<A> {
67 type Inner = A;
68 type Reader = ErrorContextWrapper<A::Reader>;
69 type BlockingReader = ErrorContextWrapper<A::BlockingReader>;
70 type Writer = ErrorContextWrapper<A::Writer>;
71 type BlockingWriter = ErrorContextWrapper<A::BlockingWriter>;
72 type Lister = ErrorContextWrapper<A::Lister>;
73 type BlockingLister = ErrorContextWrapper<A::BlockingLister>;
74 type Deleter = ErrorContextWrapper<A::Deleter>;
75 type BlockingDeleter = ErrorContextWrapper<A::BlockingDeleter>;
76
77 fn inner(&self) -> &Self::Inner {
78 &self.inner
79 }
80
81 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
82 self.inner.create_dir(path, args).await.map_err(|err| {
83 err.with_operation(Operation::CreateDir)
84 .with_context("service", self.info.scheme())
85 .with_context("path", path)
86 })
87 }
88
89 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
90 let range = args.range();
91 self.inner
92 .read(path, args)
93 .await
94 .map(|(rp, r)| {
95 (
96 rp,
97 ErrorContextWrapper::new(self.info.scheme(), path.to_string(), r)
98 .with_range(range),
99 )
100 })
101 .map_err(|err| {
102 err.with_operation(Operation::Read)
103 .with_context("service", self.info.scheme())
104 .with_context("path", path)
105 .with_context("range", range.to_string())
106 })
107 }
108
109 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
110 self.inner
111 .write(path, args)
112 .await
113 .map(|(rp, w)| {
114 (
115 rp,
116 ErrorContextWrapper::new(self.info.scheme(), path.to_string(), w),
117 )
118 })
119 .map_err(|err| {
120 err.with_operation(Operation::Write)
121 .with_context("service", self.info.scheme())
122 .with_context("path", path)
123 })
124 }
125
126 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
127 self.inner.copy(from, to, args).await.map_err(|err| {
128 err.with_operation(Operation::Copy)
129 .with_context("service", self.info.scheme())
130 .with_context("from", from)
131 .with_context("to", to)
132 })
133 }
134
135 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
136 self.inner.rename(from, to, args).await.map_err(|err| {
137 err.with_operation(Operation::Rename)
138 .with_context("service", self.info.scheme())
139 .with_context("from", from)
140 .with_context("to", to)
141 })
142 }
143
144 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
145 self.inner.stat(path, args).await.map_err(|err| {
146 err.with_operation(Operation::Stat)
147 .with_context("service", self.info.scheme())
148 .with_context("path", path)
149 })
150 }
151
152 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
153 self.inner
154 .delete()
155 .await
156 .map(|(rp, w)| {
157 (
158 rp,
159 ErrorContextWrapper::new(self.info.scheme(), "".to_string(), w),
160 )
161 })
162 .map_err(|err| {
163 err.with_operation(Operation::Delete)
164 .with_context("service", self.info.scheme())
165 })
166 }
167
168 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
169 self.inner
170 .list(path, args)
171 .await
172 .map(|(rp, p)| {
173 (
174 rp,
175 ErrorContextWrapper::new(self.info.scheme(), path.to_string(), p),
176 )
177 })
178 .map_err(|err| {
179 err.with_operation(Operation::List)
180 .with_context("service", self.info.scheme())
181 .with_context("path", path)
182 })
183 }
184
185 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
186 self.inner.presign(path, args).await.map_err(|err| {
187 err.with_operation(Operation::Presign)
188 .with_context("service", self.info.scheme())
189 .with_context("path", path)
190 })
191 }
192
193 fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
194 self.inner.blocking_create_dir(path, args).map_err(|err| {
195 err.with_operation(Operation::CreateDir)
196 .with_context("service", self.info.scheme())
197 .with_context("path", path)
198 })
199 }
200
201 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
202 let range = args.range();
203 self.inner
204 .blocking_read(path, args)
205 .map(|(rp, os)| {
206 (
207 rp,
208 ErrorContextWrapper::new(self.info.scheme(), path.to_string(), os)
209 .with_range(range),
210 )
211 })
212 .map_err(|err| {
213 err.with_operation(Operation::Read)
214 .with_context("service", self.info.scheme())
215 .with_context("path", path)
216 .with_context("range", range.to_string())
217 })
218 }
219
220 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
221 self.inner
222 .blocking_write(path, args)
223 .map(|(rp, os)| {
224 (
225 rp,
226 ErrorContextWrapper::new(self.info.scheme(), path.to_string(), os),
227 )
228 })
229 .map_err(|err| {
230 err.with_operation(Operation::Write)
231 .with_context("service", self.info.scheme())
232 .with_context("path", path)
233 })
234 }
235
236 fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
237 self.inner.blocking_copy(from, to, args).map_err(|err| {
238 err.with_operation(Operation::Copy)
239 .with_context("service", self.info.scheme())
240 .with_context("from", from)
241 .with_context("to", to)
242 })
243 }
244
245 fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
246 self.inner.blocking_rename(from, to, args).map_err(|err| {
247 err.with_operation(Operation::Rename)
248 .with_context("service", self.info.scheme())
249 .with_context("from", from)
250 .with_context("to", to)
251 })
252 }
253
254 fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
255 self.inner.blocking_stat(path, args).map_err(|err| {
256 err.with_operation(Operation::Stat)
257 .with_context("service", self.info.scheme())
258 .with_context("path", path)
259 })
260 }
261
262 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
263 self.inner
264 .blocking_delete()
265 .map(|(rp, w)| {
266 (
267 rp,
268 ErrorContextWrapper::new(self.info.scheme(), "".to_string(), w),
269 )
270 })
271 .map_err(|err| {
272 err.with_operation(Operation::Delete)
273 .with_context("service", self.info.scheme())
274 })
275 }
276
277 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
278 self.inner
279 .blocking_list(path, args)
280 .map(|(rp, os)| {
281 (
282 rp,
283 ErrorContextWrapper::new(self.info.scheme(), path.to_string(), os),
284 )
285 })
286 .map_err(|err| {
287 err.with_operation(Operation::List)
288 .with_context("service", self.info.scheme())
289 .with_context("path", path)
290 })
291 }
292}
293
294pub struct ErrorContextWrapper<T> {
295 scheme: Scheme,
296 path: String,
297 inner: T,
298 range: BytesRange,
299 processed: u64,
300}
301
302impl<T> ErrorContextWrapper<T> {
303 fn new(scheme: Scheme, path: String, inner: T) -> Self {
304 Self {
305 scheme,
306 path,
307 inner,
308 range: BytesRange::default(),
309 processed: 0,
310 }
311 }
312
313 fn with_range(mut self, range: BytesRange) -> Self {
314 self.range = range;
315 self
316 }
317}
318
319impl<T: oio::Read> oio::Read for ErrorContextWrapper<T> {
320 async fn read(&mut self) -> Result<Buffer> {
321 self.inner
322 .read()
323 .await
324 .inspect(|bs| {
325 self.processed += bs.len() as u64;
326 })
327 .map_err(|err| {
328 err.with_operation(Operation::Read)
329 .with_context("service", self.scheme)
330 .with_context("path", &self.path)
331 .with_context("range", self.range.to_string())
332 .with_context("read", self.processed.to_string())
333 })
334 }
335}
336
337impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
338 fn read(&mut self) -> Result<Buffer> {
339 self.inner
340 .read()
341 .inspect(|bs| {
342 self.processed += bs.len() as u64;
343 })
344 .map_err(|err| {
345 err.with_operation(Operation::Read)
346 .with_context("service", self.scheme)
347 .with_context("path", &self.path)
348 .with_context("range", self.range.to_string())
349 .with_context("read", self.processed.to_string())
350 })
351 }
352}
353
354impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
355 async fn write(&mut self, bs: Buffer) -> Result<()> {
356 let size = bs.len();
357 self.inner
358 .write(bs)
359 .await
360 .map(|_| {
361 self.processed += size as u64;
362 })
363 .map_err(|err| {
364 err.with_operation(Operation::Write)
365 .with_context("service", self.scheme)
366 .with_context("path", &self.path)
367 .with_context("size", size.to_string())
368 .with_context("written", self.processed.to_string())
369 })
370 }
371
372 async fn close(&mut self) -> Result<Metadata> {
373 self.inner.close().await.map_err(|err| {
374 err.with_operation(Operation::Write)
375 .with_context("service", self.scheme)
376 .with_context("path", &self.path)
377 .with_context("written", self.processed.to_string())
378 })
379 }
380
381 async fn abort(&mut self) -> Result<()> {
382 self.inner.abort().await.map_err(|err| {
383 err.with_operation(Operation::Write)
384 .with_context("service", self.scheme)
385 .with_context("path", &self.path)
386 .with_context("processed", self.processed.to_string())
387 })
388 }
389}
390
391impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
392 fn write(&mut self, bs: Buffer) -> Result<()> {
393 let size = bs.len();
394 self.inner
395 .write(bs)
396 .map(|_| {
397 self.processed += size as u64;
398 })
399 .map_err(|err| {
400 err.with_operation(Operation::Write)
401 .with_context("service", self.scheme)
402 .with_context("path", &self.path)
403 .with_context("size", size.to_string())
404 .with_context("written", self.processed.to_string())
405 })
406 }
407
408 fn close(&mut self) -> Result<Metadata> {
409 self.inner.close().map_err(|err| {
410 err.with_operation(Operation::Write)
411 .with_context("service", self.scheme)
412 .with_context("path", &self.path)
413 .with_context("written", self.processed.to_string())
414 })
415 }
416}
417
418impl<T: oio::List> oio::List for ErrorContextWrapper<T> {
419 async fn next(&mut self) -> Result<Option<oio::Entry>> {
420 self.inner
421 .next()
422 .await
423 .inspect(|bs| {
424 self.processed += bs.is_some() as u64;
425 })
426 .map_err(|err| {
427 err.with_operation(Operation::List)
428 .with_context("service", self.scheme)
429 .with_context("path", &self.path)
430 .with_context("listed", self.processed.to_string())
431 })
432 }
433}
434
435impl<T: oio::BlockingList> oio::BlockingList for ErrorContextWrapper<T> {
436 fn next(&mut self) -> Result<Option<oio::Entry>> {
437 self.inner
438 .next()
439 .inspect(|bs| {
440 self.processed += bs.is_some() as u64;
441 })
442 .map_err(|err| {
443 err.with_operation(Operation::List)
444 .with_context("service", self.scheme)
445 .with_context("path", &self.path)
446 .with_context("listed", self.processed.to_string())
447 })
448 }
449}
450
451impl<T: oio::Delete> oio::Delete for ErrorContextWrapper<T> {
452 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
453 self.inner.delete(path, args).map_err(|err| {
454 err.with_operation(Operation::Delete)
455 .with_context("service", self.scheme)
456 .with_context("path", path)
457 .with_context("deleted", self.processed.to_string())
458 })
459 }
460
461 async fn flush(&mut self) -> Result<usize> {
462 self.inner
463 .flush()
464 .await
465 .inspect(|&n| {
466 self.processed += n as u64;
467 })
468 .map_err(|err| {
469 err.with_operation(Operation::Delete)
470 .with_context("service", self.scheme)
471 .with_context("deleted", self.processed.to_string())
472 })
473 }
474}
475
476impl<T: oio::BlockingDelete> oio::BlockingDelete for ErrorContextWrapper<T> {
477 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
478 self.inner.delete(path, args).map_err(|err| {
479 err.with_operation(Operation::Delete)
480 .with_context("service", self.scheme)
481 .with_context("path", path)
482 .with_context("deleted", self.processed.to_string())
483 })
484 }
485
486 fn flush(&mut self) -> Result<usize> {
487 self.inner
488 .flush()
489 .inspect(|&n| {
490 self.processed += n as u64;
491 })
492 .map_err(|err| {
493 err.with_operation(Operation::Delete)
494 .with_context("service", self.scheme)
495 .with_context("deleted", self.processed.to_string())
496 })
497 }
498}