opendal/raw/futures_util.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::VecDeque;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use futures::FutureExt;

use crate::*;

/// BoxedFuture is the type alias of [`futures::future::BoxFuture`].
#[cfg(not(target_arch = "wasm32"))]
pub type BoxedFuture<'a, T> = futures::future::BoxFuture<'a, T>;
#[cfg(target_arch = "wasm32")]
/// BoxedFuture is the type alias of [`futures::future::LocalBoxFuture`].
pub type BoxedFuture<'a, T> = futures::future::LocalBoxFuture<'a, T>;

/// BoxedStaticFuture is the type alias of [`futures::future::BoxFuture`].
#[cfg(not(target_arch = "wasm32"))]
pub type BoxedStaticFuture<T> = futures::future::BoxFuture<'static, T>;
#[cfg(target_arch = "wasm32")]
/// BoxedStaticFuture is the type alias of [`futures::future::LocalBoxFuture`].
pub type BoxedStaticFuture<T> = futures::future::LocalBoxFuture<'static, T>;

/// MaybeSend is a marker to determine whether a type is `Send` or not.
/// We use this trait to wrap the `Send` requirement for the wasm32 target.
///
/// # Safety
///
/// [`MaybeSend`] is equivalent to `Send` on non-wasm32 targets.
/// On the wasm32 target it's an empty trait, so types are not required to be `Send`.
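///
/// # Example
///
/// A minimal sketch of using [`MaybeSend`] as a bound so the same code compiles
/// on both native and wasm32 targets (the helper below is illustrative, not part
/// of this module):
///
/// ```skip
/// fn require_maybe_send<T: MaybeSend>(value: T) -> T {
///     value
/// }
/// ```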
#[cfg(not(target_arch = "wasm32"))]
pub trait MaybeSend: Send {}

/// MaybeSend is a marker to determine whether a type is `Send` or not.
/// We use this trait to wrap the `Send` requirement for the wasm32 target.
///
/// # Safety
///
/// [`MaybeSend`] is equivalent to `Send` on non-wasm32 targets.
/// On the wasm32 target it's an empty trait, so types are not required to be `Send`.
#[cfg(target_arch = "wasm32")]
pub trait MaybeSend {}

#[cfg(not(target_arch = "wasm32"))]
impl<T: Send> MaybeSend for T {}
#[cfg(target_arch = "wasm32")]
impl<T> MaybeSend for T {}

/// ConcurrentTasks is used to execute tasks concurrently.
///
/// ConcurrentTasks has two generic types:
///
/// - `I` represents the input type of the task.
/// - `O` represents the output type of the task.
///
/// # Implementation Notes
///
/// The code patterns below are intentional; please do not modify them unless you fully understand these notes.
///
/// ```skip
/// let (i, o) = self
///     .tasks
///     .front_mut() // Use `front_mut` instead of `pop_front`
///     .expect("tasks must be available")
///     .await;
/// ...
/// match o {
///     Ok(o) => {
///         let _ = self.tasks.pop_front(); // `pop_front` after getting `Ok(o)`
///         self.results.push_back(o)
///     }
///     Err(err) => {
///         if err.is_temporary() {
///             let task = self.create_task(i);
///             self.tasks
///                 .front_mut()
///                 .expect("tasks must be available")
///                 .replace(task) // Use `replace` here instead of `push_front`
///         } else {
///             self.clear();
///             self.errored = true;
///         }
///         return Err(err);
///     }
/// }
/// ```
///
/// Please keep in mind that there is no guarantee the task will be `await`ed until completion. It's possible
/// the task may be dropped before it resolves. Therefore, we should keep the `Task` in the `tasks` queue until
/// it is resolved.
///
/// For example, users may have a timeout for the task, and the task will be dropped if it exceeds the timeout.
/// If we `pop_front` the task before it resolves, the task will be canceled and the result will be lost.
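///
/// # Examples
///
/// A minimal usage sketch (the executor, types, and numbers here are illustrative;
/// see the tests in this module for runnable versions):
///
/// ```skip
/// let mut tasks = ConcurrentTasks::new(executor, 8, 4, |i: usize| {
///     Box::pin(async move { (i, Ok(i * 2)) })
/// });
///
/// for i in 0..64 {
///     tasks.execute(i).await?;
/// }
/// while let Some(res) = tasks.next().await {
///     let output = res?;
///     // consume output here
/// }
/// ```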
pub struct ConcurrentTasks<I, O> {
    /// The executor to execute the tasks.
    ///
    /// If the user doesn't provide an executor, the tasks will be executed with the default executor.
    executor: Executor,
    /// The factory to create the task.
    ///
    /// Callers of ConcurrentTasks must provide a factory to create the tasks to execute.
    ///
    /// The factory must accept an input and return a future that resolves to a tuple of the input
    /// and the output result. If the returned result is an error, the error will be returned to
    /// users, and the task will be retried if the error is temporary.
    factory: fn(I) -> BoxedStaticFuture<(I, Result<O>)>,

    /// `tasks` holds the ongoing tasks.
    ///
    /// Please keep in mind that all tasks are run in the background by the `Executor`. We only need
    /// to poll the tasks to see if they are ready.
    ///
    /// Dropping a task without `await`ing it will cancel the task.
    tasks: VecDeque<Task<(I, Result<O>)>>,
    /// `results` stores the successful results.
    results: VecDeque<O>,

    /// The maximum number of concurrent tasks.
    concurrent: usize,
    /// The maximum number of completed tasks that can be buffered.
    prefetch: usize,
    /// Tracks the number of tasks that have finished execution but have not yet been collected.
    ///
    /// Finished tasks stay in the `tasks` queue until their results are retrieved, so this count
    /// (capped at `prefetch`) is added to the queue capacity. That way up to `concurrent` tasks
    /// can keep running even while completed results wait to be collected.
    ///
    /// Example: with `concurrent = 10`, `prefetch = 4`, and `completed_but_unretrieved = 3`,
    /// the queue may hold up to 13 tasks, keeping 10 of them actively running while the
    /// 3 finished ones wait for their results to be picked up.
    completed_but_unretrieved: Arc<AtomicUsize>,
    /// Whether the tasks have hit an unrecoverable error.
    ///
    /// If the concurrent tasks hit an unrecoverable error, they will stop executing new tasks and
    /// return the unrecoverable error to users.
    errored: bool,
}

impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {
    /// Create a new `ConcurrentTasks` with the given executor, concurrency, prefetch and factory.
    ///
    /// The factory is a function pointer that shouldn't capture any context.
    pub fn new(
        executor: Executor,
        concurrent: usize,
        prefetch: usize,
        factory: fn(I) -> BoxedStaticFuture<(I, Result<O>)>,
    ) -> Self {
        Self {
            executor,
            factory,

            tasks: VecDeque::with_capacity(concurrent),
            results: VecDeque::with_capacity(concurrent),
            concurrent,
            prefetch,
            completed_but_unretrieved: Arc::default(),
            errored: false,
        }
    }

    /// Return true if the tasks are running concurrently.
    #[inline]
    fn is_concurrent(&self) -> bool {
        self.concurrent > 1
    }

    /// Clear all tasks and results.
    ///
    /// All ongoing tasks will be canceled.
    pub fn clear(&mut self) {
        self.tasks.clear();
        self.results.clear();
    }

    /// Check if there is remaining space to push new tasks.
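    ///
    /// For example, with `concurrent = 4`, `prefetch = 2`, and 3 tasks completed but not
    /// yet retrieved, new tasks can be pushed as long as `tasks.len() < 4 + min(3, 2) = 6`.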
    #[inline]
    pub fn has_remaining(&self) -> bool {
        let completed = self.completed_but_unretrieved.load(Ordering::Relaxed);
        // Allow up to `prefetch` completed tasks to be buffered
        self.tasks.len() < self.concurrent + completed.min(self.prefetch)
    }

    /// Check if there are remaining results to fetch.
    #[inline]
    pub fn has_result(&self) -> bool {
        !self.results.is_empty()
    }

    /// Create a task with the given input.
    pub fn create_task(&self, input: I) -> Task<(I, Result<O>)> {
        let completed = self.completed_but_unretrieved.clone();

        let fut = (self.factory)(input).inspect(move |_| {
            completed.fetch_add(1, Ordering::Relaxed);
        });

        self.executor.execute(fut)
    }

    /// Execute the task with the given input.
    ///
    /// - Execute the task in the current thread if not running concurrently.
    /// - Execute the task in the background if there are available slots.
    /// - Await the first task in the queue if there are no available slots.
    pub async fn execute(&mut self, input: I) -> Result<()> {
        if self.errored {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "concurrent tasks met an unrecoverable error",
            ));
        }

        // Short path for the non-concurrent case.
        if !self.is_concurrent() {
            let (_, o) = (self.factory)(input).await;
            return match o {
                Ok(o) => {
                    self.results.push_back(o);
                    Ok(())
                }
                // We don't need to rebuild the future if it's not concurrent.
                Err(err) => Err(err),
            };
        }

        if !self.has_remaining() {
            let (i, o) = self
                .tasks
                .front_mut()
                .expect("tasks must be available")
                .await;
            self.completed_but_unretrieved
                .fetch_sub(1, Ordering::Relaxed);
            match o {
                Ok(o) => {
                    let _ = self.tasks.pop_front();
                    self.results.push_back(o)
                }
                Err(err) => {
                    // Retry this task if the error is temporary
                    if err.is_temporary() {
                        let task = self.create_task(i);
                        self.tasks
                            .front_mut()
                            .expect("tasks must be available")
                            .replace(task)
                    } else {
                        self.clear();
                        self.errored = true;
                    }
                    return Err(err);
                }
            }
        }

        self.tasks.push_back(self.create_task(input));
        Ok(())
    }

    /// Fetch the successful result from the result queue.
    pub async fn next(&mut self) -> Option<Result<O>> {
        if self.errored {
            return Some(Err(Error::new(
                ErrorKind::Unexpected,
                "concurrent tasks met an unrecoverable error",
            )));
        }

        if let Some(result) = self.results.pop_front() {
            return Some(Ok(result));
        }

        if let Some(task) = self.tasks.front_mut() {
            let (i, o) = task.await;
            self.completed_but_unretrieved
                .fetch_sub(1, Ordering::Relaxed);
            return match o {
                Ok(o) => {
                    let _ = self.tasks.pop_front();
                    Some(Ok(o))
                }
                Err(err) => {
                    // Retry this task if the error is temporary
                    if err.is_temporary() {
                        let task = self.create_task(i);
                        self.tasks
                            .front_mut()
                            .expect("tasks must be available")
                            .replace(task)
                    } else {
                        self.clear();
                        self.errored = true;
                    }
                    Some(Err(err))
                }
            };
        }

        None
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use pretty_assertions::assert_eq;
    use rand::Rng;
    use tokio::time::sleep;

    use super::*;

    #[tokio::test]
    async fn test_concurrent_tasks() {
        let executor = Executor::new();

        let mut tasks = ConcurrentTasks::new(executor, 16, 8, |(i, dur)| {
            Box::pin(async move {
                sleep(dur).await;

                // ~10% rate to fail.
                if rand::thread_rng().gen_range(0..100) > 90 {
                    return (
                        (i, dur),
                        Err(Error::new(ErrorKind::Unexpected, "I'm lucky").set_temporary()),
                    );
                }
                ((i, dur), Ok(i))
            })
        });

        let mut ans = vec![];

        for i in 0..10240 {
            // Sleep up to 10ms
            let dur = Duration::from_millis(rand::thread_rng().gen_range(0..10));
            loop {
                let res = tasks.execute((i, dur)).await;
                if res.is_ok() {
                    break;
                }
            }
        }

        loop {
            match tasks.next().await.transpose() {
                Ok(Some(i)) => ans.push(i),
                Ok(None) => break,
                Err(_) => continue,
            }
        }

        assert_eq!(ans, (0..10240).collect::<Vec<_>>())
    }
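
    // A minimal sketch of the non-concurrent short path, added here as an
    // illustration (assumption: not part of the original test suite). With
    // `concurrent = 1`, `execute` awaits the factory inline and results come
    // back in submission order.
    #[tokio::test]
    async fn test_non_concurrent_tasks() {
        let executor = Executor::new();

        let mut tasks = ConcurrentTasks::new(executor, 1, 0, |i: usize| {
            Box::pin(async move { (i, Ok(i * 2)) })
        });

        for i in 0..16 {
            tasks.execute(i).await.unwrap();
        }

        let mut ans = vec![];
        while let Some(res) = tasks.next().await {
            ans.push(res.unwrap());
        }
        assert_eq!(ans, (0..16).map(|i| i * 2).collect::<Vec<_>>());
    }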

    #[tokio::test]
    async fn test_prefetch_backpressure() {
        let executor = Executor::new();
        let concurrent = 4;
        let prefetch = 2;

        // Create a slower task to ensure they don't complete immediately
        let mut tasks = ConcurrentTasks::new(executor, concurrent, prefetch, |i: usize| {
            Box::pin(async move {
                sleep(Duration::from_millis(100)).await;
                (i, Ok(i))
            })
        });

        // Initially, we should have space for concurrent tasks
        assert!(tasks.has_remaining(), "Should have space initially");

        // Submit concurrent tasks
        for i in 0..concurrent {
            assert!(tasks.has_remaining(), "Should have space for task {i}");
            tasks.execute(i).await.unwrap();
        }

        // Now we shouldn't have any more space (since no tasks have completed yet)
        assert!(
            !tasks.has_remaining(),
            "Should not have space after submitting concurrent tasks"
        );

        // Wait for some tasks to complete
        sleep(Duration::from_millis(150)).await;

        // Now we should have space up to prefetch limit
        for i in concurrent..concurrent + prefetch {
            assert!(
                tasks.has_remaining(),
                "Should have space for prefetch task {i}"
            );
            tasks.execute(i).await.unwrap();
        }

        // Now has_remaining should return false
        assert!(
            !tasks.has_remaining(),
            "Should not have remaining space after filling up prefetch buffer"
        );

        // Retrieve one result
        let result = tasks.next().await;
        assert!(result.is_some());

        // Now there should be space for one more task
        assert!(
            tasks.has_remaining(),
            "Should have remaining space after retrieving one result"
        );
    }

    #[tokio::test]
    async fn test_prefetch_zero() {
        let executor = Executor::new();
        let concurrent = 4;
        let prefetch = 0; // No prefetching allowed

        let mut tasks = ConcurrentTasks::new(executor, concurrent, prefetch, |i: usize| {
            Box::pin(async move {
                sleep(Duration::from_millis(10)).await;
                (i, Ok(i))
            })
        });

        // With prefetch=0, we can only submit up to concurrent tasks
        for i in 0..concurrent {
            tasks.execute(i).await.unwrap();
        }

        // Should not have space for more
        assert!(
            !tasks.has_remaining(),
            "Should not have remaining space with prefetch=0"
        );

        // Retrieve one result
        let result = tasks.next().await;
        assert!(result.is_some());

        // Now there should be space for exactly one more task
        assert!(
            tasks.has_remaining(),
            "Should have remaining space after retrieving one result"
        );

        // Execute one more
        tasks.execute(concurrent).await.unwrap();

        // Should be full again
        assert!(!tasks.has_remaining(), "Should be full again");
    }
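
    // A minimal sketch of the unrecoverable-error path, added here as an
    // illustration (assumption: not part of the original test suite). Once a
    // task fails with a non-temporary error, the queue is cleared and both
    // `execute` and `next` keep reporting the error.
    #[tokio::test]
    async fn test_unrecoverable_error() {
        let executor = Executor::new();

        let mut tasks = ConcurrentTasks::<usize, usize>::new(executor, 2, 0, |i| {
            Box::pin(async move {
                (
                    i,
                    Err(Error::new(ErrorKind::Unexpected, "permanent failure")),
                )
            })
        });

        // Both tasks are queued in the background without being awaited yet.
        tasks.execute(1).await.unwrap();
        tasks.execute(2).await.unwrap();

        // `next` awaits the front task, observes the permanent error, and marks
        // the whole task set as errored.
        assert!(matches!(tasks.next().await, Some(Err(_))));

        // Further calls keep returning the unrecoverable error.
        assert!(tasks.execute(3).await.is_err());
        assert!(matches!(tasks.next().await, Some(Err(_))));
    }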
}