opendal/types/execute/api.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::future::Future;
19use std::pin::Pin;
20use std::task::Context;
21use std::task::Poll;
22
23use futures::future::RemoteHandle;
24use futures::FutureExt;
25
26use crate::raw::BoxedStaticFuture;
27
/// Execute trait is used to execute tasks in the background.
///
/// # Notes about Timeout Implementation
///
/// Implementing a correct and elegant timeout mechanism is challenging for us.
///
/// The `Execute` trait must be object safe, allowing us to use `Arc<dyn Execute>`. Consequently,
/// we cannot introduce a generic type parameter to `Execute`. We utilize [`RemoteHandle`] to
/// implement the [`Execute::execute`] method. [`RemoteHandle`] operates by transmitting
/// `Future::Output` through a channel, enabling the spawning of [`BoxedStaticFuture<()>`].
///
/// However, for timeouts, we need to spawn a future that resolves after a specified duration.
/// Simply wrapping the future within another timeout future is not feasible because if the timeout
/// is reached and the original future has not completed, it will be dropped, causing any held
/// `Task` to panic.
///
/// As an alternative solution, we developed a `timeout` API. Users of the `Executor` should invoke
/// this API when they require a timeout and combine it with their own futures using
/// [`futures::select`].
///
/// This approach may seem inelegant, but it gives us flexibility without being tied specifically
/// to the Tokio runtime.
///
/// Please raise an issue if you have a better solution.
pub trait Execute: Send + Sync + 'static {
    /// Execute an async task in the background.
    ///
    /// # Behavior
    ///
    /// - Implementors MUST manage the executing futures and keep making progress on them.
    /// - Implementors MUST NOT drop a future until it has resolved.
    fn execute(&self, f: BoxedStaticFuture<()>);

    /// Return a future that resolves once a timeout has elapsed.
    ///
    /// The method takes no duration argument, so the length of the timeout is presumably
    /// decided by the implementor. NOTE(review): confirm against the concrete executors.
    ///
    /// The default implementation returns `None`.
    fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
        None
    }
}
68
69impl Execute for () {
70 fn execute(&self, _: BoxedStaticFuture<()>) {
71 panic!("concurrent tasks executed with no executor has been enabled")
72 }
73}
74
/// Task is a handle, generated by an Executor, that represents an executing task.
///
/// Users can fetch the result by calling `poll` or `.await` on this task.
/// Alternatively, users can cancel the task by dropping this task handle.
///
/// # Notes
///
/// Users don't need to call `poll` to make progress. All tasks are running in
/// the background.
pub struct Task<T> {
    // Remote handle that is polled to obtain the task's output of type `T`.
    handle: RemoteHandle<T>,
}
87
88impl<T: 'static> Task<T> {
89 /// Create a new task.
90 #[inline]
91 pub fn new(handle: RemoteHandle<T>) -> Self {
92 Self { handle }
93 }
94
95 /// Replace the task with a new task.
96 ///
97 /// The old task will be dropped directly.
98 #[inline]
99 pub fn replace(&mut self, new_task: Self) {
100 self.handle = new_task.handle;
101 }
102}
103
104impl<T: 'static> Future for Task<T> {
105 type Output = T;
106
107 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
108 self.handle.poll_unpin(cx)
109 }
110}