opendal/layers/
correctness_check.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::future::Future;
21use std::sync::Arc;
22
23use crate::raw::*;
24use crate::*;
25
/// Add a correctness capability check layer for every operation.
///
/// Before performing any operation, we first verify the operation and its critical arguments
/// against the capability of the underlying service. If the operation or its arguments are not
/// supported, an error is returned directly.
///
/// # Notes
///
/// OpenDAL applies this checker to every accessor by default, so users don't need to invoke it
/// manually. This checker ensures the operation and its critical arguments, which might affect
/// the correctness of the call, are supported by the underlying service.
///
/// For example, when calling `write_with_append` while `append` is not supported by the
/// underlying service, an `Unsupported` error is returned. Without this check, undesired data
/// may be written.
#[derive(Debug, Clone, Copy, Default)]
pub struct CorrectnessCheckLayer;
42
43impl<A: Access> Layer<A> for CorrectnessCheckLayer {
44    type LayeredAccess = CorrectnessAccessor<A>;
45
46    fn layer(&self, inner: A) -> Self::LayeredAccess {
47        CorrectnessAccessor {
48            info: inner.info(),
49            inner,
50        }
51    }
52}
53
54pub(crate) fn new_unsupported_error(info: &AccessorInfo, op: Operation, args: &str) -> Error {
55    let scheme = info.scheme();
56    let op = op.into_static();
57
58    Error::new(
59        ErrorKind::Unsupported,
60        format!("The service {scheme} does not support the operation {op} with the arguments {args}. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."),
61    )
62    .with_operation(op)
63}
64
65pub struct CorrectnessAccessor<A: Access> {
66    info: Arc<AccessorInfo>,
67    inner: A,
68}
69
70impl<A: Access> Debug for CorrectnessAccessor<A> {
71    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("CorrectnessCheckAccessor")
73            .field("inner", &self.inner)
74            .finish_non_exhaustive()
75    }
76}
77
78impl<A: Access> LayeredAccess for CorrectnessAccessor<A> {
79    type Inner = A;
80    type Reader = A::Reader;
81    type Writer = A::Writer;
82    type Lister = A::Lister;
83    type Deleter = CheckWrapper<A::Deleter>;
84
85    fn inner(&self) -> &Self::Inner {
86        &self.inner
87    }
88
89    fn info(&self) -> Arc<AccessorInfo> {
90        self.info.clone()
91    }
92
93    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
94        let capability = self.info.full_capability();
95        if !capability.read_with_version && args.version().is_some() {
96            return Err(new_unsupported_error(
97                self.info.as_ref(),
98                Operation::Read,
99                "version",
100            ));
101        }
102        if !capability.read_with_if_match && args.if_match().is_some() {
103            return Err(new_unsupported_error(
104                self.info.as_ref(),
105                Operation::Read,
106                "if_match",
107            ));
108        }
109        if !capability.read_with_if_none_match && args.if_none_match().is_some() {
110            return Err(new_unsupported_error(
111                self.info.as_ref(),
112                Operation::Read,
113                "if_none_match",
114            ));
115        }
116        if !capability.read_with_if_modified_since && args.if_modified_since().is_some() {
117            return Err(new_unsupported_error(
118                self.info.as_ref(),
119                Operation::Read,
120                "if_modified_since",
121            ));
122        }
123        if !capability.read_with_if_unmodified_since && args.if_unmodified_since().is_some() {
124            return Err(new_unsupported_error(
125                self.info.as_ref(),
126                Operation::Read,
127                "if_unmodified_since",
128            ));
129        }
130
131        self.inner.read(path, args).await
132    }
133
134    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
135        let capability = self.info.full_capability();
136        if args.append() && !capability.write_can_append {
137            return Err(new_unsupported_error(
138                &self.info,
139                Operation::Write,
140                "append",
141            ));
142        }
143        if args.if_not_exists() && !capability.write_with_if_not_exists {
144            return Err(new_unsupported_error(
145                &self.info,
146                Operation::Write,
147                "if_not_exists",
148            ));
149        }
150        if let Some(if_none_match) = args.if_none_match() {
151            if !capability.write_with_if_none_match {
152                let mut err =
153                    new_unsupported_error(self.info.as_ref(), Operation::Write, "if_none_match");
154                if if_none_match == "*" && capability.write_with_if_not_exists {
155                    err = err.with_context("hint", "use if_not_exists instead");
156                }
157
158                return Err(err);
159            }
160        }
161
162        self.inner.write(path, args).await
163    }
164
165    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
166        let capability = self.info.full_capability();
167        if !capability.stat_with_version && args.version().is_some() {
168            return Err(new_unsupported_error(
169                self.info.as_ref(),
170                Operation::Stat,
171                "version",
172            ));
173        }
174        if !capability.stat_with_if_match && args.if_match().is_some() {
175            return Err(new_unsupported_error(
176                self.info.as_ref(),
177                Operation::Stat,
178                "if_match",
179            ));
180        }
181        if !capability.stat_with_if_none_match && args.if_none_match().is_some() {
182            return Err(new_unsupported_error(
183                self.info.as_ref(),
184                Operation::Stat,
185                "if_none_match",
186            ));
187        }
188        if !capability.stat_with_if_modified_since && args.if_modified_since().is_some() {
189            return Err(new_unsupported_error(
190                self.info.as_ref(),
191                Operation::Stat,
192                "if_modified_since",
193            ));
194        }
195        if !capability.stat_with_if_unmodified_since && args.if_unmodified_since().is_some() {
196            return Err(new_unsupported_error(
197                self.info.as_ref(),
198                Operation::Stat,
199                "if_unmodified_since",
200            ));
201        }
202
203        self.inner.stat(path, args).await
204    }
205
206    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
207        self.inner.delete().await.map(|(rp, deleter)| {
208            let deleter = CheckWrapper::new(deleter, self.info.clone());
209            (rp, deleter)
210        })
211    }
212
213    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
214        self.inner.list(path, args).await
215    }
216}
217
218pub struct CheckWrapper<T> {
219    info: Arc<AccessorInfo>,
220    inner: T,
221}
222
223impl<T> CheckWrapper<T> {
224    fn new(inner: T, info: Arc<AccessorInfo>) -> Self {
225        Self { inner, info }
226    }
227
228    fn check_delete(&self, args: &OpDelete) -> Result<()> {
229        if args.version().is_some() && !self.info.full_capability().delete_with_version {
230            return Err(new_unsupported_error(
231                &self.info,
232                Operation::Delete,
233                "version",
234            ));
235        }
236
237        Ok(())
238    }
239}
240
241impl<T: oio::Delete> oio::Delete for CheckWrapper<T> {
242    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
243        self.check_delete(&args)?;
244        self.inner.delete(path, args)
245    }
246
247    fn flush(&mut self) -> impl Future<Output = Result<usize>> + MaybeSend {
248        self.inner.flush()
249    }
250}
251
252#[cfg(test)]
253mod tests {
254    use super::*;
255    use crate::raw::oio;
256    use crate::Capability;
257    use crate::EntryMode;
258    use crate::Metadata;
259    use crate::Operator;
260
261    #[derive(Debug)]
262    struct MockService {
263        capability: Capability,
264    }
265
266    impl Access for MockService {
267        type Reader = oio::Reader;
268        type Writer = oio::Writer;
269        type Lister = oio::Lister;
270        type Deleter = oio::Deleter;
271
272        fn info(&self) -> Arc<AccessorInfo> {
273            let info = AccessorInfo::default();
274            info.set_scheme("memory");
275            info.set_native_capability(self.capability);
276
277            info.into()
278        }
279
280        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
281            Ok(RpStat::new(Metadata::new(EntryMode::Unknown)))
282        }
283
284        async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
285            Ok((RpRead::new(), Box::new(bytes::Bytes::new())))
286        }
287
288        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
289            Ok((RpWrite::new(), Box::new(MockWriter)))
290        }
291
292        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
293            Ok((RpList::default(), Box::new(())))
294        }
295
296        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
297            Ok((RpDelete::default(), Box::new(MockDeleter)))
298        }
299    }
300
301    struct MockWriter;
302
303    impl oio::Write for MockWriter {
304        async fn write(&mut self, _: Buffer) -> Result<()> {
305            Ok(())
306        }
307
308        async fn close(&mut self) -> Result<Metadata> {
309            Ok(Metadata::default())
310        }
311
312        async fn abort(&mut self) -> Result<()> {
313            Ok(())
314        }
315    }
316
317    struct MockDeleter;
318
319    impl oio::Delete for MockDeleter {
320        fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
321            Ok(())
322        }
323
324        async fn flush(&mut self) -> Result<usize> {
325            Ok(1)
326        }
327    }
328
329    fn new_test_operator(capability: Capability) -> Operator {
330        let srv = MockService { capability };
331
332        Operator::from_inner(Arc::new(srv)).layer(CorrectnessCheckLayer)
333    }
334
335    #[tokio::test]
336    async fn test_read() {
337        let op = new_test_operator(Capability {
338            read: true,
339            ..Default::default()
340        });
341        let res = op.read_with("path").version("version").await;
342        assert!(res.is_err());
343        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
344
345        let op = new_test_operator(Capability {
346            read: true,
347            read_with_version: true,
348            ..Default::default()
349        });
350        let res = op.read_with("path").version("version").await;
351        assert!(res.is_ok());
352    }
353
354    #[tokio::test]
355    async fn test_stat() {
356        let op = new_test_operator(Capability {
357            stat: true,
358            ..Default::default()
359        });
360        let res = op.stat_with("path").version("version").await;
361        assert!(res.is_err());
362        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
363
364        let op = new_test_operator(Capability {
365            stat: true,
366            stat_with_version: true,
367            ..Default::default()
368        });
369        let res = op.stat_with("path").version("version").await;
370        assert!(res.is_ok());
371    }
372
373    #[tokio::test]
374    async fn test_write_with() {
375        let op = new_test_operator(Capability {
376            write: true,
377            write_with_if_not_exists: true,
378            ..Default::default()
379        });
380        let res = op.write_with("path", "".as_bytes()).append(true).await;
381        assert!(res.is_err());
382        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
383
384        let res = op
385            .write_with("path", "".as_bytes())
386            .if_none_match("etag")
387            .await;
388        assert!(res.is_err());
389        assert_eq!(
390            res.unwrap_err().to_string(),
391            "Unsupported (permanent) at write => The service memory does not support the operation write with the arguments if_none_match. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."
392        );
393
394        // Now try a wildcard if-none-match
395        let res = op
396            .write_with("path", "".as_bytes())
397            .if_none_match("*")
398            .await;
399        assert!(res.is_err());
400        assert_eq!(
401            res.unwrap_err().to_string(),
402             "Unsupported (permanent) at write, context: { hint: use if_not_exists instead } => The service memory does not support the operation write with the arguments if_none_match. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."
403        );
404
405        let res = op
406            .write_with("path", "".as_bytes())
407            .if_not_exists(true)
408            .await;
409        assert!(res.is_ok());
410
411        let op = new_test_operator(Capability {
412            write: true,
413            write_can_append: true,
414            write_with_if_not_exists: true,
415            write_with_if_none_match: true,
416            ..Default::default()
417        });
418        let res = op.writer_with("path").append(true).await;
419        assert!(res.is_ok());
420    }
421
422    #[tokio::test]
423    async fn test_delete() {
424        let op = new_test_operator(Capability {
425            delete: true,
426            ..Default::default()
427        });
428        let res = op.delete_with("path").version("version").await;
429        assert!(res.is_err());
430        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
431
432        let op = new_test_operator(Capability {
433            delete: true,
434            delete_with_version: true,
435            ..Default::default()
436        });
437        let res = op.delete_with("path").version("version").await;
438        assert!(res.is_ok())
439    }
440}