opendal/services/monoiofs/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::mem;
19use std::path::PathBuf;
20use std::sync::Arc;
21use std::sync::Mutex;
22use std::time::Duration;
23
24use flume::Receiver;
25use flume::Sender;
26use futures::channel::oneshot;
27use futures::Future;
28use monoio::FusionDriver;
29use monoio::RuntimeBuilder;
30
31use crate::raw::*;
32use crate::*;
33
/// Initial capacity, in bytes, handed to the buffer pool created in
/// `MonoiofsCore::new`: 2 MiB (2 * 1024 * 1024).
pub const BUFFER_SIZE: usize = 2 << 20;

/// A boxed, sendable, one-shot closure. Worker threads receive these and
/// invoke them inside their monoio runtime so that the closure can spawn
/// a task there.
type TaskSpawner = Box<dyn FnOnce() + Send>;
38
39#[derive(Debug)]
40pub struct MonoiofsCore {
41    pub info: Arc<AccessorInfo>,
42
43    root: PathBuf,
44    /// sender that sends [`TaskSpawner`] to worker threads
45    tx: Sender<TaskSpawner>,
46    /// join handles of worker threads
47    threads: Mutex<Vec<std::thread::JoinHandle<()>>>,
48    pub buf_pool: oio::PooledBuf,
49}
50
51impl MonoiofsCore {
52    pub fn new(root: PathBuf, worker_threads: usize, io_uring_entries: u32) -> Self {
53        // Since users use monoiofs in a context of tokio, all monoio
54        // operations need to be dispatched to a dedicated thread pool
55        // where a monoio runtime runs on each thread. Here we spawn
56        // these worker threads.
57        let (tx, rx) = flume::unbounded();
58        let threads = (0..worker_threads)
59            .map(move |i| {
60                let rx = rx.clone();
61                std::thread::Builder::new()
62                    .name(format!("monoiofs-worker-{i}"))
63                    .spawn(move || Self::worker_entrypoint(rx, io_uring_entries))
64                    .expect("spawn worker thread should success")
65            })
66            .collect();
67        let threads = Mutex::new(threads);
68
69        Self {
70            info: {
71                let am = AccessorInfo::default();
72                am.set_scheme(Scheme::Monoiofs)
73                    .set_root(&root.to_string_lossy())
74                    .set_native_capability(Capability {
75                        stat: true,
76
77                        read: true,
78
79                        write: true,
80                        write_can_append: true,
81
82                        delete: true,
83                        rename: true,
84                        create_dir: true,
85                        copy: true,
86                        shared: true,
87                        ..Default::default()
88                    });
89                am.into()
90            },
91            root,
92            tx,
93            threads,
94            buf_pool: oio::PooledBuf::new(16).with_initial_capacity(BUFFER_SIZE),
95        }
96    }
97
98    /// join root and path
99    pub fn prepare_path(&self, path: &str) -> PathBuf {
100        self.root.join(path.trim_end_matches('/'))
101    }
102
103    /// join root and path, create parent dirs
104    pub async fn prepare_write_path(&self, path: &str) -> Result<PathBuf> {
105        let path = self.prepare_path(path);
106        let parent = path
107            .parent()
108            .ok_or_else(|| {
109                Error::new(
110                    ErrorKind::Unexpected,
111                    "path should have parent but not, it must be malformed",
112                )
113                .with_context("input", path.to_string_lossy())
114            })?
115            .to_path_buf();
116        self.dispatch(move || monoio::fs::create_dir_all(parent))
117            .await
118            .map_err(new_std_io_error)?;
119        Ok(path)
120    }
121
122    /// entrypoint of each worker thread, sets up monoio runtimes and channels
123    fn worker_entrypoint(rx: Receiver<TaskSpawner>, io_uring_entries: u32) {
124        let mut rt = RuntimeBuilder::<FusionDriver>::new()
125            .enable_all()
126            .with_entries(io_uring_entries)
127            .build()
128            .expect("monoio runtime initialize should success");
129        // run an infinite loop that receives TaskSpawner and calls
130        // them in a context of monoio
131        rt.block_on(async {
132            while let Ok(spawner) = rx.recv_async().await {
133                spawner();
134            }
135        })
136    }
137
138    /// Create a TaskSpawner, send it to the thread pool and wait
139    /// for its result. Task panic will propagate.
140    pub async fn dispatch<F, Fut, T>(&self, f: F) -> T
141    where
142        F: FnOnce() -> Fut + 'static + Send,
143        Fut: Future<Output = T>,
144        T: 'static + Send,
145    {
146        // oneshot channel to send result back
147        let (tx, rx) = oneshot::channel();
148        let result = self
149            .tx
150            .send_async(Box::new(move || {
151                // task will be spawned on current thread, task panic
152                // will cause current worker thread panic
153                monoio::spawn(async move {
154                    // discard the result if send failed due to
155                    // MonoiofsCore::dispatch cancelled
156                    let _ = tx.send(f().await);
157                });
158            }))
159            .await;
160        self.unwrap(result);
161        self.unwrap(rx.await)
162    }
163
164    /// Create a TaskSpawner, send it to the thread pool and spawn the task.
165    pub async fn spawn<F, Fut, T>(&self, f: F)
166    where
167        F: FnOnce() -> Fut + 'static + Send,
168        Fut: Future<Output = T> + 'static,
169        T: 'static,
170    {
171        let result = self
172            .tx
173            .send_async(Box::new(move || {
174                // task will be spawned on current thread, task panic
175                // will cause current worker thread panic
176                monoio::spawn(f());
177            }))
178            .await;
179        self.unwrap(result);
180    }
181
182    /// This method always panics. It is called only when at least a
183    /// worker thread has panicked or meet a broken rx, which is
184    /// unrecoverable. It propagates worker thread's panic if there
185    /// is any and panics on normally exited thread.
186    pub fn propagate_worker_panic(&self) -> ! {
187        let mut guard = self.threads.lock().expect("worker thread has panicked");
188        // wait until the panicked thread exits
189        std::thread::sleep(Duration::from_millis(100));
190        let threads = mem::take(&mut *guard);
191        // we don't know which thread panicked, so check them one by one
192        for thread in threads {
193            if thread.is_finished() {
194                // worker thread runs an infinite loop, hence finished
195                // thread must have panicked or meet a broken rx.
196                match thread.join() {
197                    // rx is broken
198                    Ok(()) => panic!("worker thread should not exit, tx may be dropped"),
199                    // thread has panicked
200                    Err(e) => std::panic::resume_unwind(e),
201                }
202            }
203        }
204        unreachable!("this method should panic")
205    }
206
207    /// Unwrap result if result is Ok, otherwise propagates worker thread's
208    /// panic. This method facilitates panic propagation in situation where
209    /// Err returned by broken channel indicates that the worker thread has
210    /// panicked.
211    pub fn unwrap<T, E>(&self, result: Result<T, E>) -> T {
212        match result {
213            Ok(result) => result,
214            Err(_) => self.propagate_worker_panic(),
215        }
216    }
217}
218
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use futures::channel::mpsc::UnboundedSender;
    use futures::channel::mpsc::{self};
    use futures::StreamExt;

    use super::*;

    /// Build a shared core with an empty root and `worker_threads` workers.
    fn new_core(worker_threads: usize) -> Arc<MonoiofsCore> {
        let core = MonoiofsCore::new(PathBuf::new(), worker_threads, 1024);
        Arc::new(core)
    }

    /// Dispatched tasks hand their output back to the caller unchanged.
    async fn dispatch_simple(core: Arc<MonoiofsCore>) {
        let answer = core.dispatch(|| async { 42 }).await;
        assert_eq!(answer, 42);

        let expected: Vec<u8> = vec![1, 2, 3, 4, 5, 6, 7, 8];
        let payload = expected.clone();
        let echoed = core.dispatch(move || async move { payload }).await;
        assert_eq!(echoed, expected);
    }

    /// Two tasks dispatched back to back finish in order of their sleep
    /// duration, not in order of submission.
    async fn dispatch_concurrent(core: Arc<MonoiofsCore>) {
        /// Dispatch a task that sleeps for `sleep_millis` and reports that
        /// number through `tx` once it is done.
        fn spawn_task(core: Arc<MonoiofsCore>, tx: UnboundedSender<u64>, sleep_millis: u64) {
            tokio::spawn(async move {
                let slept = core
                    .dispatch(move || async move {
                        let delay = Duration::from_millis(sleep_millis);
                        monoio::time::sleep(delay).await;
                        sleep_millis
                    })
                    .await;
                assert_eq!(slept, sleep_millis);
                tx.unbounded_send(slept).unwrap();
            });
        }

        let (tx, rx) = mpsc::unbounded();
        for sleep_millis in [200, 20] {
            spawn_task(core.clone(), tx.clone(), sleep_millis);
        }
        // Only the spawned tasks hold senders now, so the stream ends once
        // both of them have reported.
        drop(tx);

        let completion_order: Vec<u64> = rx.collect().await;
        assert_eq!(completion_order, vec![20, 200]);
    }

    /// A panic inside a dispatched task reaches the caller.
    async fn dispatch_panic(core: Arc<MonoiofsCore>) {
        core.dispatch(|| async { panic!("BOOM") }).await;
    }

    #[tokio::test]
    async fn test_monoio_single_thread_dispatch() {
        let core = new_core(1);
        let worker_count = core.threads.lock().unwrap().len();
        assert_eq!(worker_count, 1);
        dispatch_simple(core).await;
    }

    #[tokio::test]
    async fn test_monoio_single_thread_dispatch_concurrent() {
        dispatch_concurrent(new_core(1)).await;
    }

    #[tokio::test]
    #[should_panic(expected = "BOOM")]
    async fn test_monoio_single_thread_dispatch_panic() {
        dispatch_panic(new_core(1)).await;
    }

    #[tokio::test]
    async fn test_monoio_multi_thread_dispatch() {
        let core = new_core(4);
        let worker_count = core.threads.lock().unwrap().len();
        assert_eq!(worker_count, 4);
        dispatch_simple(core).await;
    }

    #[tokio::test]
    async fn test_monoio_multi_thread_dispatch_concurrent() {
        dispatch_concurrent(new_core(4)).await;
    }

    #[tokio::test]
    #[should_panic(expected = "BOOM")]
    async fn test_monoio_multi_thread_dispatch_panic() {
        dispatch_panic(new_core(4)).await;
    }
}