opendal_core/layers/
correctness_check.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::future::Future;
21use std::sync::Arc;
22
23use crate::raw::*;
24use crate::*;
25
/// Add a correctness capability check layer for every operation.
///
/// Before performing any operation, we first verify the operation and its critical arguments
/// against the capability of the underlying service. If the operation or any of its arguments
/// is not supported, an error is returned directly.
///
/// # Notes
///
/// OpenDAL applies this checker to every accessor by default, so users don't need to invoke it
/// manually. This checker ensures that the operation and its critical arguments, which might
/// affect the correctness of the call, are supported by the underlying service.
///
/// For example, when calling `write_with_append` while `append` is not supported by the
/// underlying service, an `Unsupported` error is returned. Without this check, undesired data
/// may be written.
#[derive(Clone, Debug, Default)]
pub struct CorrectnessCheckLayer;
42
43impl<A: Access> Layer<A> for CorrectnessCheckLayer {
44    type LayeredAccess = CorrectnessAccessor<A>;
45
46    fn layer(&self, inner: A) -> Self::LayeredAccess {
47        CorrectnessAccessor {
48            info: inner.info(),
49            inner,
50        }
51    }
52}
53
54pub(crate) fn new_unsupported_error(info: &AccessorInfo, op: Operation, args: &str) -> Error {
55    let scheme = info.scheme();
56    let op = op.into_static();
57
58    Error::new(
59        ErrorKind::Unsupported,
60        format!("The service {scheme} does not support the operation {op} with the arguments {args}. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."),
61    )
62    .with_operation(op)
63}
64
65pub struct CorrectnessAccessor<A: Access> {
66    info: Arc<AccessorInfo>,
67    inner: A,
68}
69
70impl<A: Access> Debug for CorrectnessAccessor<A> {
71    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("CorrectnessCheckAccessor")
73            .field("inner", &self.inner)
74            .finish_non_exhaustive()
75    }
76}
77
78impl<A: Access> LayeredAccess for CorrectnessAccessor<A> {
79    type Inner = A;
80    type Reader = A::Reader;
81    type Writer = A::Writer;
82    type Lister = A::Lister;
83    type Deleter = CheckWrapper<A::Deleter>;
84
85    fn inner(&self) -> &Self::Inner {
86        &self.inner
87    }
88
89    fn info(&self) -> Arc<AccessorInfo> {
90        self.info.clone()
91    }
92
93    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
94        let capability = self.info.full_capability();
95        if !capability.read_with_version && args.version().is_some() {
96            return Err(new_unsupported_error(
97                self.info.as_ref(),
98                Operation::Read,
99                "version",
100            ));
101        }
102        if !capability.read_with_if_match && args.if_match().is_some() {
103            return Err(new_unsupported_error(
104                self.info.as_ref(),
105                Operation::Read,
106                "if_match",
107            ));
108        }
109        if !capability.read_with_if_none_match && args.if_none_match().is_some() {
110            return Err(new_unsupported_error(
111                self.info.as_ref(),
112                Operation::Read,
113                "if_none_match",
114            ));
115        }
116        if !capability.read_with_if_modified_since && args.if_modified_since().is_some() {
117            return Err(new_unsupported_error(
118                self.info.as_ref(),
119                Operation::Read,
120                "if_modified_since",
121            ));
122        }
123        if !capability.read_with_if_unmodified_since && args.if_unmodified_since().is_some() {
124            return Err(new_unsupported_error(
125                self.info.as_ref(),
126                Operation::Read,
127                "if_unmodified_since",
128            ));
129        }
130
131        self.inner.read(path, args).await
132    }
133
134    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
135        let capability = self.info.full_capability();
136        if args.append() && !capability.write_can_append {
137            return Err(new_unsupported_error(
138                &self.info,
139                Operation::Write,
140                "append",
141            ));
142        }
143        if args.if_not_exists() && !capability.write_with_if_not_exists {
144            return Err(new_unsupported_error(
145                &self.info,
146                Operation::Write,
147                "if_not_exists",
148            ));
149        }
150        if let Some(if_none_match) = args.if_none_match() {
151            if !capability.write_with_if_none_match {
152                let mut err =
153                    new_unsupported_error(self.info.as_ref(), Operation::Write, "if_none_match");
154                if if_none_match == "*" && capability.write_with_if_not_exists {
155                    err = err.with_context("hint", "use if_not_exists instead");
156                }
157
158                return Err(err);
159            }
160        }
161
162        self.inner.write(path, args).await
163    }
164
165    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
166        let capability = self.info.full_capability();
167        if !capability.stat_with_version && args.version().is_some() {
168            return Err(new_unsupported_error(
169                self.info.as_ref(),
170                Operation::Stat,
171                "version",
172            ));
173        }
174        if !capability.stat_with_if_match && args.if_match().is_some() {
175            return Err(new_unsupported_error(
176                self.info.as_ref(),
177                Operation::Stat,
178                "if_match",
179            ));
180        }
181        if !capability.stat_with_if_none_match && args.if_none_match().is_some() {
182            return Err(new_unsupported_error(
183                self.info.as_ref(),
184                Operation::Stat,
185                "if_none_match",
186            ));
187        }
188        if !capability.stat_with_if_modified_since && args.if_modified_since().is_some() {
189            return Err(new_unsupported_error(
190                self.info.as_ref(),
191                Operation::Stat,
192                "if_modified_since",
193            ));
194        }
195        if !capability.stat_with_if_unmodified_since && args.if_unmodified_since().is_some() {
196            return Err(new_unsupported_error(
197                self.info.as_ref(),
198                Operation::Stat,
199                "if_unmodified_since",
200            ));
201        }
202
203        self.inner.stat(path, args).await
204    }
205
206    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
207        self.inner.delete().await.map(|(rp, deleter)| {
208            let deleter = CheckWrapper::new(deleter, self.info.clone());
209            (rp, deleter)
210        })
211    }
212
213    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
214        self.inner.list(path, args).await
215    }
216}
217
218pub struct CheckWrapper<T> {
219    info: Arc<AccessorInfo>,
220    inner: T,
221}
222
223impl<T> CheckWrapper<T> {
224    fn new(inner: T, info: Arc<AccessorInfo>) -> Self {
225        Self { inner, info }
226    }
227
228    fn check_delete(&self, args: &OpDelete) -> Result<()> {
229        if args.version().is_some() && !self.info.full_capability().delete_with_version {
230            return Err(new_unsupported_error(
231                &self.info,
232                Operation::Delete,
233                "version",
234            ));
235        }
236
237        if args.recursive() && !self.info.full_capability().delete_with_recursive {
238            return Err(new_unsupported_error(
239                &self.info,
240                Operation::Delete,
241                "recursive",
242            ));
243        }
244
245        Ok(())
246    }
247}
248
249impl<T: oio::Delete> oio::Delete for CheckWrapper<T> {
250    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
251        self.check_delete(&args)?;
252        self.inner.delete(path, args).await
253    }
254
255    fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
256        self.inner.close()
257    }
258}
259
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Capability;
    use crate::EntryMode;
    use crate::Metadata;
    use crate::Operator;
    use crate::raw::oio;

    /// Mock service that reports a configurable capability and succeeds on
    /// every operation.
    #[derive(Debug)]
    struct MockService {
        capability: Capability,
    }

    impl Access for MockService {
        type Reader = oio::Reader;
        type Writer = oio::Writer;
        type Lister = oio::Lister;
        type Deleter = oio::Deleter;

        fn info(&self) -> Arc<AccessorInfo> {
            let info = AccessorInfo::default();
            info.set_scheme("memory");
            info.set_native_capability(self.capability);

            Arc::new(info)
        }

        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
            let metadata = Metadata::new(EntryMode::Unknown);
            Ok(RpStat::new(metadata))
        }

        async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
            let reader: Self::Reader = Box::new(bytes::Bytes::new());
            Ok((RpRead::new(), reader))
        }

        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
            let writer: Self::Writer = Box::new(MockWriter);
            Ok((RpWrite::new(), writer))
        }

        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            let lister: Self::Lister = Box::new(());
            Ok((RpList::default(), lister))
        }

        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            let deleter: Self::Deleter = Box::new(MockDeleter);
            Ok((RpDelete::default(), deleter))
        }
    }

    /// Writer that accepts everything and stores nothing.
    struct MockWriter;

    impl oio::Write for MockWriter {
        async fn write(&mut self, _: Buffer) -> Result<()> {
            Ok(())
        }

        async fn close(&mut self) -> Result<Metadata> {
            Ok(Metadata::default())
        }

        async fn abort(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Deleter that accepts every request.
    struct MockDeleter;

    impl oio::Delete for MockDeleter {
        async fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
            Ok(())
        }

        async fn close(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Build an operator over a [`MockService`] with the given capability and
    /// the correctness check layer applied on top.
    fn new_test_operator(capability: Capability) -> Operator {
        let srv = Arc::new(MockService { capability });

        Operator::from_inner(srv).layer(CorrectnessCheckLayer)
    }

    /// Assert that `res` failed with [`ErrorKind::Unsupported`].
    fn assert_unsupported<T>(res: Result<T>) {
        match res {
            Ok(_) => panic!("expected an Unsupported error, but the call succeeded"),
            Err(err) => assert_eq!(err.kind(), ErrorKind::Unsupported),
        }
    }

    #[tokio::test]
    async fn test_read() {
        // Without `read_with_version`, a versioned read must be rejected.
        let op = new_test_operator(Capability {
            read: true,
            ..Capability::default()
        });
        assert_unsupported(op.read_with("path").version("version").await);

        // With the capability enabled, the same call goes through.
        let op = new_test_operator(Capability {
            read: true,
            read_with_version: true,
            ..Capability::default()
        });
        let res = op.read_with("path").version("version").await;
        assert!(res.is_ok());
    }

    #[tokio::test]
    async fn test_stat() {
        // Without `stat_with_version`, a versioned stat must be rejected.
        let op = new_test_operator(Capability {
            stat: true,
            ..Capability::default()
        });
        assert_unsupported(op.stat_with("path").version("version").await);

        // With the capability enabled, the same call goes through.
        let op = new_test_operator(Capability {
            stat: true,
            stat_with_version: true,
            ..Capability::default()
        });
        let res = op.stat_with("path").version("version").await;
        assert!(res.is_ok());
    }

    #[tokio::test]
    async fn test_write_with() {
        let op = new_test_operator(Capability {
            write: true,
            write_with_if_not_exists: true,
            ..Capability::default()
        });

        // `append` is not part of the capability.
        assert_unsupported(op.write_with("path", "".as_bytes()).append(true).await);

        // A concrete etag gets the plain unsupported error, without any hint.
        let err = op
            .write_with("path", "".as_bytes())
            .if_none_match("etag")
            .await
            .unwrap_err();
        assert_eq!(
            err.to_string(),
            "Unsupported (permanent) at write => The service memory does not support the operation write with the arguments if_none_match. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."
        );

        // A wildcard if-none-match additionally carries the `if_not_exists` hint.
        let err = op
            .write_with("path", "".as_bytes())
            .if_none_match("*")
            .await
            .unwrap_err();
        assert_eq!(
            err.to_string(),
            "Unsupported (permanent) at write, context: { hint: use if_not_exists instead } => The service memory does not support the operation write with the arguments if_none_match. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."
        );

        // `if_not_exists` itself is supported by this operator.
        let res = op
            .write_with("path", "".as_bytes())
            .if_not_exists(true)
            .await;
        assert!(res.is_ok());

        // Once `write_can_append` is enabled, appending is accepted.
        let op = new_test_operator(Capability {
            write: true,
            write_can_append: true,
            write_with_if_not_exists: true,
            write_with_if_none_match: true,
            ..Capability::default()
        });
        let res = op.writer_with("path").append(true).await;
        assert!(res.is_ok());
    }

    #[tokio::test]
    async fn test_delete() {
        // Without `delete_with_version`, a versioned delete must be rejected.
        let op = new_test_operator(Capability {
            delete: true,
            ..Capability::default()
        });
        assert_unsupported(op.delete_with("path").version("version").await);

        // With the capability enabled, the same call goes through.
        let op = new_test_operator(Capability {
            delete: true,
            delete_with_version: true,
            ..Capability::default()
        });
        let res = op.delete_with("path").version("version").await;
        assert!(res.is_ok());
    }
}