1use tokio::runtime::Handle;
19
20use crate::raw::*;
21use crate::*;
22
23#[derive(Debug, Clone)]
129pub struct BlockingLayer {
130 handle: Handle,
131}
132
133impl BlockingLayer {
134 pub fn create() -> Result<Self> {
136 Ok(Self {
137 handle: Handle::try_current()
138 .map_err(|_| Error::new(ErrorKind::Unexpected, "failed to get current handle"))?,
139 })
140 }
141}
142
143impl<A: Access> Layer<A> for BlockingLayer {
144 type LayeredAccess = BlockingAccessor<A>;
145
146 fn layer(&self, inner: A) -> Self::LayeredAccess {
147 let info = inner.info();
148 info.update_full_capability(|mut cap| {
149 cap.blocking = true;
150 cap
151 });
152
153 BlockingAccessor {
154 inner,
155 handle: self.handle.clone(),
156 }
157 }
158}
159
160#[derive(Clone, Debug)]
161pub struct BlockingAccessor<A: Access> {
162 inner: A,
163
164 handle: Handle,
165}
166
167impl<A: Access> LayeredAccess for BlockingAccessor<A> {
168 type Inner = A;
169 type Reader = A::Reader;
170 type BlockingReader = BlockingWrapper<A::Reader>;
171 type Writer = A::Writer;
172 type BlockingWriter = BlockingWrapper<A::Writer>;
173 type Lister = A::Lister;
174 type BlockingLister = BlockingWrapper<A::Lister>;
175 type Deleter = A::Deleter;
176 type BlockingDeleter = BlockingWrapper<A::Deleter>;
177
178 fn inner(&self) -> &Self::Inner {
179 &self.inner
180 }
181
182 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
183 self.inner.create_dir(path, args).await
184 }
185
186 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
187 self.inner.read(path, args).await
188 }
189
190 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
191 self.inner.write(path, args).await
192 }
193
194 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
195 self.inner.copy(from, to, args).await
196 }
197
198 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
199 self.inner.rename(from, to, args).await
200 }
201
202 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
203 self.inner.stat(path, args).await
204 }
205
206 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
207 self.inner.delete().await
208 }
209
210 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
211 self.inner.list(path, args).await
212 }
213
214 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
215 self.inner.presign(path, args).await
216 }
217
218 fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
219 self.handle.block_on(self.inner.create_dir(path, args))
220 }
221
222 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
223 self.handle.block_on(async {
224 let (rp, reader) = self.inner.read(path, args).await?;
225 let blocking_reader = Self::BlockingReader::new(self.handle.clone(), reader);
226
227 Ok((rp, blocking_reader))
228 })
229 }
230
231 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
232 self.handle.block_on(async {
233 let (rp, writer) = self.inner.write(path, args).await?;
234 let blocking_writer = Self::BlockingWriter::new(self.handle.clone(), writer);
235 Ok((rp, blocking_writer))
236 })
237 }
238
239 fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
240 self.handle.block_on(self.inner.copy(from, to, args))
241 }
242
243 fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
244 self.handle.block_on(self.inner.rename(from, to, args))
245 }
246
247 fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
248 self.handle.block_on(self.inner.stat(path, args))
249 }
250
251 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
252 self.handle.block_on(async {
253 let (rp, writer) = self.inner.delete().await?;
254 let blocking_deleter = Self::BlockingDeleter::new(self.handle.clone(), writer);
255 Ok((rp, blocking_deleter))
256 })
257 }
258
259 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
260 self.handle.block_on(async {
261 let (rp, lister) = self.inner.list(path, args).await?;
262 let blocking_lister = Self::BlockingLister::new(self.handle.clone(), lister);
263 Ok((rp, blocking_lister))
264 })
265 }
266}
267
268pub struct BlockingWrapper<I> {
269 handle: Handle,
270 inner: Option<I>,
271}
272
273impl<I> BlockingWrapper<I> {
274 fn new(handle: Handle, inner: I) -> Self {
275 Self {
276 handle,
277 inner: Some(inner),
278 }
279 }
280}
281
282impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> {
283 fn read(&mut self) -> Result<Buffer> {
284 self.handle.block_on(self.inner.as_mut().unwrap().read())
285 }
286}
287
288impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
289 fn write(&mut self, bs: Buffer) -> Result<()> {
290 self.handle.block_on(self.inner.as_mut().unwrap().write(bs))
291 }
292
293 fn close(&mut self) -> Result<Metadata> {
294 self.handle.block_on(self.inner.as_mut().unwrap().close())
295 }
296}
297
298impl<I> Drop for BlockingWrapper<I> {
299 fn drop(&mut self) {
300 if let Some(inner) = self.inner.take() {
301 self.handle.block_on(async move {
302 drop(inner);
303 });
304 }
305 }
306}
307
308impl<I: oio::List> oio::BlockingList for BlockingWrapper<I> {
309 fn next(&mut self) -> Result<Option<oio::Entry>> {
310 self.handle.block_on(self.inner.as_mut().unwrap().next())
311 }
312}
313
314impl<I: oio::Delete + 'static> oio::BlockingDelete for BlockingWrapper<I> {
315 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
316 self.inner.as_mut().unwrap().delete(path, args)
317 }
318
319 fn flush(&mut self) -> Result<usize> {
320 self.handle.block_on(self.inner.as_mut().unwrap().flush())
321 }
322}
323
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use super::*;
    use crate::types::Result;

    /// Shared multi-thread runtime whose context the tests enter.
    static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder.enable_all();
        builder.build().unwrap()
    });

    /// Builds a layer while the shared runtime's context is entered.
    fn create_blocking_layer() -> Result<BlockingLayer> {
        let _guard = RUNTIME.enter();
        BlockingLayer::create()
    }

    #[test]
    fn test_blocking_layer_in_blocking_context() {
        // No runtime context has been entered here, so creation is expected to fail.
        assert!(BlockingLayer::create().is_err());

        // With the runtime context entered by the helper, creation succeeds.
        assert!(create_blocking_layer().is_ok())
    }

    #[test]
    fn test_blocking_layer_in_async_context() {
        // Keep the runtime context entered for the duration of the test.
        let _guard = RUNTIME.enter();

        assert!(BlockingLayer::create().is_ok());
    }
}