opendal_core/raw/
layer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use futures::Future;
22use futures::future::ready;
23
24use crate::raw::*;
25use crate::*;
26
/// Layer is used to intercept the operations on the underlying storage.
///
/// A struct that implements this trait accepts an inner accessor `A: Access`
/// and returns a new accessor, `Self::LayeredAccess`, that wraps it.
///
/// All functions in [`LayeredAccess`] take `&self`, so it is the implementer's
/// responsibility to maintain any internal mutability. Please also keep in mind
/// that [`LayeredAccess`] requires `Send` and `Sync`.
///
/// # Notes
///
/// ## Inner
///
/// An accessor that implements [`LayeredAccess`] must provide
/// `fn inner(&self) -> &Self::Inner`.
///
/// The default method bodies of [`LayeredAccess`] use it to reach the inner
/// accessor, so the forwarding defaults delegate to the inner accessor unless
/// the layer overrides them.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
///
/// use opendal_core::raw::*;
/// use opendal_core::*;
///
/// /// Implement the real accessor logic here.
/// #[derive(Debug)]
/// struct TraceAccessor<A: Access> {
///     inner: A,
/// }
///
/// impl<A: Access> LayeredAccess for TraceAccessor<A> {
///     type Inner = A;
///     type Reader = A::Reader;
///     type Writer = A::Writer;
///     type Lister = A::Lister;
///     type Deleter = A::Deleter;
///     type Copier = A::Copier;
///
///     fn inner(&self) -> &Self::Inner {
///         &self.inner
///     }
///
///     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
///         self.inner.read(path, args).await
///     }
///
///     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
///         self.inner.write(path, args).await
///     }
///
///     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
///         self.inner.list(path, args).await
///     }
///
///     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
///         self.inner.delete().await
///     }
/// }
///
/// /// The public struct that is exposed to users.
/// ///
/// /// Will be used like `op.layer(TraceLayer)`
/// struct TraceLayer;
///
/// impl<A: Access> Layer<A> for TraceLayer {
///     type LayeredAccess = TraceAccessor<A>;
///
///     fn layer(&self, inner: A) -> Self::LayeredAccess {
///         TraceAccessor { inner }
///     }
/// }
/// ```
pub trait Layer<A: Access> {
    /// The layered accessor that is returned by this layer.
    type LayeredAccess: Access;

