opendal/services/monoiofs/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::mem;
19use std::path::PathBuf;
20use std::sync::Arc;
21use std::sync::Mutex;
22use std::time::Duration;
23
24use flume::Receiver;
25use flume::Sender;
26use futures::channel::oneshot;
27use futures::Future;
28use monoio::FusionDriver;
29use monoio::RuntimeBuilder;
30
31use crate::raw::*;
32use crate::*;
33
/// Initial capacity, in bytes, handed to the buffer pool of `MonoiofsCore`
/// (2 MiB).
pub const BUFFER_SIZE: usize = 2 << 20;
35
/// A boxed, one-shot, sendable function that spawns a task in the current
/// monoio runtime.
///
/// Spawners are created on the caller side, sent through a channel and then
/// invoked by a worker thread from inside its monoio runtime.
type TaskSpawner = Box<dyn FnOnce() + Send>;
38
/// Shared state of the monoiofs service.
///
/// It owns the pool of worker threads (each one running a monoio runtime)
/// and the channel used to hand work over to them.
#[derive(Debug)]
pub struct MonoiofsCore {
    /// accessor metadata: scheme, root and native capability
    pub info: Arc<AccessorInfo>,

    /// root directory that request paths are joined onto
    root: PathBuf,
    /// sender that sends [`TaskSpawner`] to worker threads
    tx: Sender<TaskSpawner>,
    /// join handles of worker threads
    threads: Mutex<Vec<std::thread::JoinHandle<()>>>,
    /// buffer pool whose buffers start with a capacity of [`BUFFER_SIZE`]
    pub buf_pool: oio::PooledBuf,
}
50
51impl MonoiofsCore {
52    pub fn new(root: PathBuf, worker_threads: usize, io_uring_entries: u32) -> Self {
53        // Since users use monoiofs in a context of tokio, all monoio
54        // operations need to be dispatched to a dedicated thread pool
55        // where a monoio runtime runs on each thread. Here we spawn
56        // these worker threads.
57        let (tx, rx) = flume::unbounded();
58        let threads = (0..worker_threads)
59            .map(move |i| {
60                let rx = rx.clone();
61                std::thread::Builder::new()
62                    .name(format!("monoiofs-worker-{i}"))
63                    .spawn(move || Self::worker_entrypoint(rx, io_uring_entries))
64                    .expect("spawn worker thread should success")
65            })
66            .collect();
67        let threads = Mutex::new(threads);
68
69        Self {
70            info: {
71                let am = AccessorInfo::default();
72                am.set_scheme(Scheme::Monoiofs)
73                    .set_root(&root.to_string_lossy())
74                    .set_native_capability(Capability {
75                        stat: true,
76                        stat_has_content_length: true,
77                        stat_has_last_modified: true,
78
79                        read: true,
80
81                        write: true,
82                        write_can_append: true,
83
84                        delete: true,
85                        rename: true,
86                        create_dir: true,
87                        copy: true,
88                        shared: true,
89                        ..Default::default()
90                    });
91                am.into()
92            },
93            root,
94            tx,
95            threads,
96            buf_pool: oio::PooledBuf::new(16).with_initial_capacity(BUFFER_SIZE),
97        }
98    }
99
100    /// join root and path
101    pub fn prepare_path(&self, path: &str) -> PathBuf {
102        self.root.join(path.trim_end_matches('/'))
103    }
104
105    /// join root and path, create parent dirs
106    pub async fn prepare_write_path(&self, path: &str) -> Result<PathBuf> {
107        let path = self.prepare_path(path);
108        let parent = path
109            .parent()
110            .ok_or_else(|| {
111                Error::new(
112                    ErrorKind::Unexpected,
113                    "path should have parent but not, it must be malformed",
114                )
115                .with_context("input", path.to_string_lossy())
116            })?
117            .to_path_buf();
118        self.dispatch(move || monoio::fs::create_dir_all(parent))
119            .await
120            .map_err(new_std_io_error)?;
121        Ok(path)
122    }
123
124    /// entrypoint of each worker thread, sets up monoio runtimes and channels
125    fn worker_entrypoint(rx: Receiver<TaskSpawner>, io_uring_entries: u32) {
126        let mut rt = RuntimeBuilder::<FusionDriver>::new()
127            .enable_all()
128            .with_entries(io_uring_entries)
129            .build()
130            .expect("monoio runtime initialize should success");
131        // run an infinite loop that receives TaskSpawner and calls
132        // them in a context of monoio
133        rt.block_on(async {
134            while let Ok(spawner) = rx.recv_async().await {
135                spawner();
136            }
137        })
138    }
139
140    /// Create a TaskSpawner, send it to the thread pool and wait
141    /// for its result. Task panic will propagate.
142    pub async fn dispatch<F, Fut, T>(&self, f: F) -> T
143    where
144        F: FnOnce() -> Fut + 'static + Send,
145        Fut: Future<Output = T>,
146        T: 'static + Send,
147    {
148        // oneshot channel to send result back
149        let (tx, rx) = oneshot::channel();
150        let result = self
151            .tx
152            .send_async(Box::new(move || {
153                // task will be spawned on current thread, task panic
154                // will cause current worker thread panic
155                monoio::spawn(async move {
156                    // discard the result if send failed due to
157                    // MonoiofsCore::dispatch cancelled
158                    let _ = tx.send(f().await);
159                });
160            }))
161            .await;
162        self.unwrap(result);
163        self.unwrap(rx.await)
164    }
165
166    /// Create a TaskSpawner, send it to the thread pool and spawn the task.
167    pub async fn spawn<F, Fut, T>(&self, f: F)
168    where
169        F: FnOnce() -> Fut + 'static + Send,
170        Fut: Future<Output = T> + 'static,
171        T: 'static,
172    {
173        let result = self
174            .tx
175            .send_async(Box::new(move || {
176                // task will be spawned on current thread, task panic
177                // will cause current worker thread panic
178                monoio::spawn(f());
179            }))
180            .await;
181        self.unwrap(result);
182    }
183
184    /// This method always panics. It is called only when at least a
185    /// worker thread has panicked or meet a broken rx, which is
186    /// unrecoverable. It propagates worker thread's panic if there
187    /// is any and panics on normally exited thread.
188    pub fn propagate_worker_panic(&self) -> ! {
189        let mut guard = self.threads.lock().expect("worker thread has panicked");
190        // wait until the panicked thread exits
191        std::thread::sleep(Duration::from_millis(100));
192        let threads = mem::take(&mut *guard);
193        // we don't know which thread panicked, so check them one by one
194        for thread in threads {
195            if thread.is_finished() {
196                // worker thread runs an infinite loop, hence finished
197                // thread must have panicked or meet a broken rx.
198                match thread.join() {
199                    // rx is broken
200                    Ok(()) => panic!("worker thread should not exit, tx may be dropped"),
201                    // thread has panicked
202                    Err(e) => std::panic::resume_unwind(e),
203                }
204            }
205        }
206        unreachable!("this method should panic")
207    }
208
209    /// Unwrap result if result is Ok, otherwise propagates worker thread's
210    /// panic. This method facilitates panic propagation in situation where
211    /// Err returned by broken channel indicates that the worker thread has
212    /// panicked.
213    pub fn unwrap<T, E>(&self, result: Result<T, E>) -> T {
214        match result {
215            Ok(result) => result,
216            Err(_) => self.propagate_worker_panic(),
217        }
218    }
219}
220
221#[cfg(test)]
222mod tests {
223    use std::sync::Arc;
224    use std::time::Duration;
225
226    use futures::channel::mpsc::UnboundedSender;
227    use futures::channel::mpsc::{self};
228    use futures::StreamExt;
229
230    use super::*;
231
232    fn new_core(worker_threads: usize) -> Arc<MonoiofsCore> {
233        Arc::new(MonoiofsCore::new(PathBuf::new(), worker_threads, 1024))
234    }
235
236    async fn dispatch_simple(core: Arc<MonoiofsCore>) {
237        let result = core.dispatch(|| async { 42 }).await;
238        assert_eq!(result, 42);
239        let bytes: Vec<u8> = vec![1, 2, 3, 4, 5, 6, 7, 8];
240        let bytes_clone = bytes.clone();
241        let result = core.dispatch(move || async move { bytes }).await;
242        assert_eq!(result, bytes_clone);
243    }
244
245    async fn dispatch_concurrent(core: Arc<MonoiofsCore>) {
246        let (tx, mut rx) = mpsc::unbounded();
247
248        fn spawn_task(core: Arc<MonoiofsCore>, tx: UnboundedSender<u64>, sleep_millis: u64) {
249            tokio::spawn(async move {
250                let result = core
251                    .dispatch(move || async move {
252                        monoio::time::sleep(Duration::from_millis(sleep_millis)).await;
253                        sleep_millis
254                    })
255                    .await;
256                assert_eq!(result, sleep_millis);
257                tx.unbounded_send(result).unwrap();
258            });
259        }
260
261        spawn_task(core.clone(), tx.clone(), 200);
262        spawn_task(core.clone(), tx.clone(), 20);
263        drop(tx);
264        let first = rx.next().await;
265        let second = rx.next().await;
266        let third = rx.next().await;
267        assert_eq!(first, Some(20));
268        assert_eq!(second, Some(200));
269        assert_eq!(third, None);
270    }
271
272    async fn dispatch_panic(core: Arc<MonoiofsCore>) {
273        core.dispatch(|| async { panic!("BOOM") }).await;
274    }
275
276    #[tokio::test]
277    async fn test_monoio_single_thread_dispatch() {
278        let core = new_core(1);
279        assert_eq!(core.threads.lock().unwrap().len(), 1);
280        dispatch_simple(core).await;
281    }
282
283    #[tokio::test]
284    async fn test_monoio_single_thread_dispatch_concurrent() {
285        let core = new_core(1);
286        dispatch_concurrent(core).await;
287    }
288
289    #[tokio::test]
290    #[should_panic(expected = "BOOM")]
291    async fn test_monoio_single_thread_dispatch_panic() {
292        let core = new_core(1);
293        dispatch_panic(core).await;
294    }
295
296    #[tokio::test]
297    async fn test_monoio_multi_thread_dispatch() {
298        let core = new_core(4);
299        assert_eq!(core.threads.lock().unwrap().len(), 4);
300        dispatch_simple(core).await;
301    }
302
303    #[tokio::test]
304    async fn test_monoio_multi_thread_dispatch_concurrent() {
305        let core = new_core(4);
306        dispatch_concurrent(core).await;
307    }
308
309    #[tokio::test]
310    #[should_panic(expected = "BOOM")]
311    async fn test_monoio_multi_thread_dispatch_panic() {
312        let core = new_core(4);
313        dispatch_panic(core).await;
314    }
315}