// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use futures::future::RemoteHandle;
use futures::FutureExt;

use crate::raw::BoxedStaticFuture;

/// The `Execute` trait is used to execute tasks in the background.
///
/// # Notes about Timeout Implementation
///
/// Implementing a correct and elegant timeout mechanism is challenging for us.
///
/// The `Execute` trait must be object safe so that we can use `Arc<dyn Execute>`. Consequently,
/// we cannot introduce a generic type parameter to `Execute`. Instead, we use [`RemoteHandle`]
/// on top of the [`Execute::execute`] method: [`RemoteHandle`] transmits `Future::Output`
/// through a channel, so the future we spawn can always be a [`BoxedStaticFuture<()>`].
///
/// For timeouts, however, we need to spawn a future that resolves after a specified duration.
/// Simply wrapping the future in another timeout future is not feasible: if the timeout is
/// reached before the original future has completed, the original future is dropped, which
/// causes any held `Task` to panic.
///
/// As an alternative solution, we developed a `timeout` API. Users of the `Executor` should invoke
/// this API when they require a timeout and combine it with their own futures using
/// [`futures::select`].
///
/// This approach may seem inelegant, but it keeps us flexible and avoids tying us to the
/// Tokio runtime.
///
/// Please raise an issue if you have a better solution.
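///
/// # Examples
///
/// The sketch below shows one way to combine a timeout future with a pending task through
/// `futures::select!` while only borrowing the task, so reaching the timeout does not drop it.
/// It is an illustration under stated assumptions, not this crate's implementation: `timer`
/// and `race` are hypothetical helpers, `timer` merely stands in for whatever future an
/// executor's `timeout` returns, and the block is marked `ignore` because it assumes the
/// `futures` crate's `executor` feature is enabled.
///
/// ```rust,ignore
/// use std::future::Future;
/// use std::thread;
/// use std::time::Duration;
///
/// use futures::channel::oneshot;
/// use futures::executor::block_on;
/// use futures::future;
/// use futures::{pin_mut, select, FutureExt};
///
/// // Hypothetical stand-in for a timeout future: resolves once a helper
/// // thread has slept for `dur`.
/// fn timer(dur: Duration) -> impl Future<Output = ()> {
///     let (tx, rx) = oneshot::channel::<()>();
///     thread::spawn(move || {
///         thread::sleep(dur);
///         // The receiver may already be gone; that is fine.
///         let _ = tx.send(());
///     });
///     rx.map(|_| ())
/// }
///
/// // Hypothetical helper: race borrowed `work` against `timeout`.
/// // `None` means the timeout fired first; `work` itself is not dropped.
/// async fn race<F>(work: &mut F, timeout: impl Future<Output = ()>) -> Option<F::Output>
/// where
///     F: Future + Unpin,
/// {
///     let work = work.fuse();
///     let timeout = timeout.fuse();
///     pin_mut!(work, timeout);
///     select! {
///         v = work => Some(v),
///         () = timeout => None,
///     }
/// }
///
/// fn main() {
///     let mut quick = future::ready(42);
///     let got = block_on(race(&mut quick, timer(Duration::from_secs(5))));
///     assert_eq!(got, Some(42));
///
///     let mut stuck = future::pending::<u32>();
///     let got = block_on(race(&mut stuck, timer(Duration::from_millis(10))));
///     assert_eq!(got, None);
///     // `stuck` is still owned by the caller here, who decides what to do next.
/// }
/// ```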
pub trait Execute: Send + Sync + 'static {
    /// Execute an async task in the background.
    ///
    /// # Behavior
    ///
    /// - Implementors MUST manage the executing futures and keep them making progress.
    /// - Implementors MUST NOT drop a future until it has resolved.
    fn execute(&self, f: BoxedStaticFuture<()>);

    /// Return a future that resolves once the timeout has elapsed.
    ///
    /// The default implementation returns `None`.
    fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
        None
    }
}

impl Execute for () {
    fn execute(&self, _: BoxedStaticFuture<()>) {
        panic!("concurrent tasks executed with no executor has been enabled")
    }
}

/// `Task` is generated by the `Executor` and represents an executing task.
///
/// Users can fetch the result by calling `poll` or `.await` on this task,
/// or cancel the task by dropping this task handle.
///
/// # Notes
///
/// Users don't need to call `poll` to make progress. All tasks are running in
/// the background.
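///
/// # Examples
///
/// As a rough illustration of the mechanism behind this handle, the sketch below uses
/// `FutureExt::remote_handle` from the `futures` crate directly. `run_on_thread` is a
/// hypothetical helper in which a dedicated thread plays the role of the executor; it is
/// not part of this crate. The block is marked `ignore` because it assumes the `futures`
/// crate's `executor` feature is enabled.
///
/// ```rust,ignore
/// use std::future::Future;
/// use std::thread;
///
/// use futures::executor::block_on;
/// use futures::FutureExt;
///
/// // Hypothetical helper: drive `fut` on a dedicated thread and wait for its output.
/// fn run_on_thread<T, F>(fut: F) -> T
/// where
///     T: Send + 'static,
///     F: Future<Output = T> + Send + 'static,
/// {
///     // `remote` is a `Future<Output = ()>` that does the work and sends the
///     // output through a channel; `handle` receives that output.
///     let (remote, handle) = fut.remote_handle();
///
///     // The work progresses on the worker thread without `handle` being polled.
///     let worker = thread::spawn(move || block_on(remote));
///
///     // Awaiting the handle yields the output produced in the background.
///     let out = block_on(handle);
///     worker.join().expect("worker thread panicked");
///     out
/// }
///
/// fn main() {
///     assert_eq!(run_on_thread(async { 1 + 1 }), 2);
/// }
/// ```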
pub struct Task<T> {
    handle: RemoteHandle<T>,
}

impl<T: 'static> Task<T> {
    /// Create a new task.
    #[inline]
    pub fn new(handle: RemoteHandle<T>) -> Self {
        Self { handle }
    }

    /// Replace the task with a new task.
    ///
    /// The old task is dropped immediately.
    #[inline]
    pub fn replace(&mut self, new_task: Self) {
        self.handle = new_task.handle;
    }
}

impl<T: 'static> Future for Task<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.handle.poll_unpin(cx)
    }
}