    /// Wrap `inner` and return the accessor that intercepts operations on the
    /// underlying storage.
    fn layer(&self, inner: A) -> Self::LayeredAccess;
}
107
108/// LayeredAccess is layered accessor that forward all not implemented
109/// method to inner.
110#[allow(missing_docs)]
111pub trait LayeredAccess: Send + Sync + Debug + Unpin + 'static {
112    type Inner: Access;
113
114    type Reader: oio::Read;
115    type Writer: oio::Write;
116    type Lister: oio::List;
117    type Deleter: oio::Delete;
118    type Copier: oio::Copy;
119
120    fn inner(&self) -> &Self::Inner;
121
122    fn info(&self) -> Arc<AccessorInfo> {
123        self.inner().info()
124    }
125
126    fn create_dir(
127        &self,
128        path: &str,
129        args: OpCreateDir,
130    ) -> impl Future<Output = Result<RpCreateDir>> + MaybeSend {
131        self.inner().create_dir(path, args)
132    }
133
134    fn read(
135        &self,
136        path: &str,
137        args: OpRead,
138    ) -> impl Future<Output = Result<(RpRead, Self::Reader)>> + MaybeSend;
139
140    fn write(
141        &self,
142        path: &str,
143        args: OpWrite,
144    ) -> impl Future<Output = Result<(RpWrite, Self::Writer)>> + MaybeSend;
145
146    fn copy(
147        &self,
148        from: &str,
149        to: &str,
150        args: OpCopy,
151        opts: OpCopier,
152    ) -> impl Future<Output = Result<(RpCopy, Self::Copier)>> + MaybeSend {
153        let (_, _, _, _) = (from, to, args, opts);
154
155        ready(Err(Error::new(
156            ErrorKind::Unsupported,
157            "operation is not supported",
158        )))
159    }
160
161    fn rename(
162        &self,
163        from: &str,
164        to: &str,
165        args: OpRename,
166    ) -> impl Future<Output = Result<RpRename>> + MaybeSend {
167        self.inner().rename(from, to, args)
168    }
169
170    fn stat(&self, path: &str, args: OpStat) -> impl Future<Output = Result<RpStat>> + MaybeSend {
171        self.inner().stat(path, args)
172    }
173
174    fn delete(&self) -> impl Future<Output = Result<(RpDelete, Self::Deleter)>> + MaybeSend;
175
176    fn list(
177        &self,
178        path: &str,
179        args: OpList,
180    ) -> impl Future<Output = Result<(RpList, Self::Lister)>> + MaybeSend;
181
182    fn presign(
183        &self,
184        path: &str,
185        args: OpPresign,
186    ) -> impl Future<Output = Result<RpPresign>> + MaybeSend {
187        self.inner().presign(path, args)
188    }
189}
190
191impl<L: LayeredAccess> Access for L {
192    type Reader = L::Reader;
193    type Writer = L::Writer;
194    type Lister = L::Lister;
195    type Deleter = L::Deleter;
196    type Copier = L::Copier;
197
198    fn info(&self) -> Arc<AccessorInfo> {
199        LayeredAccess::info(self)
200    }
201
202    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
203        LayeredAccess::create_dir(self, path, args).await
204    }
205
206    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
207        LayeredAccess::read(self, path, args).await
208    }
209
210    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
211        LayeredAccess::write(self, path, args).await
212    }
213
214    async fn copy(
215        &self,
216        from: &str,
217        to: &str,
218        args: OpCopy,
219        opts: OpCopier,
220    ) -> Result<(RpCopy, Self::Copier)> {
221        LayeredAccess::copy(self, from, to, args, opts).await
222    }
223
224    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
225        LayeredAccess::rename(self, from, to, args).await
226    }
227
228    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
229        LayeredAccess::stat(self, path, args).await
230    }
231
232    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
233        LayeredAccess::delete(self).await
234    }
235
236    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
237        LayeredAccess::list(self, path, args).await
238    }
239
240    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
241        LayeredAccess::presign(self, path, args).await
242    }
243}
244
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures::lock::Mutex;

    use super::*;
    use crate::services::Memory;

    /// Test double used both as the layer (through `&Test<A>`) and as the
    /// accessor that the layer produces.
    #[derive(Debug)]
    struct Test<A: Access> {
        #[allow(dead_code)]
        inner: Option<A>,
        /// Set to `true` once `stat` has been called on the layered accessor.
        stated: Arc<Mutex<bool>>,
    }

    impl<A: Access> Layer<A> for &Test<A> {
        type LayeredAccess = Test<A>;

        fn layer(&self, inner: A) -> Self::LayeredAccess {
            // Share the flag so the test can observe calls made on the new accessor.
            let stated = Arc::clone(&self.stated);
            Test {
                inner: Some(inner),
                stated,
            }
        }
    }

    impl<A: Access> Access for Test<A> {
        type Reader = ();
        type Writer = ();
        type Lister = ();
        type Deleter = ();
        type Copier = ();

        fn info(&self) -> Arc<AccessorInfo> {
            let info = AccessorInfo::default();
            info.set_scheme("test");
            Arc::new(info)
        }

        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
            // Record that the call reached this accessor.
            let mut flag = self.stated.lock().await;
            *flag = true;

            // The accessor built by `layer` must carry the wrapped inner accessor.
            assert!(self.inner.is_some());

            // We will not call anything here to test the layer.
            let meta = Metadata::new(EntryMode::DIR);
            Ok(RpStat::new(meta))
        }
    }

    /// A `stat` issued through the operator must reach the layered accessor.
    #[tokio::test]
    async fn test_layer() {
        let test = Test {
            inner: None,
            stated: Arc::new(Mutex::new(false)),
        };

        let builder = Operator::new(Memory::default()).unwrap();
        let op = builder.layer(&test).finish();

        op.stat("xxxxx").await.unwrap();

        assert!(*test.stated.lock().await);
    }
}