opendal/services/monoiofs/
core.rs1use std::mem;
19use std::path::PathBuf;
20use std::sync::Arc;
21use std::sync::Mutex;
22use std::time::Duration;
23
24use flume::Receiver;
25use flume::Sender;
26use futures::channel::oneshot;
27use futures::Future;
28use monoio::FusionDriver;
29use monoio::RuntimeBuilder;
30
31use crate::raw::*;
32use crate::*;
33
34pub const BUFFER_SIZE: usize = 2 * 1024 * 1024; type TaskSpawner = Box<dyn FnOnce() + Send>;
38
39#[derive(Debug)]
40pub struct MonoiofsCore {
41 pub info: Arc<AccessorInfo>,
42
43 root: PathBuf,
44 tx: Sender<TaskSpawner>,
46 threads: Mutex<Vec<std::thread::JoinHandle<()>>>,
48 pub buf_pool: oio::PooledBuf,
49}
50
51impl MonoiofsCore {
52 pub fn new(root: PathBuf, worker_threads: usize, io_uring_entries: u32) -> Self {
53 let (tx, rx) = flume::unbounded();
58 let threads = (0..worker_threads)
59 .map(move |i| {
60 let rx = rx.clone();
61 std::thread::Builder::new()
62 .name(format!("monoiofs-worker-{i}"))
63 .spawn(move || Self::worker_entrypoint(rx, io_uring_entries))
64 .expect("spawn worker thread should success")
65 })
66 .collect();
67 let threads = Mutex::new(threads);
68
69 Self {
70 info: {
71 let am = AccessorInfo::default();
72 am.set_scheme(Scheme::Monoiofs)
73 .set_root(&root.to_string_lossy())
74 .set_native_capability(Capability {
75 stat: true,
76
77 read: true,
78
79 write: true,
80 write_can_append: true,
81
82 delete: true,
83 rename: true,
84 create_dir: true,
85 copy: true,
86 shared: true,
87 ..Default::default()
88 });
89 am.into()
90 },
91 root,
92 tx,
93 threads,
94 buf_pool: oio::PooledBuf::new(16).with_initial_capacity(BUFFER_SIZE),
95 }
96 }
97
98 pub fn prepare_path(&self, path: &str) -> PathBuf {
100 self.root.join(path.trim_end_matches('/'))
101 }
102
103 pub async fn prepare_write_path(&self, path: &str) -> Result<PathBuf> {
105 let path = self.prepare_path(path);
106 let parent = path
107 .parent()
108 .ok_or_else(|| {
109 Error::new(
110 ErrorKind::Unexpected,
111 "path should have parent but not, it must be malformed",
112 )
113 .with_context("input", path.to_string_lossy())
114 })?
115 .to_path_buf();
116 self.dispatch(move || monoio::fs::create_dir_all(parent))
117 .await
118 .map_err(new_std_io_error)?;
119 Ok(path)
120 }
121
122 fn worker_entrypoint(rx: Receiver<TaskSpawner>, io_uring_entries: u32) {
124 let mut rt = RuntimeBuilder::<FusionDriver>::new()
125 .enable_all()
126 .with_entries(io_uring_entries)
127 .build()
128 .expect("monoio runtime initialize should success");
129 rt.block_on(async {
132 while let Ok(spawner) = rx.recv_async().await {
133 spawner();
134 }
135 })
136 }
137
138 pub async fn dispatch<F, Fut, T>(&self, f: F) -> T
141 where
142 F: FnOnce() -> Fut + 'static + Send,
143 Fut: Future<Output = T>,
144 T: 'static + Send,
145 {
146 let (tx, rx) = oneshot::channel();
148 let result = self
149 .tx
150 .send_async(Box::new(move || {
151 monoio::spawn(async move {
154 let _ = tx.send(f().await);
157 });
158 }))
159 .await;
160 self.unwrap(result);
161 self.unwrap(rx.await)
162 }
163
164 pub async fn spawn<F, Fut, T>(&self, f: F)
166 where
167 F: FnOnce() -> Fut + 'static + Send,
168 Fut: Future<Output = T> + 'static,
169 T: 'static,
170 {
171 let result = self
172 .tx
173 .send_async(Box::new(move || {
174 monoio::spawn(f());
177 }))
178 .await;
179 self.unwrap(result);
180 }
181
182 pub fn propagate_worker_panic(&self) -> ! {
187 let mut guard = self.threads.lock().expect("worker thread has panicked");
188 std::thread::sleep(Duration::from_millis(100));
190 let threads = mem::take(&mut *guard);
191 for thread in threads {
193 if thread.is_finished() {
194 match thread.join() {
197 Ok(()) => panic!("worker thread should not exit, tx may be dropped"),
199 Err(e) => std::panic::resume_unwind(e),
201 }
202 }
203 }
204 unreachable!("this method should panic")
205 }
206
207 pub fn unwrap<T, E>(&self, result: Result<T, E>) -> T {
212 match result {
213 Ok(result) => result,
214 Err(_) => self.propagate_worker_panic(),
215 }
216 }
217}
218
219#[cfg(test)]
220mod tests {
221 use std::sync::Arc;
222 use std::time::Duration;
223
224 use futures::channel::mpsc::UnboundedSender;
225 use futures::channel::mpsc::{self};
226 use futures::StreamExt;
227
228 use super::*;
229
230 fn new_core(worker_threads: usize) -> Arc<MonoiofsCore> {
231 Arc::new(MonoiofsCore::new(PathBuf::new(), worker_threads, 1024))
232 }
233
234 async fn dispatch_simple(core: Arc<MonoiofsCore>) {
235 let result = core.dispatch(|| async { 42 }).await;
236 assert_eq!(result, 42);
237 let bytes: Vec<u8> = vec![1, 2, 3, 4, 5, 6, 7, 8];
238 let bytes_clone = bytes.clone();
239 let result = core.dispatch(move || async move { bytes }).await;
240 assert_eq!(result, bytes_clone);
241 }
242
243 async fn dispatch_concurrent(core: Arc<MonoiofsCore>) {
244 let (tx, mut rx) = mpsc::unbounded();
245
246 fn spawn_task(core: Arc<MonoiofsCore>, tx: UnboundedSender<u64>, sleep_millis: u64) {
247 tokio::spawn(async move {
248 let result = core
249 .dispatch(move || async move {
250 monoio::time::sleep(Duration::from_millis(sleep_millis)).await;
251 sleep_millis
252 })
253 .await;
254 assert_eq!(result, sleep_millis);
255 tx.unbounded_send(result).unwrap();
256 });
257 }
258
259 spawn_task(core.clone(), tx.clone(), 200);
260 spawn_task(core.clone(), tx.clone(), 20);
261 drop(tx);
262 let first = rx.next().await;
263 let second = rx.next().await;
264 let third = rx.next().await;
265 assert_eq!(first, Some(20));
266 assert_eq!(second, Some(200));
267 assert_eq!(third, None);
268 }
269
270 async fn dispatch_panic(core: Arc<MonoiofsCore>) {
271 core.dispatch(|| async { panic!("BOOM") }).await;
272 }
273
274 #[tokio::test]
275 async fn test_monoio_single_thread_dispatch() {
276 let core = new_core(1);
277 assert_eq!(core.threads.lock().unwrap().len(), 1);
278 dispatch_simple(core).await;
279 }
280
281 #[tokio::test]
282 async fn test_monoio_single_thread_dispatch_concurrent() {
283 let core = new_core(1);
284 dispatch_concurrent(core).await;
285 }
286
287 #[tokio::test]
288 #[should_panic(expected = "BOOM")]
289 async fn test_monoio_single_thread_dispatch_panic() {
290 let core = new_core(1);
291 dispatch_panic(core).await;
292 }
293
294 #[tokio::test]
295 async fn test_monoio_multi_thread_dispatch() {
296 let core = new_core(4);
297 assert_eq!(core.threads.lock().unwrap().len(), 4);
298 dispatch_simple(core).await;
299 }
300
301 #[tokio::test]
302 async fn test_monoio_multi_thread_dispatch_concurrent() {
303 let core = new_core(4);
304 dispatch_concurrent(core).await;
305 }
306
307 #[tokio::test]
308 #[should_panic(expected = "BOOM")]
309 async fn test_monoio_multi_thread_dispatch_panic() {
310 let core = new_core(4);
311 dispatch_panic(core).await;
312 }
313}