opendal/services/monoiofs/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::mem;
19use std::path::PathBuf;
20use std::sync::{Arc, Mutex};
21use std::time::Duration;
22
23use flume::Receiver;
24use flume::Sender;
25use futures::channel::oneshot;
26use futures::Future;
27use monoio::FusionDriver;
28use monoio::RuntimeBuilder;
29
30use crate::raw::*;
31use crate::*;
32
/// Default buffer size in bytes: 2 MiB (`2 << 20`). Used as the initial
/// capacity of every buffer handed out by `MonoiofsCore::buf_pool`.
pub const BUFFER_SIZE: usize = 2 << 20;
34
/// Type-erased, run-once closure that is shipped through a channel to a
/// worker thread and invoked there; it spawns a task on that thread's monoio
/// runtime. It must be `Send` because it is created on the caller's thread
/// and executed on a worker.
type TaskSpawner = Box<dyn Send + FnOnce()>;
37
38#[derive(Debug)]
39pub struct MonoiofsCore {
40    pub info: Arc<AccessorInfo>,
41
42    root: PathBuf,
43    /// sender that sends [`TaskSpawner`] to worker threads
44    tx: Sender<TaskSpawner>,
45    /// join handles of worker threads
46    threads: Mutex<Vec<std::thread::JoinHandle<()>>>,
47    pub buf_pool: oio::PooledBuf,
48}
49
50impl MonoiofsCore {
51    pub fn new(root: PathBuf, worker_threads: usize, io_uring_entries: u32) -> Self {
52        // Since users use monoiofs in a context of tokio, all monoio
53        // operations need to be dispatched to a dedicated thread pool
54        // where a monoio runtime runs on each thread. Here we spawn
55        // these worker threads.
56        let (tx, rx) = flume::unbounded();
57        let threads = (0..worker_threads)
58            .map(move |i| {
59                let rx = rx.clone();
60                std::thread::Builder::new()
61                    .name(format!("monoiofs-worker-{i}"))
62                    .spawn(move || Self::worker_entrypoint(rx, io_uring_entries))
63                    .expect("spawn worker thread should success")
64            })
65            .collect();
66        let threads = Mutex::new(threads);
67
68        Self {
69            info: {
70                let am = AccessorInfo::default();
71                am.set_scheme(Scheme::Monoiofs)
72                    .set_root(&root.to_string_lossy())
73                    .set_native_capability(Capability {
74                        stat: true,
75                        stat_has_content_length: true,
76                        stat_has_last_modified: true,
77
78                        read: true,
79
80                        write: true,
81                        write_can_append: true,
82
83                        delete: true,
84                        rename: true,
85                        create_dir: true,
86                        copy: true,
87                        shared: true,
88                        ..Default::default()
89                    });
90                am.into()
91            },
92            root,
93            tx,
94            threads,
95            buf_pool: oio::PooledBuf::new(16).with_initial_capacity(BUFFER_SIZE),
96        }
97    }
98
99    /// join root and path
100    pub fn prepare_path(&self, path: &str) -> PathBuf {
101        self.root.join(path.trim_end_matches('/'))
102    }
103
104    /// join root and path, create parent dirs
105    pub async fn prepare_write_path(&self, path: &str) -> Result<PathBuf> {
106        let path = self.prepare_path(path);
107        let parent = path
108            .parent()
109            .ok_or_else(|| {
110                Error::new(
111                    ErrorKind::Unexpected,
112                    "path should have parent but not, it must be malformed",
113                )
114                .with_context("input", path.to_string_lossy())
115            })?
116            .to_path_buf();
117        self.dispatch(move || monoio::fs::create_dir_all(parent))
118            .await
119            .map_err(new_std_io_error)?;
120        Ok(path)
121    }
122
123    /// entrypoint of each worker thread, sets up monoio runtimes and channels
124    fn worker_entrypoint(rx: Receiver<TaskSpawner>, io_uring_entries: u32) {
125        let mut rt = RuntimeBuilder::<FusionDriver>::new()
126            .enable_all()
127            .with_entries(io_uring_entries)
128            .build()
129            .expect("monoio runtime initialize should success");
130        // run an infinite loop that receives TaskSpawner and calls
131        // them in a context of monoio
132        rt.block_on(async {
133            while let Ok(spawner) = rx.recv_async().await {
134                spawner();
135            }
136        })
137    }
138
139    /// Create a TaskSpawner, send it to the thread pool and wait
140    /// for its result. Task panic will propagate.
141    pub async fn dispatch<F, Fut, T>(&self, f: F) -> T
142    where
143        F: FnOnce() -> Fut + 'static + Send,
144        Fut: Future<Output = T>,
145        T: 'static + Send,
146    {
147        // oneshot channel to send result back
148        let (tx, rx) = oneshot::channel();
149        let result = self
150            .tx
151            .send_async(Box::new(move || {
152                // task will be spawned on current thread, task panic
153                // will cause current worker thread panic
154                monoio::spawn(async move {
155                    // discard the result if send failed due to
156                    // MonoiofsCore::dispatch cancelled
157                    let _ = tx.send(f().await);
158                });
159            }))
160            .await;
161        self.unwrap(result);
162        self.unwrap(rx.await)
163    }
164
165    /// Create a TaskSpawner, send it to the thread pool and spawn the task.
166    pub async fn spawn<F, Fut, T>(&self, f: F)
167    where
168        F: FnOnce() -> Fut + 'static + Send,
169        Fut: Future<Output = T> + 'static,
170        T: 'static,
171    {
172        let result = self
173            .tx
174            .send_async(Box::new(move || {
175                // task will be spawned on current thread, task panic
176                // will cause current worker thread panic
177                monoio::spawn(f());
178            }))
179            .await;
180        self.unwrap(result);
181    }
182
183    /// This method always panics. It is called only when at least a
184    /// worker thread has panicked or meet a broken rx, which is
185    /// unrecoverable. It propagates worker thread's panic if there
186    /// is any and panics on normally exited thread.
187    pub fn propagate_worker_panic(&self) -> ! {
188        let mut guard = self.threads.lock().expect("worker thread has panicked");
189        // wait until the panicked thread exits
190        std::thread::sleep(Duration::from_millis(100));
191        let threads = mem::take(&mut *guard);
192        // we don't know which thread panicked, so check them one by one
193        for thread in threads {
194            if thread.is_finished() {
195                // worker thread runs an infinite loop, hence finished
196                // thread must have panicked or meet a broken rx.
197                match thread.join() {
198                    // rx is broken
199                    Ok(()) => panic!("worker thread should not exit, tx may be dropped"),
200                    // thread has panicked
201                    Err(e) => std::panic::resume_unwind(e),
202                }
203            }
204        }
205        unreachable!("this method should panic")
206    }
207
208    /// Unwrap result if result is Ok, otherwise propagates worker thread's
209    /// panic. This method facilitates panic propagation in situation where
210    /// Err returned by broken channel indicates that the worker thread has
211    /// panicked.
212    pub fn unwrap<T, E>(&self, result: Result<T, E>) -> T {
213        match result {
214            Ok(result) => result,
215            Err(_) => self.propagate_worker_panic(),
216        }
217    }
218}
219
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use futures::channel::mpsc::{self, UnboundedSender};
    use futures::StreamExt;

    use super::*;

    /// Build a shared core rooted at an empty path, with 1024 io_uring
    /// entries per worker runtime.
    fn new_core(worker_threads: usize) -> Arc<MonoiofsCore> {
        let core = MonoiofsCore::new(PathBuf::new(), worker_threads, 1024);
        Arc::new(core)
    }

    /// Number of worker join handles currently tracked by `core`.
    fn worker_count(core: &MonoiofsCore) -> usize {
        core.threads.lock().unwrap().len()
    }

    /// Dispatch a capture-free future, then one that moves owned data across
    /// to the worker and back.
    async fn dispatch_simple(core: Arc<MonoiofsCore>) {
        let answer = core.dispatch(|| async { 42 }).await;
        assert_eq!(answer, 42);

        let payload: Vec<u8> = (1..=8).collect();
        let expected = payload.clone();
        let echoed = core.dispatch(move || async move { payload }).await;
        assert_eq!(echoed, expected);
    }

    /// Dispatch a slow task before a fast one and check the fast one
    /// finishes first, i.e. tasks on the workers run concurrently.
    async fn dispatch_concurrent(core: Arc<MonoiofsCore>) {
        fn spawn_task(core: Arc<MonoiofsCore>, tx: UnboundedSender<u64>, sleep_millis: u64) {
            tokio::spawn(async move {
                let slept = core
                    .dispatch(move || async move {
                        monoio::time::sleep(Duration::from_millis(sleep_millis)).await;
                        sleep_millis
                    })
                    .await;
                assert_eq!(slept, sleep_millis);
                tx.unbounded_send(slept).unwrap();
            });
        }

        let (tx, mut rx) = mpsc::unbounded();
        for sleep_millis in [200, 20] {
            spawn_task(core.clone(), tx.clone(), sleep_millis);
        }
        // Drop our own sender so the stream ends once both tasks are done.
        drop(tx);

        let mut finished = Vec::new();
        while let Some(millis) = rx.next().await {
            finished.push(millis);
        }
        assert_eq!(finished, vec![20, 200]);
    }

    /// Dispatch a task that panics; the panic must reach the caller.
    async fn dispatch_panic(core: Arc<MonoiofsCore>) {
        core.dispatch(|| async { panic!("BOOM") }).await;
    }

    #[tokio::test]
    async fn test_monoio_single_thread_dispatch() {
        let core = new_core(1);
        assert_eq!(worker_count(&core), 1);
        dispatch_simple(core).await;
    }

    #[tokio::test]
    async fn test_monoio_single_thread_dispatch_concurrent() {
        dispatch_concurrent(new_core(1)).await;
    }

    #[tokio::test]
    #[should_panic(expected = "BOOM")]
    async fn test_monoio_single_thread_dispatch_panic() {
        dispatch_panic(new_core(1)).await;
    }

    #[tokio::test]
    async fn test_monoio_multi_thread_dispatch() {
        let core = new_core(4);
        assert_eq!(worker_count(&core), 4);
        dispatch_simple(core).await;
    }

    #[tokio::test]
    async fn test_monoio_multi_thread_dispatch_concurrent() {
        dispatch_concurrent(new_core(4)).await;
    }

    #[tokio::test]
    #[should_panic(expected = "BOOM")]
    async fn test_monoio_multi_thread_dispatch_panic() {
        dispatch_panic(new_core(4)).await;
    }
}