opendal/raw/
layer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use futures::Future;
22
23use crate::raw::*;
24use crate::*;
25
26/// Layer is used to intercept the operations on the underlying storage.
27///
28/// Struct that implement this trait must accept input `Arc<dyn Accessor>` as inner,
29/// and returns a new `Arc<dyn Accessor>` as output.
30///
31/// All functions in `Accessor` requires `&self`, so it's implementer's responsibility
32/// to maintain the internal mutability. Please also keep in mind that `Accessor`
33/// requires `Send` and `Sync`.
34///
35/// # Notes
36///
37/// ## Inner
38///
39/// It's required to implement `fn inner() -> Option<Arc<dyn Accessor>>` for layer's accessors.
40///
41/// By implement this method, all API calls will be forwarded to inner accessor instead.
42///
43/// # Examples
44///
45/// ```
46/// use std::sync::Arc;
47///
48/// use opendal::raw::*;
49/// use opendal::*;
50///
51/// /// Implement the real accessor logic here.
52/// #[derive(Debug)]
53/// struct TraceAccessor<A: Access> {
54///     inner: A,
55/// }
56///
57/// impl<A: Access> LayeredAccess for TraceAccessor<A> {
58///     type Inner = A;
59///     type Reader = A::Reader;
60///     type BlockingReader = A::BlockingReader;
61///     type Writer = A::Writer;
62///     type BlockingWriter = A::BlockingWriter;
63///     type Lister = A::Lister;
64///     type BlockingLister = A::BlockingLister;
65///     type Deleter = A::Deleter;
66///     type BlockingDeleter = A::BlockingDeleter;
67///
68///     fn inner(&self) -> &Self::Inner {
69///         &self.inner
70///     }
71///
72///     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
73///         self.inner.read(path, args).await
74///     }
75///
76///     fn blocking_read(
77///         &self,
78///         path: &str,
79///         args: OpRead,
80///     ) -> Result<(RpRead, Self::BlockingReader)> {
81///         self.inner.blocking_read(path, args)
82///     }
83///
84///     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
85///         self.inner.write(path, args).await
86///     }
87///
88///     fn blocking_write(
89///         &self,
90///         path: &str,
91///         args: OpWrite,
92///     ) -> Result<(RpWrite, Self::BlockingWriter)> {
93///         self.inner.blocking_write(path, args)
94///     }
95///
96///     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
97///         self.inner.list(path, args).await
98///     }
99///
100///     fn blocking_list(
101///         &self,
102///         path: &str,
103///         args: OpList,
104///     ) -> Result<(RpList, Self::BlockingLister)> {
105///         self.inner.blocking_list(path, args)
106///     }
107///
108///     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
109///        self.inner.delete().await
110///        }
111///
112///     fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
113///        self.inner.blocking_delete()
114///    }
115/// }
116///
117/// /// The public struct that exposed to users.
118/// ///
119/// /// Will be used like `op.layer(TraceLayer)`
120/// struct TraceLayer;
121///
122/// impl<A: Access> Layer<A> for TraceLayer {
123///     type LayeredAccess = TraceAccessor<A>;
124///
125///     fn layer(&self, inner: A) -> Self::LayeredAccess {
126///         TraceAccessor { inner }
127///     }
128/// }
129/// ```
130pub trait Layer<A: Access> {
131    /// The layered accessor that returned by this layer.
132    type LayeredAccess: Access;
133
134    /// Intercept the operations on the underlying storage.
135    fn layer(&self, inner: A) -> Self::LayeredAccess;
136}
137
138/// LayeredAccess is layered accessor that forward all not implemented
139/// method to inner.
140#[allow(missing_docs)]
141pub trait LayeredAccess: Send + Sync + Debug + Unpin + 'static {
142    type Inner: Access;
143
144    type Reader: oio::Read;
145    type Writer: oio::Write;
146    type Lister: oio::List;
147    type Deleter: oio::Delete;
148    type BlockingReader: oio::BlockingRead;
149    type BlockingWriter: oio::BlockingWrite;
150    type BlockingLister: oio::BlockingList;
151    type BlockingDeleter: oio::BlockingDelete;
152
153    fn inner(&self) -> &Self::Inner;
154
155    fn info(&self) -> Arc<AccessorInfo> {
156        self.inner().info()
157    }
158
159    fn create_dir(
160        &self,
161        path: &str,
162        args: OpCreateDir,
163    ) -> impl Future<Output = Result<RpCreateDir>> + MaybeSend {
164        self.inner().create_dir(path, args)
165    }
166
167    fn read(
168        &self,
169        path: &str,
170        args: OpRead,
171    ) -> impl Future<Output = Result<(RpRead, Self::Reader)>> + MaybeSend;
172
173    fn write(
174        &self,
175        path: &str,
176        args: OpWrite,
177    ) -> impl Future<Output = Result<(RpWrite, Self::Writer)>> + MaybeSend;
178
179    fn copy(
180        &self,
181        from: &str,
182        to: &str,
183        args: OpCopy,
184    ) -> impl Future<Output = Result<RpCopy>> + MaybeSend {
185        self.inner().copy(from, to, args)
186    }
187
188    fn rename(
189        &self,
190        from: &str,
191        to: &str,
192        args: OpRename,
193    ) -> impl Future<Output = Result<RpRename>> + MaybeSend {
194        self.inner().rename(from, to, args)
195    }
196
197    fn stat(&self, path: &str, args: OpStat) -> impl Future<Output = Result<RpStat>> + MaybeSend {
198        self.inner().stat(path, args)
199    }
200
201    fn delete(&self) -> impl Future<Output = Result<(RpDelete, Self::Deleter)>> + MaybeSend;
202
203    fn list(
204        &self,
205        path: &str,
206        args: OpList,
207    ) -> impl Future<Output = Result<(RpList, Self::Lister)>> + MaybeSend;
208
209    fn presign(
210        &self,
211        path: &str,
212        args: OpPresign,
213    ) -> impl Future<Output = Result<RpPresign>> + MaybeSend {
214        self.inner().presign(path, args)
215    }
216
217    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
218        self.inner().blocking_create_dir(path, args)
219    }
220
221    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)>;
222
223    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)>;
224
225    fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
226        self.inner().blocking_copy(from, to, args)
227    }
228
229    fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
230        self.inner().blocking_rename(from, to, args)
231    }
232
233    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
234        self.inner().blocking_stat(path, args)
235    }
236
237    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)>;
238
239    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)>;
240}
241
242impl<L: LayeredAccess> Access for L {
243    type Reader = L::Reader;
244    type Writer = L::Writer;
245    type Lister = L::Lister;
246    type Deleter = L::Deleter;
247
248    type BlockingReader = L::BlockingReader;
249    type BlockingWriter = L::BlockingWriter;
250    type BlockingLister = L::BlockingLister;
251    type BlockingDeleter = L::BlockingDeleter;
252
253    fn info(&self) -> Arc<AccessorInfo> {
254        LayeredAccess::info(self)
255    }
256
257    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
258        LayeredAccess::create_dir(self, path, args).await
259    }
260
261    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
262        LayeredAccess::read(self, path, args).await
263    }
264
265    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
266        LayeredAccess::write(self, path, args).await
267    }
268
269    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
270        LayeredAccess::copy(self, from, to, args).await
271    }
272
273    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
274        LayeredAccess::rename(self, from, to, args).await
275    }
276
277    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
278        LayeredAccess::stat(self, path, args).await
279    }
280
281    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
282        LayeredAccess::delete(self).await
283    }
284
285    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
286        LayeredAccess::list(self, path, args).await
287    }
288
289    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
290        LayeredAccess::presign(self, path, args).await
291    }
292
293    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
294        LayeredAccess::blocking_create_dir(self, path, args)
295    }
296
297    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
298        LayeredAccess::blocking_read(self, path, args)
299    }
300
301    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
302        LayeredAccess::blocking_write(self, path, args)
303    }
304
305    fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
306        LayeredAccess::blocking_copy(self, from, to, args)
307    }
308
309    fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
310        LayeredAccess::blocking_rename(self, from, to, args)
311    }
312
313    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
314        LayeredAccess::blocking_stat(self, path, args)
315    }
316
317    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
318        LayeredAccess::blocking_delete(self)
319    }
320
321    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
322        LayeredAccess::blocking_list(self, path, args)
323    }
324}
325
326#[cfg(test)]
327mod tests {
328    use std::sync::Arc;
329
330    use futures::lock::Mutex;
331
332    use super::*;
333    use crate::services::Memory;
334
335    #[derive(Debug)]
336    struct Test<A: Access> {
337        #[allow(dead_code)]
338        inner: Option<A>,
339        stated: Arc<Mutex<bool>>,
340    }
341
342    impl<A: Access> Layer<A> for &Test<A> {
343        type LayeredAccess = Test<A>;
344
345        fn layer(&self, inner: A) -> Self::LayeredAccess {
346            Test {
347                inner: Some(inner),
348                stated: self.stated.clone(),
349            }
350        }
351    }
352
353    impl<A: Access> Access for Test<A> {
354        type Reader = ();
355        type BlockingReader = ();
356        type Writer = ();
357        type BlockingWriter = ();
358        type Lister = ();
359        type BlockingLister = ();
360        type Deleter = ();
361        type BlockingDeleter = ();
362
363        fn info(&self) -> Arc<AccessorInfo> {
364            let am = AccessorInfo::default();
365            am.set_scheme(Scheme::Custom("test"));
366            am.into()
367        }
368
369        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
370            let mut x = self.stated.lock().await;
371            *x = true;
372
373            assert!(self.inner.is_some());
374
375            // We will not call anything here to test the layer.
376            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
377        }
378    }
379
380    #[tokio::test]
381    async fn test_layer() {
382        let test = Test {
383            inner: None,
384            stated: Arc::new(Mutex::new(false)),
385        };
386
387        let op = Operator::new(Memory::default())
388            .unwrap()
389            .layer(&test)
390            .finish();
391
392        op.stat("xxxxx").await.unwrap();
393
394        assert!(*test.stated.clone().lock().await);
395    }
396}