opendal/layers/
correctness_check.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{Debug, Formatter};
19use std::future::Future;
20use std::sync::Arc;
21
22use crate::raw::*;
23use crate::*;
24
/// Add a correctness capability check layer for every operation.
///
/// Before performing any operation, this layer verifies the operation and its critical
/// arguments against the capability of the underlying service. If the operation or one of
/// its arguments is not supported, an error is returned directly.
///
/// # Notes
///
/// OpenDAL applies this checker to every accessor by default, so users don't need to invoke
/// it manually. The checker ensures that the operation and its critical arguments, which
/// might affect the correctness of the call, are supported by the underlying service.
///
/// For example, when calling `write_with_append` while `append` is not supported by the
/// underlying service, an `Unsupported` error is returned. Without this check, undesired
/// data may be written.
#[derive(Debug, Default, Clone, Copy)]
pub struct CorrectnessCheckLayer;
41
42impl<A: Access> Layer<A> for CorrectnessCheckLayer {
43    type LayeredAccess = CorrectnessAccessor<A>;
44
45    fn layer(&self, inner: A) -> Self::LayeredAccess {
46        CorrectnessAccessor {
47            info: inner.info(),
48            inner,
49        }
50    }
51}
52
53pub(crate) fn new_unsupported_error(info: &AccessorInfo, op: Operation, args: &str) -> Error {
54    let scheme = info.scheme();
55    let op = op.into_static();
56
57    Error::new(
58        ErrorKind::Unsupported,
59        format!("The service {scheme} does not support the operation {op} with the arguments {args}. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."),
60    )
61    .with_operation(op)
62}
63
/// Accessor produced by [`CorrectnessCheckLayer`].
///
/// It wraps another accessor and keeps that accessor's [`AccessorInfo`] alongside it, so
/// operation arguments can be compared with the service capability before the call is
/// forwarded to `inner`.
pub struct CorrectnessAccessor<A: Access> {
    /// Accessor info obtained from `inner` when the layer was applied.
    info: Arc<AccessorInfo>,
    /// The wrapped accessor that actually performs the operations.
    inner: A,
}
68
69impl<A: Access> Debug for CorrectnessAccessor<A> {
70    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct("CorrectnessCheckAccessor")
72            .field("inner", &self.inner)
73            .finish_non_exhaustive()
74    }
75}
76
77impl<A: Access> LayeredAccess for CorrectnessAccessor<A> {
78    type Inner = A;
79    type Reader = A::Reader;
80    type Writer = A::Writer;
81    type Lister = A::Lister;
82    type Deleter = CheckWrapper<A::Deleter>;
83    type BlockingReader = A::BlockingReader;
84    type BlockingWriter = A::BlockingWriter;
85    type BlockingLister = A::BlockingLister;
86    type BlockingDeleter = CheckWrapper<A::BlockingDeleter>;
87
88    fn inner(&self) -> &Self::Inner {
89        &self.inner
90    }
91
92    fn info(&self) -> Arc<AccessorInfo> {
93        self.info.clone()
94    }
95
96    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
97        let capability = self.info.full_capability();
98        if !capability.read_with_version && args.version().is_some() {
99            return Err(new_unsupported_error(
100                self.info.as_ref(),
101                Operation::Read,
102                "version",
103            ));
104        }
105        if !capability.read_with_if_match && args.if_match().is_some() {
106            return Err(new_unsupported_error(
107                self.info.as_ref(),
108                Operation::Read,
109                "if_match",
110            ));
111        }
112        if !capability.read_with_if_none_match && args.if_none_match().is_some() {
113            return Err(new_unsupported_error(
114                self.info.as_ref(),
115                Operation::Read,
116                "if_none_match",
117            ));
118        }
119        if !capability.read_with_if_modified_since && args.if_modified_since().is_some() {
120            return Err(new_unsupported_error(
121                self.info.as_ref(),
122                Operation::Read,
123                "if_modified_since",
124            ));
125        }
126        if !capability.read_with_if_unmodified_since && args.if_unmodified_since().is_some() {
127            return Err(new_unsupported_error(
128                self.info.as_ref(),
129                Operation::Read,
130                "if_unmodified_since",
131            ));
132        }
133
134        self.inner.read(path, args).await
135    }
136
137    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
138        let capability = self.info.full_capability();
139        if args.append() && !capability.write_can_append {
140            return Err(new_unsupported_error(
141                &self.info,
142                Operation::Write,
143                "append",
144            ));
145        }
146        if args.if_not_exists() && !capability.write_with_if_not_exists {
147            return Err(new_unsupported_error(
148                &self.info,
149                Operation::Write,
150                "if_not_exists",
151            ));
152        }
153        if let Some(if_none_match) = args.if_none_match() {
154            if !capability.write_with_if_none_match {
155                let mut err =
156                    new_unsupported_error(self.info.as_ref(), Operation::Write, "if_none_match");
157                if if_none_match == "*" && capability.write_with_if_not_exists {
158                    err = err.with_context("hint", "use if_not_exists instead");
159                }
160
161                return Err(err);
162            }
163        }
164
165        self.inner.write(path, args).await
166    }
167
168    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
169        let capability = self.info.full_capability();
170        if !capability.stat_with_version && args.version().is_some() {
171            return Err(new_unsupported_error(
172                self.info.as_ref(),
173                Operation::Stat,
174                "version",
175            ));
176        }
177        if !capability.stat_with_if_match && args.if_match().is_some() {
178            return Err(new_unsupported_error(
179                self.info.as_ref(),
180                Operation::Stat,
181                "if_match",
182            ));
183        }
184        if !capability.stat_with_if_none_match && args.if_none_match().is_some() {
185            return Err(new_unsupported_error(
186                self.info.as_ref(),
187                Operation::Stat,
188                "if_none_match",
189            ));
190        }
191        if !capability.stat_with_if_modified_since && args.if_modified_since().is_some() {
192            return Err(new_unsupported_error(
193                self.info.as_ref(),
194                Operation::Stat,
195                "if_modified_since",
196            ));
197        }
198        if !capability.stat_with_if_unmodified_since && args.if_unmodified_since().is_some() {
199            return Err(new_unsupported_error(
200                self.info.as_ref(),
201                Operation::Stat,
202                "if_unmodified_since",
203            ));
204        }
205
206        self.inner.stat(path, args).await
207    }
208
209    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
210        self.inner.delete().await.map(|(rp, deleter)| {
211            let deleter = CheckWrapper::new(deleter, self.info.clone());
212            (rp, deleter)
213        })
214    }
215
216    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
217        self.inner.list(path, args).await
218    }
219
220    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
221        let capability = self.info.full_capability();
222        if !capability.read_with_version && args.version().is_some() {
223            return Err(new_unsupported_error(
224                self.info.as_ref(),
225                Operation::Read,
226                "version",
227            ));
228        }
229
230        self.inner.blocking_read(path, args)
231    }
232
233    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
234        let capability = self.info.full_capability();
235        if args.append() && !capability.write_can_append {
236            return Err(new_unsupported_error(
237                &self.info,
238                Operation::Write,
239                "append",
240            ));
241        }
242        if args.if_not_exists() && !capability.write_with_if_not_exists {
243            return Err(new_unsupported_error(
244                &self.info,
245                Operation::Write,
246                "if_not_exists",
247            ));
248        }
249        if args.if_none_match().is_some() && !capability.write_with_if_none_match {
250            return Err(new_unsupported_error(
251                self.info.as_ref(),
252                Operation::Write,
253                "if_none_match",
254            ));
255        }
256
257        self.inner.blocking_write(path, args)
258    }
259
260    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
261        let capability = self.info.full_capability();
262        if !capability.stat_with_version && args.version().is_some() {
263            return Err(new_unsupported_error(
264                self.info.as_ref(),
265                Operation::Stat,
266                "version",
267            ));
268        }
269
270        self.inner.blocking_stat(path, args)
271    }
272
273    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
274        self.inner.blocking_delete().map(|(rp, deleter)| {
275            let deleter = CheckWrapper::new(deleter, self.info.clone());
276            (rp, deleter)
277        })
278    }
279
280    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
281        self.inner.blocking_list(path, args)
282    }
283}
284
/// Wrapper around a deleter that checks the arguments of each `delete` call against the
/// service capability before handing the call to the wrapped deleter.
pub struct CheckWrapper<T> {
    /// Accessor info whose capability the delete arguments are checked against.
    info: Arc<AccessorInfo>,
    /// The wrapped deleter.
    inner: T,
}
289
290impl<T> CheckWrapper<T> {
291    fn new(inner: T, info: Arc<AccessorInfo>) -> Self {
292        Self { inner, info }
293    }
294
295    fn check_delete(&self, args: &OpDelete) -> Result<()> {
296        if args.version().is_some() && !self.info.full_capability().delete_with_version {
297            return Err(new_unsupported_error(
298                &self.info,
299                Operation::Delete,
300                "version",
301            ));
302        }
303
304        Ok(())
305    }
306}
307
308impl<T: oio::Delete> oio::Delete for CheckWrapper<T> {
309    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
310        self.check_delete(&args)?;
311        self.inner.delete(path, args)
312    }
313
314    fn flush(&mut self) -> impl Future<Output = Result<usize>> + MaybeSend {
315        self.inner.flush()
316    }
317}
318
319impl<T: oio::BlockingDelete> oio::BlockingDelete for CheckWrapper<T> {
320    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
321        self.check_delete(&args)?;
322        self.inner.delete(path, args)
323    }
324
325    fn flush(&mut self) -> Result<usize> {
326        self.inner.flush()
327    }
328}
329
#[cfg(test)]
mod tests {
    use super::*;
    use crate::raw::oio;
    use crate::{Capability, EntryMode, Metadata, Operator};

