opendal/layers/
blocking.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use tokio::runtime::Handle;
19
20use crate::raw::*;
21use crate::*;
22
23/// Add blocking API support for non-blocking services.
24///
25/// # Notes
26///
27/// - Please only enable this layer when the underlying service does not support blocking.
28///
29/// # Examples
30///
31/// ## In async context
32///
33/// BlockingLayer will use current async context's runtime to handle the async calls.
34///
35/// ```rust,no_run
36/// # use opendal::layers::BlockingLayer;
37/// # use opendal::services;
38/// # use opendal::BlockingOperator;
39/// # use opendal::Operator;
40/// # use opendal::Result;
41///
42/// #[tokio::main]
43/// async fn main() -> Result<()> {
44///     // Create fs backend builder.
45///     let mut builder = services::S3::default().bucket("test").region("us-east-1");
46///
47///     // Build an `BlockingOperator` with blocking layer to start operating the storage.
48///     let _: BlockingOperator = Operator::new(builder)?
49///         .layer(BlockingLayer::create()?)
50///         .finish()
51///         .blocking();
52///
53///     Ok(())
54/// }
55/// ```
56///
57/// ## In async context with blocking functions
58///
59/// If `BlockingLayer` is called in blocking function, please fetch a [`tokio::runtime::EnterGuard`]
60/// first. You can use [`Handle::try_current`] first to get the handle and then call [`Handle::enter`].
61/// This often happens in the case that async function calls blocking function.
62///
63/// ```rust,no_run
64/// # use opendal::layers::BlockingLayer;
65/// # use opendal::services;
66/// # use opendal::BlockingOperator;
67/// # use opendal::Operator;
68/// # use opendal::Result;
69///
70/// #[tokio::main]
71/// async fn main() -> Result<()> {
72///     let _ = blocking_fn()?;
73///     Ok(())
74/// }
75///
76/// fn blocking_fn() -> Result<BlockingOperator> {
77///     // Create fs backend builder.
78///     let mut builder = services::S3::default().bucket("test").region("us-east-1");
79///
80///     let handle = tokio::runtime::Handle::try_current().unwrap();
81///     let _guard = handle.enter();
82///     // Build an `BlockingOperator` with blocking layer to start operating the storage.
83///     let op: BlockingOperator = Operator::new(builder)?
84///         .layer(BlockingLayer::create()?)
85///         .finish()
86///         .blocking();
87///     Ok(op)
88/// }
89/// ```
90///
91/// ## In blocking context
92///
93/// In a pure blocking context, we can create a runtime and use it to create the `BlockingLayer`.
94///
95/// > The following code uses a global statically created runtime as an example, please manage the
96/// > runtime on demand.
97///
98/// ```rust,no_run
99/// # use std::sync::LazyLock;
100/// # use opendal::layers::BlockingLayer;
101/// # use opendal::services;
102/// # use opendal::BlockingOperator;
103/// # use opendal::Operator;
104/// # use opendal::Result;
105///
106/// static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
107///     tokio::runtime::Builder::new_multi_thread()
108///         .enable_all()
109///         .build()
110///         .unwrap()
111/// });
112///
113/// fn main() -> Result<()> {
114///     // Create fs backend builder.
115///     let mut builder = services::S3::default().bucket("test").region("us-east-1");
116///
117///     // Fetch the `EnterGuard` from global runtime.
118///     let _guard = RUNTIME.enter();
119///     // Build an `BlockingOperator` with blocking layer to start operating the storage.
120///     let _: BlockingOperator = Operator::new(builder)?
121///         .layer(BlockingLayer::create()?)
122///         .finish()
123///         .blocking();
124///
125///     Ok(())
126/// }
127/// ```
128#[derive(Debug, Clone)]
129pub struct BlockingLayer {
130    handle: Handle,
131}
132
133impl BlockingLayer {
134    /// Create a new `BlockingLayer` with the current runtime's handle
135    pub fn create() -> Result<Self> {
136        Ok(Self {
137            handle: Handle::try_current()
138                .map_err(|_| Error::new(ErrorKind::Unexpected, "failed to get current handle"))?,
139        })
140    }
141}
142
143impl<A: Access> Layer<A> for BlockingLayer {
144    type LayeredAccess = BlockingAccessor<A>;
145
146    fn layer(&self, inner: A) -> Self::LayeredAccess {
147        let info = inner.info();
148        info.update_full_capability(|mut cap| {
149            cap.blocking = true;
150            cap
151        });
152
153        BlockingAccessor {
154            inner,
155            handle: self.handle.clone(),
156        }
157    }
158}
159
160#[derive(Clone, Debug)]
161pub struct BlockingAccessor<A: Access> {
162    inner: A,
163
164    handle: Handle,
165}
166
167impl<A: Access> LayeredAccess for BlockingAccessor<A> {
168    type Inner = A;
169    type Reader = A::Reader;
170    type BlockingReader = BlockingWrapper<A::Reader>;
171    type Writer = A::Writer;
172    type BlockingWriter = BlockingWrapper<A::Writer>;
173    type Lister = A::Lister;
174    type BlockingLister = BlockingWrapper<A::Lister>;
175    type Deleter = A::Deleter;
176    type BlockingDeleter = BlockingWrapper<A::Deleter>;
177
178    fn inner(&self) -> &Self::Inner {
179        &self.inner
180    }
181
182    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
183        self.inner.create_dir(path, args).await
184    }
185
186    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
187        self.inner.read(path, args).await
188    }
189
190    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
191        self.inner.write(path, args).await
192    }
193
194    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
195        self.inner.copy(from, to, args).await
196    }
197
198    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
199        self.inner.rename(from, to, args).await
200    }
201
202    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
203        self.inner.stat(path, args).await
204    }
205
206    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
207        self.inner.delete().await
208    }
209
210    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
211        self.inner.list(path, args).await
212    }
213
214    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
215        self.inner.presign(path, args).await
216    }
217
218    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
219        self.handle.block_on(self.inner.create_dir(path, args))
220    }
221
222    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
223        self.handle.block_on(async {
224            let (rp, reader) = self.inner.read(path, args).await?;
225            let blocking_reader = Self::BlockingReader::new(self.handle.clone(), reader);
226
227            Ok((rp, blocking_reader))
228        })
229    }
230
231    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
232        self.handle.block_on(async {
233            let (rp, writer) = self.inner.write(path, args).await?;
234            let blocking_writer = Self::BlockingWriter::new(self.handle.clone(), writer);
235            Ok((rp, blocking_writer))
236        })
237    }
238
239    fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
240        self.handle.block_on(self.inner.copy(from, to, args))
241    }
242
243    fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
244        self.handle.block_on(self.inner.rename(from, to, args))
245    }
246
247    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
248        self.handle.block_on(self.inner.stat(path, args))
249    }
250
251    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
252        self.handle.block_on(async {
253            let (rp, writer) = self.inner.delete().await?;
254            let blocking_deleter = Self::BlockingDeleter::new(self.handle.clone(), writer);
255            Ok((rp, blocking_deleter))
256        })
257    }
258
259    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
260        self.handle.block_on(async {
261            let (rp, lister) = self.inner.list(path, args).await?;
262            let blocking_lister = Self::BlockingLister::new(self.handle.clone(), lister);
263            Ok((rp, blocking_lister))
264        })
265    }
266}
267
268pub struct BlockingWrapper<I> {
269    handle: Handle,
270    inner: I,
271}
272
273impl<I> BlockingWrapper<I> {
274    fn new(handle: Handle, inner: I) -> Self {
275        Self { handle, inner }
276    }
277}
278
279impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> {
280    fn read(&mut self) -> Result<Buffer> {
281        self.handle.block_on(self.inner.read())
282    }
283}
284
285impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
286    fn write(&mut self, bs: Buffer) -> Result<()> {
287        self.handle.block_on(self.inner.write(bs))
288    }
289
290    fn close(&mut self) -> Result<Metadata> {
291        self.handle.block_on(self.inner.close())
292    }
293}
294
295impl<I: oio::List> oio::BlockingList for BlockingWrapper<I> {
296    fn next(&mut self) -> Result<Option<oio::Entry>> {
297        self.handle.block_on(self.inner.next())
298    }
299}
300
301impl<I: oio::Delete + 'static> oio::BlockingDelete for BlockingWrapper<I> {
302    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
303        self.inner.delete(path, args)
304    }
305
306    fn flush(&mut self) -> Result<usize> {
307        self.handle.block_on(self.inner.flush())
308    }
309}
310
311#[cfg(test)]
312mod tests {
313    use std::sync::LazyLock;
314
315    use super::*;
316    use crate::types::Result;
317
318    static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
319        tokio::runtime::Builder::new_multi_thread()
320            .enable_all()
321            .build()
322            .unwrap()
323    });
324
325    fn create_blocking_layer() -> Result<BlockingLayer> {
326        let _guard = RUNTIME.enter();
327        BlockingLayer::create()
328    }
329
330    #[test]
331    fn test_blocking_layer_in_blocking_context() {
332        // create in a blocking context should fail
333        let layer = BlockingLayer::create();
334        assert!(layer.is_err());
335
336        // create in an async context and drop in a blocking context
337        let layer = create_blocking_layer();
338        assert!(layer.is_ok())
339    }
340
341    #[test]
342    fn test_blocking_layer_in_async_context() {
343        // create and drop in an async context
344        let _guard = RUNTIME.enter();
345
346        let layer = BlockingLayer::create();
347        assert!(layer.is_ok());
348    }
349}