1use std::future::Future;
19use std::sync::Arc;
20
21use crate::raw::*;
22use crate::*;
23
24#[derive(Clone)]
114pub struct TimeoutLayer {
115 timeout: Duration,
116 io_timeout: Duration,
117}
118
119impl Default for TimeoutLayer {
120 fn default() -> Self {
121 Self {
122 timeout: Duration::from_secs(60),
123 io_timeout: Duration::from_secs(10),
124 }
125 }
126}
127
128impl TimeoutLayer {
129 pub fn new() -> Self {
131 Self::default()
132 }
133
134 pub fn with_timeout(mut self, timeout: Duration) -> Self {
138 self.timeout = timeout;
139 self
140 }
141
142 pub fn with_io_timeout(mut self, timeout: Duration) -> Self {
146 self.io_timeout = timeout;
147 self
148 }
149}
150
151impl<A: Access> Layer<A> for TimeoutLayer {
152 type LayeredAccess = TimeoutAccessor<A>;
153
154 fn layer(&self, inner: A) -> Self::LayeredAccess {
155 let info = inner.info();
156 info.update_executor(|exec| {
157 Executor::with(TimeoutExecutor::new(exec.into_inner(), self.io_timeout))
158 });
159
160 TimeoutAccessor {
161 inner,
162
163 timeout: self.timeout,
164 io_timeout: self.io_timeout,
165 }
166 }
167}
168
169#[derive(Debug, Clone)]
170pub struct TimeoutAccessor<A: Access> {
171 inner: A,
172
173 timeout: Duration,
174 io_timeout: Duration,
175}
176
177impl<A: Access> TimeoutAccessor<A> {
178 async fn timeout<F: Future<Output = Result<T>>, T>(&self, op: Operation, fut: F) -> Result<T> {
179 tokio::time::timeout(self.timeout, fut).await.map_err(|_| {
180 Error::new(ErrorKind::Unexpected, "operation timeout reached")
181 .with_operation(op)
182 .with_context("timeout", self.timeout.as_secs_f64().to_string())
183 .set_temporary()
184 })?
185 }
186
187 async fn io_timeout<F: Future<Output = Result<T>>, T>(
188 &self,
189 op: Operation,
190 fut: F,
191 ) -> Result<T> {
192 tokio::time::timeout(self.io_timeout, fut)
193 .await
194 .map_err(|_| {
195 Error::new(ErrorKind::Unexpected, "io timeout reached")
196 .with_operation(op)
197 .with_context("timeout", self.io_timeout.as_secs_f64().to_string())
198 .set_temporary()
199 })?
200 }
201}
202
203impl<A: Access> LayeredAccess for TimeoutAccessor<A> {
204 type Inner = A;
205 type Reader = TimeoutWrapper<A::Reader>;
206 type Writer = TimeoutWrapper<A::Writer>;
207 type Lister = TimeoutWrapper<A::Lister>;
208 type Deleter = TimeoutWrapper<A::Deleter>;
209
210 fn inner(&self) -> &Self::Inner {
211 &self.inner
212 }
213
214 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
215 self.timeout(Operation::CreateDir, self.inner.create_dir(path, args))
216 .await
217 }
218
219 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
220 self.io_timeout(Operation::Read, self.inner.read(path, args))
221 .await
222 .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
223 }
224
225 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
226 self.io_timeout(Operation::Write, self.inner.write(path, args))
227 .await
228 .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
229 }
230
231 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
232 self.timeout(Operation::Copy, self.inner.copy(from, to, args))
233 .await
234 }
235
236 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
237 self.timeout(Operation::Rename, self.inner.rename(from, to, args))
238 .await
239 }
240
241 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
242 self.timeout(Operation::Stat, self.inner.stat(path, args))
243 .await
244 }
245
246 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
247 self.timeout(Operation::Delete, self.inner.delete())
248 .await
249 .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
250 }
251
252 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
253 self.io_timeout(Operation::List, self.inner.list(path, args))
254 .await
255 .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
256 }
257
258 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
259 self.timeout(Operation::Presign, self.inner.presign(path, args))
260 .await
261 }
262}
263
264pub struct TimeoutExecutor {
265 exec: Arc<dyn Execute>,
266 timeout: Duration,
267}
268
269impl TimeoutExecutor {
270 pub fn new(exec: Arc<dyn Execute>, timeout: Duration) -> Self {
271 Self { exec, timeout }
272 }
273}
274
275impl Execute for TimeoutExecutor {
276 fn execute(&self, f: BoxedStaticFuture<()>) {
277 self.exec.execute(f)
278 }
279
280 fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
281 Some(Box::pin(tokio::time::sleep(self.timeout)))
282 }
283}
284
285pub struct TimeoutWrapper<R> {
286 inner: R,
287
288 timeout: Duration,
289}
290
291impl<R> TimeoutWrapper<R> {
292 fn new(inner: R, timeout: Duration) -> Self {
293 Self { inner, timeout }
294 }
295
296 #[inline]
297 async fn io_timeout<F: Future<Output = Result<T>>, T>(
298 timeout: Duration,
299 op: &'static str,
300 fut: F,
301 ) -> Result<T> {
302 tokio::time::timeout(timeout, fut).await.map_err(|_| {
303 Error::new(ErrorKind::Unexpected, "io operation timeout reached")
304 .with_operation(op)
305 .with_context("timeout", timeout.as_secs_f64().to_string())
306 .set_temporary()
307 })?
308 }
309}
310
311impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
312 async fn read(&mut self) -> Result<Buffer> {
313 let fut = self.inner.read();
314 Self::io_timeout(self.timeout, Operation::Read.into_static(), fut).await
315 }
316}
317
318impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
319 async fn write(&mut self, bs: Buffer) -> Result<()> {
320 let fut = self.inner.write(bs);
321 Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
322 }
323
324 async fn close(&mut self) -> Result<Metadata> {
325 let fut = self.inner.close();
326 Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
327 }
328
329 async fn abort(&mut self) -> Result<()> {
330 let fut = self.inner.abort();
331 Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
332 }
333}
334
335impl<R: oio::List> oio::List for TimeoutWrapper<R> {
336 async fn next(&mut self) -> Result<Option<oio::Entry>> {
337 let fut = self.inner.next();
338 Self::io_timeout(self.timeout, Operation::List.into_static(), fut).await
339 }
340}
341
342impl<R: oio::Delete> oio::Delete for TimeoutWrapper<R> {
343 async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
344 self.inner.delete(path, args).await
345 }
346
347 async fn close(&mut self) -> Result<()> {
348 let fut = self.inner.close();
349 Self::io_timeout(self.timeout, Operation::Delete.into_static(), fut).await
350 }
351}
352
353#[cfg(test)]
354mod tests {
355 use std::future::Future;
356 use std::future::pending;
357 use std::sync::Arc;
358
359 use futures::StreamExt;
360 use tokio::time::sleep;
361 use tokio::time::timeout;
362
363 use crate::layers::TimeoutLayer;
364 use crate::layers::TypeEraseLayer;
365 use crate::raw::*;
366 use crate::*;
367
368 #[derive(Debug, Clone, Default)]
369 struct MockService;
370
371 impl Access for MockService {
372 type Reader = MockReader;
373 type Writer = ();
374 type Lister = MockLister;
375 type Deleter = ();
376
377 fn info(&self) -> Arc<AccessorInfo> {
378 let am = AccessorInfo::default();
379 am.set_native_capability(Capability {
380 read: true,
381 delete: true,
382 ..Default::default()
383 });
384
385 am.into()
386 }
387
388 async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
390 Ok((RpRead::new(), MockReader))
391 }
392
393 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
395 sleep(Duration::from_secs(u64::MAX)).await;
396
397 Ok((RpDelete::default(), ()))
398 }
399
400 async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
401 Ok((RpList::default(), MockLister))
402 }
403 }
404
405 #[derive(Debug, Clone, Default)]
406 struct MockReader;
407
408 impl oio::Read for MockReader {
409 fn read(&mut self) -> impl Future<Output = Result<Buffer>> {
410 pending()
411 }
412 }
413
414 #[derive(Debug, Clone, Default)]
415 struct MockLister;
416
417 impl oio::List for MockLister {
418 fn next(&mut self) -> impl Future<Output = Result<Option<oio::Entry>>> {
419 pending()
420 }
421 }
422
423 #[tokio::test]
424 async fn test_operation_timeout() {
425 let acc = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor;
426 let op = Operator::from_inner(acc)
427 .layer(TimeoutLayer::new().with_timeout(Duration::from_secs(1)));
428
429 let fut = async {
430 let res = op.delete("test").await;
431 assert!(res.is_err());
432 let err = res.unwrap_err();
433 assert_eq!(err.kind(), ErrorKind::Unexpected);
434 assert!(err.to_string().contains("timeout"))
435 };
436
437 timeout(Duration::from_secs(2), fut)
438 .await
439 .expect("this test should not exceed 2 seconds")
440 }
441
442 #[tokio::test]
443 async fn test_io_timeout() {
444 let acc = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor;
445 let op = Operator::from_inner(acc)
446 .layer(TimeoutLayer::new().with_io_timeout(Duration::from_secs(1)));
447
448 let reader = op.reader("test").await.unwrap();
449
450 let res = reader.read(0..4).await;
451 assert!(res.is_err());
452 let err = res.unwrap_err();
453 assert_eq!(err.kind(), ErrorKind::Unexpected);
454 assert!(err.to_string().contains("timeout"))
455 }
456
457 #[tokio::test]
458 async fn test_list_timeout() {
459 let acc = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor;
460 let op = Operator::from_inner(acc).layer(
461 TimeoutLayer::new()
462 .with_timeout(Duration::from_secs(1))
463 .with_io_timeout(Duration::from_secs(1)),
464 );
465
466 let mut lister = op.lister("test").await.unwrap();
467
468 let res = lister.next().await.unwrap();
469 assert!(res.is_err());
470 let err = res.unwrap_err();
471 assert_eq!(err.kind(), ErrorKind::Unexpected);
472 assert!(err.to_string().contains("timeout"))
473 }
474
475 #[tokio::test]
476 async fn test_list_timeout_raw() {
477 use oio::List;
478
479 let acc = MockService;
480 let timeout_layer = TimeoutLayer::new()
481 .with_timeout(Duration::from_secs(1))
482 .with_io_timeout(Duration::from_secs(1));
483 let timeout_acc = timeout_layer.layer(acc);
484
485 let (_, mut lister) = Access::list(&timeout_acc, "test", OpList::default())
486 .await
487 .unwrap();
488
489 let res = lister.next().await;
490 assert!(res.is_err());
491 let err = res.unwrap_err();
492 assert_eq!(err.kind(), ErrorKind::Unexpected);
493 assert!(err.to_string().contains("timeout"));
494 }
495}