    /// Mock service whose accessor info reports exactly the capability it was built with.
    /// Every implemented operation succeeds unconditionally, so any error seen by a test
    /// comes from the layers stacked on top of it.
    #[derive(Debug)]
    struct MockService {
        capability: Capability,
    }

    impl Access for MockService {
        type Reader = oio::Reader;
        type Writer = oio::Writer;
        type Lister = oio::Lister;
        type Deleter = oio::Deleter;
        type BlockingReader = oio::BlockingReader;
        type BlockingWriter = oio::BlockingWriter;
        type BlockingLister = oio::BlockingLister;
        type BlockingDeleter = oio::BlockingDeleter;

        // Start from the default accessor info and only override the native capability.
        fn info(&self) -> Arc<AccessorInfo> {
            let info = AccessorInfo::default();
            info.set_native_capability(self.capability);

            info.into()
        }

        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
            Ok(RpStat::new(Metadata::new(EntryMode::Unknown)))
        }

        async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
            Ok((RpRead::new(), Box::new(bytes::Bytes::new())))
        }

        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
            Ok((RpWrite::new(), Box::new(MockWriter)))
        }

        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            Ok((RpList::default(), Box::new(())))
        }

        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            Ok((RpDelete::default(), Box::new(MockDeleter)))
        }
    }

    /// Writer that accepts and discards everything.
    struct MockWriter;

    impl oio::Write for MockWriter {
        async fn write(&mut self, _: Buffer) -> Result<()> {
            Ok(())
        }

        async fn close(&mut self) -> Result<Metadata> {
            Ok(Metadata::default())
        }

        async fn abort(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Deleter that accepts every request and reports one deleted entry on flush.
    struct MockDeleter;

    impl oio::Delete for MockDeleter {
        fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
            Ok(())
        }

        async fn flush(&mut self) -> Result<usize> {
            Ok(1)
        }
    }

    /// Build an operator over a `MockService` with the given capability and apply
    /// `CorrectnessCheckLayer` on top of it.
    fn new_test_operator(capability: Capability) -> Operator {
        let srv = MockService { capability };

        Operator::from_inner(Arc::new(srv)).layer(CorrectnessCheckLayer)
    }

    /// Reading with a version fails unless `read_with_version` is declared.
    #[tokio::test]
    async fn test_read() {
        let op = new_test_operator(Capability {
            read: true,
            ..Default::default()
        });
        let res = op.read_with("path").version("version").await;
        assert!(res.is_err());
        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);

        let op = new_test_operator(Capability {
            read: true,
            read_with_version: true,
            ..Default::default()
        });
        let res = op.read_with("path").version("version").await;
        assert!(res.is_ok());
    }

    /// Stat with a version fails unless `stat_with_version` is declared.
    #[tokio::test]
    async fn test_stat() {
        let op = new_test_operator(Capability {
            stat: true,
            ..Default::default()
        });
        let res = op.stat_with("path").version("version").await;
        assert!(res.is_err());
        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);

        let op = new_test_operator(Capability {
            stat: true,
            stat_with_version: true,
            ..Default::default()
        });
        let res = op.stat_with("path").version("version").await;
        assert!(res.is_ok());
    }

    /// Exercises the write checks: `append`, `if_none_match` (with and without the
    /// `if_not_exists` hint) and `if_not_exists`.
    #[tokio::test]
    async fn test_write_with() {
        // Only `if_not_exists` is supported here; `append` and `if_none_match` are not.
        let op = new_test_operator(Capability {
            write: true,
            write_with_if_not_exists: true,
            ..Default::default()
        });
        let res = op.write_with("path", "".as_bytes()).append(true).await;
        assert!(res.is_err());
        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);

        // A concrete etag cannot be expressed through `if_not_exists`, so no hint is expected.
        let res = op
            .write_with("path", "".as_bytes())
            .if_none_match("etag")
            .await;
        assert!(res.is_err());
        assert_eq!(
            res.unwrap_err().to_string(),
            "Unsupported (permanent) at write => The service memory does not support the operation write with the arguments if_none_match. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."
        );

        // Now try a wildcard if-none-match
        let res = op
            .write_with("path", "".as_bytes())
            .if_none_match("*")
            .await;
        assert!(res.is_err());
        assert_eq!(
            res.unwrap_err().to_string(),
             "Unsupported (permanent) at write, context: { hint: use if_not_exists instead } => The service memory does not support the operation write with the arguments if_none_match. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."
        );

        let res = op
            .write_with("path", "".as_bytes())
            .if_not_exists(true)
            .await;
        assert!(res.is_ok());

        // With every write capability declared, creating an appending writer succeeds.
        let op = new_test_operator(Capability {
            write: true,
            write_can_append: true,
            write_with_if_not_exists: true,
            write_with_if_none_match: true,
            ..Default::default()
        });
        let res = op.writer_with("path").append(true).await;
        assert!(res.is_ok());
    }

    /// Deleting with a version fails unless `delete_with_version` is declared.
    #[tokio::test]
    async fn test_delete() {
        let op = new_test_operator(Capability {
            delete: true,
            ..Default::default()
        });
        let res = op.delete_with("path").version("version").await;
        assert!(res.is_err());
        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);

        let op = new_test_operator(Capability {
            delete: true,
            delete_with_version: true,
            ..Default::default()
        });
        let res = op.delete_with("path").version("version").await;
        assert!(res.is_ok())
    }
}
518}