1use tokio::runtime::Handle;
19
20use crate::raw::*;
21use crate::*;
22
23#[derive(Debug, Clone)]
129pub struct BlockingLayer {
130 handle: Handle,
131}
132
133impl BlockingLayer {
134 pub fn create() -> Result<Self> {
136 Ok(Self {
137 handle: Handle::try_current()
138 .map_err(|_| Error::new(ErrorKind::Unexpected, "failed to get current handle"))?,
139 })
140 }
141}
142
143impl<A: Access> Layer<A> for BlockingLayer {
144 type LayeredAccess = BlockingAccessor<A>;
145
146 fn layer(&self, inner: A) -> Self::LayeredAccess {
147 let info = inner.info();
148 info.update_full_capability(|mut cap| {
149 cap.blocking = true;
150 cap
151 });
152
153 BlockingAccessor {
154 inner,
155 handle: self.handle.clone(),
156 }
157 }
158}
159
160#[derive(Clone, Debug)]
161pub struct BlockingAccessor<A: Access> {
162 inner: A,
163
164 handle: Handle,
165}
166
167impl<A: Access> LayeredAccess for BlockingAccessor<A> {
168 type Inner = A;
169 type Reader = A::Reader;
170 type BlockingReader = BlockingWrapper<A::Reader>;
171 type Writer = A::Writer;
172 type BlockingWriter = BlockingWrapper<A::Writer>;
173 type Lister = A::Lister;
174 type BlockingLister = BlockingWrapper<A::Lister>;
175 type Deleter = A::Deleter;
176 type BlockingDeleter = BlockingWrapper<A::Deleter>;
177
178 fn inner(&self) -> &Self::Inner {
179 &self.inner
180 }
181
182 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
183 self.inner.create_dir(path, args).await
184 }
185
186 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
187 self.inner.read(path, args).await
188 }
189
190 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
191 self.inner.write(path, args).await
192 }
193
194 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
195 self.inner.copy(from, to, args).await
196 }
197
198 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
199 self.inner.rename(from, to, args).await
200 }
201
202 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
203 self.inner.stat(path, args).await
204 }
205
206 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
207 self.inner.delete().await
208 }
209
210 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
211 self.inner.list(path, args).await
212 }
213
214 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
215 self.inner.presign(path, args).await
216 }
217
218 fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
219 self.handle.block_on(self.inner.create_dir(path, args))
220 }
221
222 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
223 self.handle.block_on(async {
224 let (rp, reader) = self.inner.read(path, args).await?;
225 let blocking_reader = Self::BlockingReader::new(self.handle.clone(), reader);
226
227 Ok((rp, blocking_reader))
228 })
229 }
230
231 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
232 self.handle.block_on(async {
233 let (rp, writer) = self.inner.write(path, args).await?;
234 let blocking_writer = Self::BlockingWriter::new(self.handle.clone(), writer);
235 Ok((rp, blocking_writer))
236 })
237 }
238
239 fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
240 self.handle.block_on(self.inner.copy(from, to, args))
241 }
242
243 fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
244 self.handle.block_on(self.inner.rename(from, to, args))
245 }
246
247 fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
248 self.handle.block_on(self.inner.stat(path, args))
249 }
250
251 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
252 self.handle.block_on(async {
253 let (rp, writer) = self.inner.delete().await?;
254 let blocking_deleter = Self::BlockingDeleter::new(self.handle.clone(), writer);
255 Ok((rp, blocking_deleter))
256 })
257 }
258
259 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
260 self.handle.block_on(async {
261 let (rp, lister) = self.inner.list(path, args).await?;
262 let blocking_lister = Self::BlockingLister::new(self.handle.clone(), lister);
263 Ok((rp, blocking_lister))
264 })
265 }
266}
267
268pub struct BlockingWrapper<I> {
269 handle: Handle,
270 inner: I,
271}
272
273impl<I> BlockingWrapper<I> {
274 fn new(handle: Handle, inner: I) -> Self {
275 Self { handle, inner }
276 }
277}
278
279impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> {
280 fn read(&mut self) -> Result<Buffer> {
281 self.handle.block_on(self.inner.read())
282 }
283}
284
285impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
286 fn write(&mut self, bs: Buffer) -> Result<()> {
287 self.handle.block_on(self.inner.write(bs))
288 }
289
290 fn close(&mut self) -> Result<Metadata> {
291 self.handle.block_on(self.inner.close())
292 }
293}
294
295impl<I: oio::List> oio::BlockingList for BlockingWrapper<I> {
296 fn next(&mut self) -> Result<Option<oio::Entry>> {
297 self.handle.block_on(self.inner.next())
298 }
299}
300
301impl<I: oio::Delete + 'static> oio::BlockingDelete for BlockingWrapper<I> {
302 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
303 self.inner.delete(path, args)
304 }
305
306 fn flush(&mut self) -> Result<usize> {
307 self.handle.block_on(self.inner.flush())
308 }
309}
310
311#[cfg(test)]
312mod tests {
313 use std::sync::LazyLock;
314
315 use super::*;
316 use crate::types::Result;
317
318 static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
319 tokio::runtime::Builder::new_multi_thread()
320 .enable_all()
321 .build()
322 .unwrap()
323 });
324
325 fn create_blocking_layer() -> Result<BlockingLayer> {
326 let _guard = RUNTIME.enter();
327 BlockingLayer::create()
328 }
329
330 #[test]
331 fn test_blocking_layer_in_blocking_context() {
332 let layer = BlockingLayer::create();
334 assert!(layer.is_err());
335
336 let layer = create_blocking_layer();
338 assert!(layer.is_ok())
339 }
340
341 #[test]
342 fn test_blocking_layer_in_async_context() {
343 let _guard = RUNTIME.enter();
345
346 let layer = BlockingLayer::create();
347 assert!(layer.is_ok());
348 }
349}