opendal/layers/
correctness_check.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::future::Future;
21use std::sync::Arc;
22
23use crate::raw::*;
24use crate::*;
25
26/// Add a correctness capability check layer for every operation
27///
28/// Before performing any operations, we will first verify the operation and its critical arguments
29/// against the capability of the underlying service. If the operation or arguments is not supported,
30/// an error will be returned directly.
31///
32/// # Notes
33///
34/// OpenDAL applies this checker to every accessor by default, so users don't need to invoke it manually.
35/// this checker ensures the operation and its critical arguments, which might affect the correctness of
36/// the call, are supported by the underlying service.
37///
38/// for example, when calling `write_with_append`, but `append` is not supported by the underlying
39/// service, an `Unsupported` error is returned. without this check, undesired data may be written.
40#[derive(Default)]
41pub struct CorrectnessCheckLayer;
42
43impl<A: Access> Layer<A> for CorrectnessCheckLayer {
44    type LayeredAccess = CorrectnessAccessor<A>;
45
46    fn layer(&self, inner: A) -> Self::LayeredAccess {
47        CorrectnessAccessor {
48            info: inner.info(),
49            inner,
50        }
51    }
52}
53
54pub(crate) fn new_unsupported_error(info: &AccessorInfo, op: Operation, args: &str) -> Error {
55    let scheme = info.scheme();
56    let op = op.into_static();
57
58    Error::new(
59        ErrorKind::Unsupported,
60        format!("The service {scheme} does not support the operation {op} with the arguments {args}. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."),
61    )
62    .with_operation(op)
63}
64
65pub struct CorrectnessAccessor<A: Access> {
66    info: Arc<AccessorInfo>,
67    inner: A,
68}
69
70impl<A: Access> Debug for CorrectnessAccessor<A> {
71    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("CorrectnessCheckAccessor")
73            .field("inner", &self.inner)
74            .finish_non_exhaustive()
75    }
76}
77
78impl<A: Access> LayeredAccess for CorrectnessAccessor<A> {
79    type Inner = A;
80    type Reader = A::Reader;
81    type Writer = A::Writer;
82    type Lister = A::Lister;
83    type Deleter = CheckWrapper<A::Deleter>;
84
85    fn inner(&self) -> &Self::Inner {
86        &self.inner
87    }
88
89    fn info(&self) -> Arc<AccessorInfo> {
90        self.info.clone()
91    }
92
93    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
94        let capability = self.info.full_capability();
95        if !capability.read_with_version && args.version().is_some() {
96            return Err(new_unsupported_error(
97                self.info.as_ref(),
98                Operation::Read,
99                "version",
100            ));
101        }
102        if !capability.read_with_if_match && args.if_match().is_some() {
103            return Err(new_unsupported_error(
104                self.info.as_ref(),
105                Operation::Read,
106                "if_match",
107            ));
108        }
109        if !capability.read_with_if_none_match && args.if_none_match().is_some() {
110            return Err(new_unsupported_error(
111                self.info.as_ref(),
112                Operation::Read,
113                "if_none_match",
114            ));
115        }
116        if !capability.read_with_if_modified_since && args.if_modified_since().is_some() {
117            return Err(new_unsupported_error(
118                self.info.as_ref(),
119                Operation::Read,
120                "if_modified_since",
121            ));
122        }
123        if !capability.read_with_if_unmodified_since && args.if_unmodified_since().is_some() {
124            return Err(new_unsupported_error(
125                self.info.as_ref(),
126                Operation::Read,
127                "if_unmodified_since",
128            ));
129        }
130
131        self.inner.read(path, args).await
132    }
133
134    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
135        let capability = self.info.full_capability();
136        if args.append() && !capability.write_can_append {
137            return Err(new_unsupported_error(
138                &self.info,
139                Operation::Write,
140                "append",
141            ));
142        }
143        if args.if_not_exists() && !capability.write_with_if_not_exists {
144            return Err(new_unsupported_error(
145                &self.info,
146                Operation::Write,
147                "if_not_exists",
148            ));
149        }
150        if let Some(if_none_match) = args.if_none_match() {
151            if !capability.write_with_if_none_match {
152                let mut err =
153                    new_unsupported_error(self.info.as_ref(), Operation::Write, "if_none_match");
154                if if_none_match == "*" && capability.write_with_if_not_exists {
155                    err = err.with_context("hint", "use if_not_exists instead");
156                }
157
158                return Err(err);
159            }
160        }
161
162        self.inner.write(path, args).await
163    }
164
165    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
166        let capability = self.info.full_capability();
167        if !capability.stat_with_version && args.version().is_some() {
168            return Err(new_unsupported_error(
169                self.info.as_ref(),
170                Operation::Stat,
171                "version",
172            ));
173        }
174        if !capability.stat_with_if_match && args.if_match().is_some() {
175            return Err(new_unsupported_error(
176                self.info.as_ref(),
177                Operation::Stat,
178                "if_match",
179            ));
180        }
181        if !capability.stat_with_if_none_match && args.if_none_match().is_some() {
182            return Err(new_unsupported_error(
183                self.info.as_ref(),
184                Operation::Stat,
185                "if_none_match",
186            ));
187        }
188        if !capability.stat_with_if_modified_since && args.if_modified_since().is_some() {
189            return Err(new_unsupported_error(
190                self.info.as_ref(),
191                Operation::Stat,
192                "if_modified_since",
193            ));
194        }
195        if !capability.stat_with_if_unmodified_since && args.if_unmodified_since().is_some() {
196            return Err(new_unsupported_error(
197                self.info.as_ref(),
198                Operation::Stat,
199                "if_unmodified_since",
200            ));
201        }
202
203        self.inner.stat(path, args).await
204    }
205
206    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
207        self.inner.delete().await.map(|(rp, deleter)| {
208            let deleter = CheckWrapper::new(deleter, self.info.clone());
209            (rp, deleter)
210        })
211    }
212
213    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
214        self.inner.list(path, args).await
215    }
216}
217
218pub struct CheckWrapper<T> {
219    info: Arc<AccessorInfo>,
220    inner: T,
221}
222
223impl<T> CheckWrapper<T> {
224    fn new(inner: T, info: Arc<AccessorInfo>) -> Self {
225        Self { inner, info }
226    }
227
228    fn check_delete(&self, args: &OpDelete) -> Result<()> {
229        if args.version().is_some() && !self.info.full_capability().delete_with_version {
230            return Err(new_unsupported_error(
231                &self.info,
232                Operation::Delete,
233                "version",
234            ));
235        }
236
237        Ok(())
238    }
239}
240
241impl<T: oio::Delete> oio::Delete for CheckWrapper<T> {
242    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
243        self.check_delete(&args)?;
244        self.inner.delete(path, args)
245    }
246
247    fn flush(&mut self) -> impl Future<Output = Result<usize>> + MaybeSend {
248        self.inner.flush()
249    }
250}
251
#[cfg(test)]
mod tests {
    use super::*;
    use crate::raw::oio;
    use crate::Capability;
    use crate::EntryMode;
    use crate::Metadata;
    use crate::Operator;

