opendal/layers/
capability_check.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::layers::correctness_check::new_unsupported_error;
19use crate::raw::*;
20use std::fmt::{Debug, Formatter};
21use std::sync::Arc;
22
23/// Add an extra capability check layer for every operation
24///
25/// Similar to `CorrectnessChecker`, Before performing any operations, this layer will first verify
26/// its arguments against the capability of the underlying service. If the arguments is not supported,
27/// an error will be returned directly.
28///
29/// Notes
30///
31/// There are two main differences between this checker with the `CorrectnessChecker`:
32/// 1. This checker provides additional checks for capabilities like write_with_content_type and
33///    list_with_versions, among others. These capabilities do not affect data integrity, even if
34///    the underlying storage services do not support them.
35///
36/// 2. OpenDAL doesn't apply this checker by default. Users can enable this layer if they want to
37///    enforce stricter requirements.
38///
39/// # examples
40///
41/// ```no_run
42/// # use opendal::layers::CapabilityCheckLayer;
43/// # use opendal::services;
44/// # use opendal::Operator;
45/// # use opendal::Result;
46/// # use opendal::Scheme;
47///
48/// # fn main() -> Result<()> {
49/// use opendal::layers::CapabilityCheckLayer;
50/// let _ = Operator::new(services::Memory::default())?
51///     .layer(CapabilityCheckLayer)
52///     .finish();
53/// Ok(())
54/// # }
55/// ```
56#[derive(Default)]
57pub struct CapabilityCheckLayer;
58
59impl<A: Access> Layer<A> for CapabilityCheckLayer {
60    type LayeredAccess = CapabilityAccessor<A>;
61
62    fn layer(&self, inner: A) -> Self::LayeredAccess {
63        let info = inner.info();
64
65        CapabilityAccessor { info, inner }
66    }
67}
68pub struct CapabilityAccessor<A: Access> {
69    info: Arc<AccessorInfo>,
70    inner: A,
71}
72
73impl<A: Access> Debug for CapabilityAccessor<A> {
74    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("CapabilityCheckAccessor")
76            .field("inner", &self.inner)
77            .finish_non_exhaustive()
78    }
79}
80
81impl<A: Access> LayeredAccess for CapabilityAccessor<A> {
82    type Inner = A;
83    type Reader = A::Reader;
84    type Writer = A::Writer;
85    type Lister = A::Lister;
86    type Deleter = A::Deleter;
87    type BlockingReader = A::BlockingReader;
88    type BlockingWriter = A::BlockingWriter;
89    type BlockingLister = A::BlockingLister;
90    type BlockingDeleter = A::BlockingDeleter;
91
92    fn inner(&self) -> &Self::Inner {
93        &self.inner
94    }
95
96    async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> {
97        self.inner.read(path, args).await
98    }
99
100    async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> {
101        let capability = self.info.full_capability();
102        if !capability.write_with_content_type && args.content_type().is_some() {
103            return Err(new_unsupported_error(
104                self.info.as_ref(),
105                Operation::Write,
106                "content_type",
107            ));
108        }
109        if !capability.write_with_cache_control && args.cache_control().is_some() {
110            return Err(new_unsupported_error(
111                self.info.as_ref(),
112                Operation::Write,
113                "cache_control",
114            ));
115        }
116        if !capability.write_with_content_disposition && args.content_disposition().is_some() {
117            return Err(new_unsupported_error(
118                self.info.as_ref(),
119                Operation::Write,
120                "content_disposition",
121            ));
122        }
123
124        self.inner.write(path, args).await
125    }
126
127    async fn delete(&self) -> crate::Result<(RpDelete, Self::Deleter)> {
128        self.inner.delete().await
129    }
130
131    async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> {
132        let capability = self.info.full_capability();
133        if !capability.list_with_versions && args.versions() {
134            return Err(new_unsupported_error(
135                self.info.as_ref(),
136                Operation::List,
137                "version",
138            ));
139        }
140
141        self.inner.list(path, args).await
142    }
143
144    fn blocking_read(
145        &self,
146        path: &str,
147        args: OpRead,
148    ) -> crate::Result<(RpRead, Self::BlockingReader)> {
149        self.inner().blocking_read(path, args)
150    }
151
152    fn blocking_write(
153        &self,
154        path: &str,
155        args: OpWrite,
156    ) -> crate::Result<(RpWrite, Self::BlockingWriter)> {
157        let capability = self.info.full_capability();
158        if !capability.write_with_content_type && args.content_type().is_some() {
159            return Err(new_unsupported_error(
160                self.info.as_ref(),
161                Operation::Write,
162                "content_type",
163            ));
164        }
165        if !capability.write_with_cache_control && args.cache_control().is_some() {
166            return Err(new_unsupported_error(
167                self.info.as_ref(),
168                Operation::Write,
169                "cache_control",
170            ));
171        }
172        if !capability.write_with_content_disposition && args.content_disposition().is_some() {
173            return Err(new_unsupported_error(
174                self.info.as_ref(),
175                Operation::Write,
176                "content_disposition",
177            ));
178        }
179
180        self.inner.blocking_write(path, args)
181    }
182
183    fn blocking_delete(&self) -> crate::Result<(RpDelete, Self::BlockingDeleter)> {
184        self.inner.blocking_delete()
185    }
186
187    fn blocking_list(
188        &self,
189        path: &str,
190        args: OpList,
191    ) -> crate::Result<(RpList, Self::BlockingLister)> {
192        let capability = self.info.full_capability();
193        if !capability.list_with_versions && args.versions() {
194            return Err(new_unsupported_error(
195                self.info.as_ref(),
196                Operation::List,
197                "version",
198            ));
199        }
200
201        self.inner.blocking_list(path, args)
202    }
203}
204
205#[cfg(test)]
206mod tests {
207    use super::*;
208    use crate::{Capability, ErrorKind, Operator};
209
210    #[derive(Debug)]
211    struct MockService {
212        capability: Capability,
213    }
214
215    impl Access for MockService {
216        type Reader = oio::Reader;
217        type Writer = oio::Writer;
218        type Lister = oio::Lister;
219        type Deleter = oio::Deleter;
220        type BlockingReader = oio::BlockingReader;
221        type BlockingWriter = oio::BlockingWriter;
222        type BlockingLister = oio::BlockingLister;
223        type BlockingDeleter = oio::BlockingDeleter;
224
225        fn info(&self) -> Arc<AccessorInfo> {
226            let info = AccessorInfo::default();
227            info.set_native_capability(self.capability);
228
229            info.into()
230        }
231
232        async fn write(&self, _: &str, _: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> {
233            Ok((RpWrite::new(), Box::new(())))
234        }
235
236        async fn list(&self, _: &str, _: OpList) -> crate::Result<(RpList, Self::Lister)> {
237            Ok((RpList {}, Box::new(())))
238        }
239    }
240
241    fn new_test_operator(capability: Capability) -> Operator {
242        let srv = MockService { capability };
243
244        Operator::from_inner(Arc::new(srv)).layer(CapabilityCheckLayer)
245    }
246
247    #[tokio::test]
248    async fn test_writer_with() {
249        let op = new_test_operator(Capability {
250            write: true,
251            ..Default::default()
252        });
253        let res = op.writer_with("path").content_type("type").await;
254        assert!(res.is_err());
255
256        let res = op.writer_with("path").cache_control("cache").await;
257        assert!(res.is_err());
258
259        let res = op
260            .writer_with("path")
261            .content_disposition("disposition")
262            .await;
263        assert!(res.is_err());
264
265        let op = new_test_operator(Capability {
266            write: true,
267            write_with_content_type: true,
268            write_with_cache_control: true,
269            write_with_content_disposition: true,
270            ..Default::default()
271        });
272        let res = op.writer_with("path").content_type("type").await;
273        assert!(res.is_ok());
274
275        let res = op.writer_with("path").cache_control("cache").await;
276        assert!(res.is_ok());
277
278        let res = op
279            .writer_with("path")
280            .content_disposition("disposition")
281            .await;
282        assert!(res.is_ok());
283    }
284
285    #[tokio::test]
286    async fn test_list_with() {
287        let op = new_test_operator(Capability {
288            list: true,
289            ..Default::default()
290        });
291        let res = op.list_with("path/").versions(true).await;
292        assert!(res.is_err());
293        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
294
295        let op = new_test_operator(Capability {
296            list: true,
297            list_with_versions: true,
298            ..Default::default()
299        });
300        let res = op.lister_with("path/").versions(true).await;
301        assert!(res.is_ok())
302    }
303}