opendal_core/layers/
correctness_check.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::future::Future;
21use std::sync::Arc;
22
23use crate::raw::*;
24use crate::*;
25
26/// Add a correctness capability check layer for every operation
27///
28/// Before performing any operations, we will first verify the operation and its critical arguments
29/// against the capability of the underlying service. If the operation or arguments is not supported,
30/// an error will be returned directly.
31///
32/// # Notes
33///
34/// OpenDAL applies this checker to every accessor by default, so users don't need to invoke it manually.
35/// this checker ensures the operation and its critical arguments, which might affect the correctness of
36/// the call, are supported by the underlying service.
37///
38/// for example, when calling `write_with_append`, but `append` is not supported by the underlying
39/// service, an `Unsupported` error is returned. without this check, undesired data may be written.
40#[derive(Default)]
41pub struct CorrectnessCheckLayer;
42
43impl<A: Access> Layer<A> for CorrectnessCheckLayer {
44    type LayeredAccess = CorrectnessAccessor<A>;
45
46    fn layer(&self, inner: A) -> Self::LayeredAccess {
47        CorrectnessAccessor {
48            info: inner.info(),
49            inner,
50        }
51    }
52}
53
54pub struct CorrectnessAccessor<A: Access> {
55    info: Arc<AccessorInfo>,
56    inner: A,
57}
58
59pub(crate) fn new_unsupported_error(info: &AccessorInfo, op: Operation, args: &str) -> Error {
60    let scheme = info.scheme();
61    let op = op.into_static();
62
63    Error::new(
64        ErrorKind::Unsupported,
65        format!("The service {scheme} does not support the operation {op} with the arguments {args}. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."),
66    )
67    .with_operation(op)
68}
69
70impl<A: Access> Debug for CorrectnessAccessor<A> {
71    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("CorrectnessCheckAccessor")
73            .field("inner", &self.inner)
74            .finish_non_exhaustive()
75    }
76}
77
78impl<A: Access> LayeredAccess for CorrectnessAccessor<A> {
79    type Inner = A;
80    type Reader = A::Reader;
81    type Writer = A::Writer;
82    type Lister = A::Lister;
83    type Deleter = CheckWrapper<A::Deleter>;
84
85    fn inner(&self) -> &Self::Inner {
86        &self.inner
87    }
88
89    fn info(&self) -> Arc<AccessorInfo> {
90        self.info.clone()
91    }
92
93    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
94        let capability = self.info.full_capability();
95        if !capability.read_with_version && args.version().is_some() {
96            return Err(new_unsupported_error(
97                self.info.as_ref(),
98                Operation::Read,
99                "version",
100            ));
101        }
102        if !capability.read_with_if_match && args.if_match().is_some() {
103            return Err(new_unsupported_error(
104                self.info.as_ref(),
105                Operation::Read,
106                "if_match",
107            ));
108        }
109        if !capability.read_with_if_none_match && args.if_none_match().is_some() {
110            return Err(new_unsupported_error(
111                self.info.as_ref(),
112                Operation::Read,
113                "if_none_match",
114            ));
115        }
116        if !capability.read_with_if_modified_since && args.if_modified_since().is_some() {
117            return Err(new_unsupported_error(
118                self.info.as_ref(),
119                Operation::Read,
120                "if_modified_since",
121            ));
122        }
123        if !capability.read_with_if_unmodified_since && args.if_unmodified_since().is_some() {
124            return Err(new_unsupported_error(
125                self.info.as_ref(),
126                Operation::Read,
127                "if_unmodified_since",
128            ));
129        }
130
131        self.inner.read(path, args).await
132    }
133
134    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
135        let capability = self.info.full_capability();
136        if args.append() && !capability.write_can_append {
137            return Err(new_unsupported_error(
138                &self.info,
139                Operation::Write,
140                "append",
141            ));
142        }
143        if args.if_not_exists() && !capability.write_with_if_not_exists {
144            return Err(new_unsupported_error(
145                &self.info,
146                Operation::Write,
147                "if_not_exists",
148            ));
149        }
150        if args.if_match().is_some() && !capability.write_with_if_match {
151            return Err(new_unsupported_error(
152                &self.info,
153                Operation::Write,
154                "if_match",
155            ));
156        }
157        if let Some(if_none_match) = args.if_none_match() {
158            if !capability.write_with_if_none_match {
159                let mut err =
160                    new_unsupported_error(self.info.as_ref(), Operation::Write, "if_none_match");
161                if if_none_match == "*" && capability.write_with_if_not_exists {
162                    err = err.with_context("hint", "use if_not_exists instead");
163                }
164
165                return Err(err);
166            }
167        }
168
169        self.inner.write(path, args).await
170    }
171
172    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
173        let capability = self.info.full_capability();
174        if !capability.stat_with_version && args.version().is_some() {
175            return Err(new_unsupported_error(
176                self.info.as_ref(),
177                Operation::Stat,
178                "version",
179            ));
180        }
181        if !capability.stat_with_if_match && args.if_match().is_some() {
182            return Err(new_unsupported_error(
183                self.info.as_ref(),
184                Operation::Stat,
185                "if_match",
186            ));
187        }
188        if !capability.stat_with_if_none_match && args.if_none_match().is_some() {
189            return Err(new_unsupported_error(
190                self.info.as_ref(),
191                Operation::Stat,
192                "if_none_match",
193            ));
194        }
195        if !capability.stat_with_if_modified_since && args.if_modified_since().is_some() {
196            return Err(new_unsupported_error(
197                self.info.as_ref(),
198                Operation::Stat,
199                "if_modified_since",
200            ));
201        }
202        if !capability.stat_with_if_unmodified_since && args.if_unmodified_since().is_some() {
203            return Err(new_unsupported_error(
204                self.info.as_ref(),
205                Operation::Stat,
206                "if_unmodified_since",
207            ));
208        }
209
210        self.inner.stat(path, args).await
211    }
212
213    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
214        self.inner.delete().await.map(|(rp, deleter)| {
215            let deleter = CheckWrapper::new(deleter, self.info.clone());
216            (rp, deleter)
217        })
218    }
219
220    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
221        let capability = self.info.full_capability();
222        if args.if_not_exists() && !capability.copy_with_if_not_exists {
223            return Err(new_unsupported_error(
224                &self.info,
225                Operation::Copy,
226                "if_not_exists",
227            ));
228        }
229
230        self.inner.copy(from, to, args).await
231    }
232
233    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
234        self.inner.list(path, args).await
235    }
236}
237
238pub struct CheckWrapper<T> {
239    info: Arc<AccessorInfo>,
240    inner: T,
241}
242
243impl<T> CheckWrapper<T> {
244    fn new(inner: T, info: Arc<AccessorInfo>) -> Self {
245        Self { inner, info }
246    }
247
248    fn check_delete(&self, args: &OpDelete) -> Result<()> {
249        if args.version().is_some() && !self.info.full_capability().delete_with_version {
250            return Err(new_unsupported_error(
251                &self.info,
252                Operation::Delete,
253                "version",
254            ));
255        }
256
257        if args.recursive() && !self.info.full_capability().delete_with_recursive {
258            return Err(new_unsupported_error(
259                &self.info,
260                Operation::Delete,
261                "recursive",
262            ));
263        }
264
265        Ok(())
266    }
267}
268
269impl<T: oio::Delete> oio::Delete for CheckWrapper<T> {
270    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
271        self.check_delete(&args)?;
272        self.inner.delete(path, args).await
273    }
274
275    fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
276        self.inner.close()
277    }
278}
279
280#[cfg(test)]
281mod tests {
282    use super::*;
283    use crate::Capability;
284    use crate::EntryMode;
285    use crate::Metadata;
286    use crate::Operator;
287    use crate::raw::oio;
288
289    #[derive(Debug)]
290    struct MockService {
291        capability: Capability,
292    }
293
294    impl Access for MockService {
295        type Reader = oio::Reader;
296        type Writer = oio::Writer;
297        type Lister = oio::Lister;
298        type Deleter = oio::Deleter;
299
300        fn info(&self) -> Arc<AccessorInfo> {
301            let info = AccessorInfo::default();
302            info.set_scheme("memory");
303            info.set_native_capability(self.capability);
304
305            info.into()
306        }
307
308        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
309            Ok(RpStat::new(Metadata::new(EntryMode::Unknown)))
310        }
311
312        async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
313            Ok((RpRead::new(), Box::new(bytes::Bytes::new())))
314        }
315
316        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
317            Ok((RpWrite::new(), Box::new(MockWriter)))
318        }
319
320        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
321            Ok((RpList::default(), Box::new(())))
322        }
323
324        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
325            Ok((RpDelete::default(), Box::new(MockDeleter)))
326        }
327    }
328
329    struct MockWriter;
330
331    impl oio::Write for MockWriter {
332        async fn write(&mut self, _: Buffer) -> Result<()> {
333            Ok(())
334        }
335
336        async fn close(&mut self) -> Result<Metadata> {
337            Ok(Metadata::default())
338        }
339
340        async fn abort(&mut self) -> Result<()> {
341            Ok(())
342        }
343    }
344
345    struct MockDeleter;
346
347    impl oio::Delete for MockDeleter {
348        async fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
349            Ok(())
350        }
351
352        async fn close(&mut self) -> Result<()> {
353            Ok(())
354        }
355    }
356
357    fn new_test_operator(capability: Capability) -> Operator {
358        let srv = MockService { capability };
359
360        Operator::from_inner(Arc::new(srv)).layer(CorrectnessCheckLayer)
361    }
362
363    #[tokio::test]
364    async fn test_read() {
365        let op = new_test_operator(Capability {
366            read: true,
367            ..Default::default()
368        });
369        let res = op.read_with("path").version("version").await;
370        assert!(res.is_err());
371        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
372
373        let op = new_test_operator(Capability {
374            read: true,
375            read_with_version: true,
376            ..Default::default()
377        });
378        let res = op.read_with("path").version("version").await;
379        assert!(res.is_ok());
380    }
381
382    #[tokio::test]
383    async fn test_stat() {
384        let op = new_test_operator(Capability {
385            stat: true,
386            ..Default::default()
387        });
388        let res = op.stat_with("path").version("version").await;
389        assert!(res.is_err());
390        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
391
392        let op = new_test_operator(Capability {
393            stat: true,
394            stat_with_version: true,
395            ..Default::default()
396        });
397        let res = op.stat_with("path").version("version").await;
398        assert!(res.is_ok());
399    }
400
401    #[tokio::test]
402    async fn test_write_with() {
403        let op = new_test_operator(Capability {
404            write: true,
405            write_with_if_not_exists: true,
406            ..Default::default()
407        });
408        let res = op.write_with("path", "".as_bytes()).append(true).await;
409        assert!(res.is_err());
410        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
411
412        let res = op
413            .write_with("path", "".as_bytes())
414            .if_none_match("etag")
415            .await;
416        assert!(res.is_err());
417        assert_eq!(
418            res.unwrap_err().to_string(),
419            "Unsupported (permanent) at write => The service memory does not support the operation write with the arguments if_none_match. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."
420        );
421
422        // Now try a wildcard if-none-match
423        let res = op
424            .write_with("path", "".as_bytes())
425            .if_none_match("*")
426            .await;
427        assert!(res.is_err());
428        assert_eq!(
429            res.unwrap_err().to_string(),
430            "Unsupported (permanent) at write, context: { hint: use if_not_exists instead } => The service memory does not support the operation write with the arguments if_none_match. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."
431        );
432
433        let res = op
434            .write_with("path", "".as_bytes())
435            .if_not_exists(true)
436            .await;
437        assert!(res.is_ok());
438
439        let op = new_test_operator(Capability {
440            write: true,
441            write_can_append: true,
442            write_with_if_not_exists: true,
443            write_with_if_none_match: true,
444            ..Default::default()
445        });
446        let res = op.writer_with("path").append(true).await;
447        assert!(res.is_ok());
448    }
449
450    #[tokio::test]
451    async fn test_delete() {
452        let op = new_test_operator(Capability {
453            delete: true,
454            ..Default::default()
455        });
456        let res = op.delete_with("path").version("version").await;
457        assert!(res.is_err());
458        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
459
460        let op = new_test_operator(Capability {
461            delete: true,
462            delete_with_version: true,
463            ..Default::default()
464        });
465        let res = op.delete_with("path").version("version").await;
466        assert!(res.is_ok())
467    }
468}