    /// Test-only service: every operation succeeds, and the reported native capability is
    /// whatever the test supplies.
    #[derive(Debug)]
    struct MockService {
        capability: Capability,
    }

    impl Access for MockService {
        type Reader = oio::Reader;
        type Writer = oio::Writer;
        type Lister = oio::Lister;
        type Deleter = oio::Deleter;

        fn info(&self) -> Arc<AccessorInfo> {
            let info = AccessorInfo::default();
            info.set_native_capability(self.capability);

            Arc::new(info)
        }

        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
            let metadata = Metadata::new(EntryMode::Unknown);
            Ok(RpStat::new(metadata))
        }

        async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
            let reader: Self::Reader = Box::new(bytes::Bytes::new());
            Ok((RpRead::new(), reader))
        }

        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
            let writer: Self::Writer = Box::new(MockWriter);
            Ok((RpWrite::new(), writer))
        }

        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            let lister: Self::Lister = Box::new(());
            Ok((RpList::default(), lister))
        }

        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            let deleter: Self::Deleter = Box::new(MockDeleter);
            Ok((RpDelete::default(), deleter))
        }
    }

    /// Writer whose every method succeeds without doing anything.
    struct MockWriter;

    impl oio::Write for MockWriter {
        async fn write(&mut self, _: Buffer) -> Result<()> {
            Ok(())
        }

        async fn close(&mut self) -> Result<Metadata> {
            Ok(Metadata::default())
        }

        async fn abort(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Deleter that accepts every request and reports one deletion per flush.
    struct MockDeleter;

    impl oio::Delete for MockDeleter {
        fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
            Ok(())
        }

        async fn flush(&mut self) -> Result<usize> {
            Ok(1)
        }
    }

    /// Builds an operator over a `MockService` with the given capability and applies
    /// `CorrectnessCheckLayer` on top.
    fn new_test_operator(capability: Capability) -> Operator {
        Operator::from_inner(Arc::new(MockService { capability })).layer(CorrectnessCheckLayer)
    }

    /// Asserts that `res` is an error of kind `ErrorKind::Unsupported`.
    fn assert_unsupported<T: std::fmt::Debug>(res: Result<T>) {
        let err = res.expect_err("the operation should have been rejected");
        assert_eq!(err.kind(), ErrorKind::Unsupported);
    }

    #[tokio::test]
    async fn test_read() {
        // Versioned reads are rejected while `read_with_version` is off.
        let plain = new_test_operator(Capability {
            read: true,
            ..Default::default()
        });
        assert_unsupported(plain.read_with("path").version("version").await);

        // ...and accepted once it is on.
        let versioned = new_test_operator(Capability {
            read: true,
            read_with_version: true,
            ..Default::default()
        });
        let outcome = versioned.read_with("path").version("version").await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_stat() {
        // Versioned stats are rejected while `stat_with_version` is off.
        let plain = new_test_operator(Capability {
            stat: true,
            ..Default::default()
        });
        assert_unsupported(plain.stat_with("path").version("version").await);

        // ...and accepted once it is on.
        let versioned = new_test_operator(Capability {
            stat: true,
            stat_with_version: true,
            ..Default::default()
        });
        let outcome = versioned.stat_with("path").version("version").await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_write_with() {
        let op = new_test_operator(Capability {
            write: true,
            write_with_if_not_exists: true,
            ..Default::default()
        });

        // Append is not part of the capability, so it must be rejected.
        assert_unsupported(op.write_with("path", "".as_bytes()).append(true).await);

        // A concrete etag gets the plain unsupported message.
        let err = op
            .write_with("path", "".as_bytes())
            .if_none_match("etag")
            .await
            .expect_err("if_none_match should have been rejected");
        assert_eq!(
            err.to_string(),
            "Unsupported (permanent) at write => The service memory does not support the operation write with the arguments if_none_match. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."
        );

        // Now try a wildcard if-none-match: the error also carries the hint.
        let err = op
            .write_with("path", "".as_bytes())
            .if_none_match("*")
            .await
            .expect_err("wildcard if_none_match should have been rejected");
        assert_eq!(
            err.to_string(),
            "Unsupported (permanent) at write, context: { hint: use if_not_exists instead } => The service memory does not support the operation write with the arguments if_none_match. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."
        );

        // `if_not_exists` is part of the capability and goes through.
        let outcome = op
            .write_with("path", "".as_bytes())
            .if_not_exists(true)
            .await;
        assert!(outcome.is_ok());

        let full = new_test_operator(Capability {
            write: true,
            write_can_append: true,
            write_with_if_not_exists: true,
            write_with_if_none_match: true,
            ..Default::default()
        });
        let outcome = full.writer_with("path").append(true).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_delete() {
        // Versioned deletes are rejected while `delete_with_version` is off.
        let plain = new_test_operator(Capability {
            delete: true,
            ..Default::default()
        });
        assert_unsupported(plain.delete_with("path").version("version").await);

        // ...and accepted once it is on.
        let versioned = new_test_operator(Capability {
            delete: true,
            delete_with_version: true,
            ..Default::default()
        });
        let outcome = versioned.delete_with("path").version("version").await;
        assert!(outcome.is_ok());
    }
}