opendal/layers/
error_context.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use crate::raw::*;
23use crate::*;
24
/// ErrorContextLayer attaches error context to the errors returned by the
/// accessor it wraps and by the readers, writers, listers and deleters that
/// accessor hands out.
///
/// # Notes
///
/// This layer adds the following error context to errors:
///
/// - `service`: The [`Scheme`] of the underlying service.
/// - `operation`: The [`Operation`] that failed.
/// - `path`: The path of this operation, when it has one. `copy` and `rename`
///   report `from` and `to` instead.
///
/// Some operations may have additional context:
///
/// - `range`: The range the read operation is trying to read.
/// - `read`: The already read size in given reader.
/// - `size`: The size of the current write operation.
/// - `written`: The already written size in given writer.
/// - `listed`: The number of entries already yielded by the given lister.
/// - `deleted`: The already deleted size in given deleter.
// The layer carries no state, so the common value-type traits are free to derive.
#[derive(Debug, Clone, Copy, Default)]
pub struct ErrorContextLayer;
44
45impl<A: Access> Layer<A> for ErrorContextLayer {
46    type LayeredAccess = ErrorContextAccessor<A>;
47
48    fn layer(&self, inner: A) -> Self::LayeredAccess {
49        let info = inner.info();
50        ErrorContextAccessor { info, inner }
51    }
52}
53
54/// Provide error context wrapper for backend.
55pub struct ErrorContextAccessor<A: Access> {
56    info: Arc<AccessorInfo>,
57    inner: A,
58}
59
60impl<A: Access> Debug for ErrorContextAccessor<A> {
61    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
62        self.inner.fmt(f)
63    }
64}
65
66impl<A: Access> LayeredAccess for ErrorContextAccessor<A> {
67    type Inner = A;
68    type Reader = ErrorContextWrapper<A::Reader>;
69    type BlockingReader = ErrorContextWrapper<A::BlockingReader>;
70    type Writer = ErrorContextWrapper<A::Writer>;
71    type BlockingWriter = ErrorContextWrapper<A::BlockingWriter>;
72    type Lister = ErrorContextWrapper<A::Lister>;
73    type BlockingLister = ErrorContextWrapper<A::BlockingLister>;
74    type Deleter = ErrorContextWrapper<A::Deleter>;
75    type BlockingDeleter = ErrorContextWrapper<A::BlockingDeleter>;
76
77    fn inner(&self) -> &Self::Inner {
78        &self.inner
79    }
80
81    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
82        self.inner.create_dir(path, args).await.map_err(|err| {
83            err.with_operation(Operation::CreateDir)
84                .with_context("service", self.info.scheme())
85                .with_context("path", path)
86        })
87    }
88
89    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
90        let range = args.range();
91        self.inner
92            .read(path, args)
93            .await
94            .map(|(rp, r)| {
95                (
96                    rp,
97                    ErrorContextWrapper::new(self.info.scheme(), path.to_string(), r)
98                        .with_range(range),
99                )
100            })
101            .map_err(|err| {
102                err.with_operation(Operation::Read)
103                    .with_context("service", self.info.scheme())
104                    .with_context("path", path)
105                    .with_context("range", range.to_string())
106            })
107    }
108
109    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
110        self.inner
111            .write(path, args)
112            .await
113            .map(|(rp, w)| {
114                (
115                    rp,
116                    ErrorContextWrapper::new(self.info.scheme(), path.to_string(), w),
117                )
118            })
119            .map_err(|err| {
120                err.with_operation(Operation::Write)
121                    .with_context("service", self.info.scheme())
122                    .with_context("path", path)
123            })
124    }
125
126    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
127        self.inner.copy(from, to, args).await.map_err(|err| {
128            err.with_operation(Operation::Copy)
129                .with_context("service", self.info.scheme())
130                .with_context("from", from)
131                .with_context("to", to)
132        })
133    }
134
135    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
136        self.inner.rename(from, to, args).await.map_err(|err| {
137            err.with_operation(Operation::Rename)
138                .with_context("service", self.info.scheme())
139                .with_context("from", from)
140                .with_context("to", to)
141        })
142    }
143
144    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
145        self.inner.stat(path, args).await.map_err(|err| {
146            err.with_operation(Operation::Stat)
147                .with_context("service", self.info.scheme())
148                .with_context("path", path)
149        })
150    }
151
152    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
153        self.inner
154            .delete()
155            .await
156            .map(|(rp, w)| {
157                (
158                    rp,
159                    ErrorContextWrapper::new(self.info.scheme(), "".to_string(), w),
160                )
161            })
162            .map_err(|err| {
163                err.with_operation(Operation::Delete)
164                    .with_context("service", self.info.scheme())
165            })
166    }
167
168    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
169        self.inner
170            .list(path, args)
171            .await
172            .map(|(rp, p)| {
173                (
174                    rp,
175                    ErrorContextWrapper::new(self.info.scheme(), path.to_string(), p),
176                )
177            })
178            .map_err(|err| {
179                err.with_operation(Operation::List)
180                    .with_context("service", self.info.scheme())
181                    .with_context("path", path)
182            })
183    }
184
185    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
186        self.inner.presign(path, args).await.map_err(|err| {
187            err.with_operation(Operation::Presign)
188                .with_context("service", self.info.scheme())
189                .with_context("path", path)
190        })
191    }
192
193    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
194        self.inner.blocking_create_dir(path, args).map_err(|err| {
195            err.with_operation(Operation::CreateDir)
196                .with_context("service", self.info.scheme())
197                .with_context("path", path)
198        })
199    }
200
201    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
202        let range = args.range();
203        self.inner
204            .blocking_read(path, args)
205            .map(|(rp, os)| {
206                (
207                    rp,
208                    ErrorContextWrapper::new(self.info.scheme(), path.to_string(), os)
209                        .with_range(range),
210                )
211            })
212            .map_err(|err| {
213                err.with_operation(Operation::Read)
214                    .with_context("service", self.info.scheme())
215                    .with_context("path", path)
216                    .with_context("range", range.to_string())
217            })
218    }
219
220    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
221        self.inner
222            .blocking_write(path, args)
223            .map(|(rp, os)| {
224                (
225                    rp,
226                    ErrorContextWrapper::new(self.info.scheme(), path.to_string(), os),
227                )
228            })
229            .map_err(|err| {
230                err.with_operation(Operation::Write)
231                    .with_context("service", self.info.scheme())
232                    .with_context("path", path)
233            })
234    }
235
236    fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
237        self.inner.blocking_copy(from, to, args).map_err(|err| {
238            err.with_operation(Operation::Copy)
239                .with_context("service", self.info.scheme())
240                .with_context("from", from)
241                .with_context("to", to)
242        })
243    }
244
245    fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
246        self.inner.blocking_rename(from, to, args).map_err(|err| {
247            err.with_operation(Operation::Rename)
248                .with_context("service", self.info.scheme())
249                .with_context("from", from)
250                .with_context("to", to)
251        })
252    }
253
254    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
255        self.inner.blocking_stat(path, args).map_err(|err| {
256            err.with_operation(Operation::Stat)
257                .with_context("service", self.info.scheme())
258                .with_context("path", path)
259        })
260    }
261
262    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
263        self.inner
264            .blocking_delete()
265            .map(|(rp, w)| {
266                (
267                    rp,
268                    ErrorContextWrapper::new(self.info.scheme(), "".to_string(), w),
269                )
270            })
271            .map_err(|err| {
272                err.with_operation(Operation::Delete)
273                    .with_context("service", self.info.scheme())
274            })
275    }
276
277    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
278        self.inner
279            .blocking_list(path, args)
280            .map(|(rp, os)| {
281                (
282                    rp,
283                    ErrorContextWrapper::new(self.info.scheme(), path.to_string(), os),
284                )
285            })
286            .map_err(|err| {
287                err.with_operation(Operation::List)
288                    .with_context("service", self.info.scheme())
289                    .with_context("path", path)
290            })
291    }
292}
293
294pub struct ErrorContextWrapper<T> {
295    scheme: Scheme,
296    path: String,
297    inner: T,
298    range: BytesRange,
299    processed: u64,
300}
301
302impl<T> ErrorContextWrapper<T> {
303    fn new(scheme: Scheme, path: String, inner: T) -> Self {
304        Self {
305            scheme,
306            path,
307            inner,
308            range: BytesRange::default(),
309            processed: 0,
310        }
311    }
312
313    fn with_range(mut self, range: BytesRange) -> Self {
314        self.range = range;
315        self
316    }
317}
318
319impl<T: oio::Read> oio::Read for ErrorContextWrapper<T> {
320    async fn read(&mut self) -> Result<Buffer> {
321        self.inner
322            .read()
323            .await
324            .inspect(|bs| {
325                self.processed += bs.len() as u64;
326            })
327            .map_err(|err| {
328                err.with_operation(Operation::Read)
329                    .with_context("service", self.scheme)
330                    .with_context("path", &self.path)
331                    .with_context("range", self.range.to_string())
332                    .with_context("read", self.processed.to_string())
333            })
334    }
335}
336
337impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
338    fn read(&mut self) -> Result<Buffer> {
339        self.inner
340            .read()
341            .inspect(|bs| {
342                self.processed += bs.len() as u64;
343            })
344            .map_err(|err| {
345                err.with_operation(Operation::Read)
346                    .with_context("service", self.scheme)
347                    .with_context("path", &self.path)
348                    .with_context("range", self.range.to_string())
349                    .with_context("read", self.processed.to_string())
350            })
351    }
352}
353
354impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
355    async fn write(&mut self, bs: Buffer) -> Result<()> {
356        let size = bs.len();
357        self.inner
358            .write(bs)
359            .await
360            .map(|_| {
361                self.processed += size as u64;
362            })
363            .map_err(|err| {
364                err.with_operation(Operation::Write)
365                    .with_context("service", self.scheme)
366                    .with_context("path", &self.path)
367                    .with_context("size", size.to_string())
368                    .with_context("written", self.processed.to_string())
369            })
370    }
371
372    async fn close(&mut self) -> Result<Metadata> {
373        self.inner.close().await.map_err(|err| {
374            err.with_operation(Operation::Write)
375                .with_context("service", self.scheme)
376                .with_context("path", &self.path)
377                .with_context("written", self.processed.to_string())
378        })
379    }
380
381    async fn abort(&mut self) -> Result<()> {
382        self.inner.abort().await.map_err(|err| {
383            err.with_operation(Operation::Write)
384                .with_context("service", self.scheme)
385                .with_context("path", &self.path)
386                .with_context("processed", self.processed.to_string())
387        })
388    }
389}
390
391impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
392    fn write(&mut self, bs: Buffer) -> Result<()> {
393        let size = bs.len();
394        self.inner
395            .write(bs)
396            .map(|_| {
397                self.processed += size as u64;
398            })
399            .map_err(|err| {
400                err.with_operation(Operation::Write)
401                    .with_context("service", self.scheme)
402                    .with_context("path", &self.path)
403                    .with_context("size", size.to_string())
404                    .with_context("written", self.processed.to_string())
405            })
406    }
407
408    fn close(&mut self) -> Result<Metadata> {
409        self.inner.close().map_err(|err| {
410            err.with_operation(Operation::Write)
411                .with_context("service", self.scheme)
412                .with_context("path", &self.path)
413                .with_context("written", self.processed.to_string())
414        })
415    }
416}
417
418impl<T: oio::List> oio::List for ErrorContextWrapper<T> {
419    async fn next(&mut self) -> Result<Option<oio::Entry>> {
420        self.inner
421            .next()
422            .await
423            .inspect(|bs| {
424                self.processed += bs.is_some() as u64;
425            })
426            .map_err(|err| {
427                err.with_operation(Operation::List)
428                    .with_context("service", self.scheme)
429                    .with_context("path", &self.path)
430                    .with_context("listed", self.processed.to_string())
431            })
432    }
433}
434
435impl<T: oio::BlockingList> oio::BlockingList for ErrorContextWrapper<T> {
436    fn next(&mut self) -> Result<Option<oio::Entry>> {
437        self.inner
438            .next()
439            .inspect(|bs| {
440                self.processed += bs.is_some() as u64;
441            })
442            .map_err(|err| {
443                err.with_operation(Operation::List)
444                    .with_context("service", self.scheme)
445                    .with_context("path", &self.path)
446                    .with_context("listed", self.processed.to_string())
447            })
448    }
449}
450
451impl<T: oio::Delete> oio::Delete for ErrorContextWrapper<T> {
452    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
453        self.inner.delete(path, args).map_err(|err| {
454            err.with_operation(Operation::Delete)
455                .with_context("service", self.scheme)
456                .with_context("path", path)
457                .with_context("deleted", self.processed.to_string())
458        })
459    }
460
461    async fn flush(&mut self) -> Result<usize> {
462        self.inner
463            .flush()
464            .await
465            .inspect(|&n| {
466                self.processed += n as u64;
467            })
468            .map_err(|err| {
469                err.with_operation(Operation::Delete)
470                    .with_context("service", self.scheme)
471                    .with_context("deleted", self.processed.to_string())
472            })
473    }
474}
475
476impl<T: oio::BlockingDelete> oio::BlockingDelete for ErrorContextWrapper<T> {
477    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
478        self.inner.delete(path, args).map_err(|err| {
479            err.with_operation(Operation::Delete)
480                .with_context("service", self.scheme)
481                .with_context("path", path)
482                .with_context("deleted", self.processed.to_string())
483        })
484    }
485
486    fn flush(&mut self) -> Result<usize> {
487        self.inner
488            .flush()
489            .inspect(|&n| {
490                self.processed += n as u64;
491            })
492            .map_err(|err| {
493                err.with_operation(Operation::Delete)
494                    .with_context("service", self.scheme)
495                    .with_context("deleted", self.processed.to_string())
496            })
497    }
498}