opendal/types/execute/
api.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::future::Future;
19use std::pin::Pin;
20use std::task::Context;
21use std::task::Poll;
22
23use futures::future::RemoteHandle;
24use futures::FutureExt;
25
26use crate::raw::BoxedStaticFuture;
27
/// Execute trait is used to execute tasks in the background.
///
/// # Notes about Timeout Implementation
///
/// Implementing a correct and elegant timeout mechanism is challenging for us.
///
/// The `Execute` trait must be object safe, allowing us to use `Arc<dyn Execute>`. Consequently,
/// we cannot introduce a generic type parameter to `Execute`. We utilize [`RemoteHandle`] to
/// implement the [`Execute::execute`] method. [`RemoteHandle`] operates by transmitting
/// `Future::Output` through a channel, enabling the spawning of [`BoxedStaticFuture<()>`].
///
/// However, for timeouts, we need to spawn a future that resolves after a specified duration.
/// Simply wrapping the future within another timeout future is not feasible because if the timeout
/// is reached and the original future has not completed, it will be dropped—causing any held `Task`
/// to panic.
///
/// As an alternative solution, we developed a `timeout` API. Users of the `Executor` should invoke
/// this API when they require a timeout and combine it with their own futures using
/// [`futures::select`].
///
/// This approach may seem inelegant, but it gives us flexibility without being tied specifically
/// to the Tokio runtime.
///
/// Please raise an issue if you have a better solution.
pub trait Execute: Send + Sync + 'static {
    /// Execute an async task in the background.
    ///
    /// # Behavior
    ///
    /// - Implementors MUST manage the executing futures and keep them making progress.
    /// - Implementors MUST NOT drop a future until it has resolved.
    fn execute(&self, f: BoxedStaticFuture<()>);

    /// Return a future that will be resolved once a timeout has elapsed.
    ///
    /// This method takes no duration argument, so the length of the timeout is decided by
    /// the implementor.
    ///
    /// The default implementation returns `None`.
    // NOTE(review): `None` presumably signals that this executor offers no timeout
    // support — confirm against the callers of `timeout`.
    fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
        None
    }
}
68
69impl Execute for () {
70    fn execute(&self, _: BoxedStaticFuture<()>) {
71        panic!("concurrent tasks executed with no executor has been enabled")
72    }
73}
74
/// Task is generated by an executor and represents an executing task.
///
/// Users can fetch the result by calling `poll` or `.await` on this task,
/// or cancel the task by dropping this task handle.
///
/// # Notes
///
/// Users don't need to call `poll` to make progress. All tasks are running in
/// the background.
pub struct Task<T> {
    /// Handle that is polled to obtain the output of the background future.
    handle: RemoteHandle<T>,
}
87
88impl<T: 'static> Task<T> {
89    /// Create a new task.
90    #[inline]
91    pub fn new(handle: RemoteHandle<T>) -> Self {
92        Self { handle }
93    }
94
95    /// Replace the task with a new task.
96    ///
97    /// The old task will be dropped directly.
98    #[inline]
99    pub fn replace(&mut self, new_task: Self) {
100        self.handle = new_task.handle;
101    }
102}
103
104impl<T: 'static> Future for Task<T> {
105    type Output = T;
106
107    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
108        self.handle.poll_unpin(cx)
109    }
110}