opendal/services/monoiofs/
core.rs1use std::mem;
19use std::path::PathBuf;
20use std::sync::{Arc, Mutex};
21use std::time::Duration;
22
23use flume::Receiver;
24use flume::Sender;
25use futures::channel::oneshot;
26use futures::Future;
27use monoio::FusionDriver;
28use monoio::RuntimeBuilder;
29
30use crate::raw::*;
31use crate::*;
32
33pub const BUFFER_SIZE: usize = 2 * 1024 * 1024; type TaskSpawner = Box<dyn FnOnce() + Send>;
37
38#[derive(Debug)]
39pub struct MonoiofsCore {
40 pub info: Arc<AccessorInfo>,
41
42 root: PathBuf,
43 tx: Sender<TaskSpawner>,
45 threads: Mutex<Vec<std::thread::JoinHandle<()>>>,
47 pub buf_pool: oio::PooledBuf,
48}
49
50impl MonoiofsCore {
51 pub fn new(root: PathBuf, worker_threads: usize, io_uring_entries: u32) -> Self {
52 let (tx, rx) = flume::unbounded();
57 let threads = (0..worker_threads)
58 .map(move |i| {
59 let rx = rx.clone();
60 std::thread::Builder::new()
61 .name(format!("monoiofs-worker-{i}"))
62 .spawn(move || Self::worker_entrypoint(rx, io_uring_entries))
63 .expect("spawn worker thread should success")
64 })
65 .collect();
66 let threads = Mutex::new(threads);
67
68 Self {
69 info: {
70 let am = AccessorInfo::default();
71 am.set_scheme(Scheme::Monoiofs)
72 .set_root(&root.to_string_lossy())
73 .set_native_capability(Capability {
74 stat: true,
75 stat_has_content_length: true,
76 stat_has_last_modified: true,
77
78 read: true,
79
80 write: true,
81 write_can_append: true,
82
83 delete: true,
84 rename: true,
85 create_dir: true,
86 copy: true,
87 shared: true,
88 ..Default::default()
89 });
90 am.into()
91 },
92 root,
93 tx,
94 threads,
95 buf_pool: oio::PooledBuf::new(16).with_initial_capacity(BUFFER_SIZE),
96 }
97 }
98
99 pub fn prepare_path(&self, path: &str) -> PathBuf {
101 self.root.join(path.trim_end_matches('/'))
102 }
103
104 pub async fn prepare_write_path(&self, path: &str) -> Result<PathBuf> {
106 let path = self.prepare_path(path);
107 let parent = path
108 .parent()
109 .ok_or_else(|| {
110 Error::new(
111 ErrorKind::Unexpected,
112 "path should have parent but not, it must be malformed",
113 )
114 .with_context("input", path.to_string_lossy())
115 })?
116 .to_path_buf();
117 self.dispatch(move || monoio::fs::create_dir_all(parent))
118 .await
119 .map_err(new_std_io_error)?;
120 Ok(path)
121 }
122
123 fn worker_entrypoint(rx: Receiver<TaskSpawner>, io_uring_entries: u32) {
125 let mut rt = RuntimeBuilder::<FusionDriver>::new()
126 .enable_all()
127 .with_entries(io_uring_entries)
128 .build()
129 .expect("monoio runtime initialize should success");
130 rt.block_on(async {
133 while let Ok(spawner) = rx.recv_async().await {
134 spawner();
135 }
136 })
137 }
138
139 pub async fn dispatch<F, Fut, T>(&self, f: F) -> T
142 where
143 F: FnOnce() -> Fut + 'static + Send,
144 Fut: Future<Output = T>,
145 T: 'static + Send,
146 {
147 let (tx, rx) = oneshot::channel();
149 let result = self
150 .tx
151 .send_async(Box::new(move || {
152 monoio::spawn(async move {
155 let _ = tx.send(f().await);
158 });
159 }))
160 .await;
161 self.unwrap(result);
162 self.unwrap(rx.await)
163 }
164
165 pub async fn spawn<F, Fut, T>(&self, f: F)
167 where
168 F: FnOnce() -> Fut + 'static + Send,
169 Fut: Future<Output = T> + 'static,
170 T: 'static,
171 {
172 let result = self
173 .tx
174 .send_async(Box::new(move || {
175 monoio::spawn(f());
178 }))
179 .await;
180 self.unwrap(result);
181 }
182
183 pub fn propagate_worker_panic(&self) -> ! {
188 let mut guard = self.threads.lock().expect("worker thread has panicked");
189 std::thread::sleep(Duration::from_millis(100));
191 let threads = mem::take(&mut *guard);
192 for thread in threads {
194 if thread.is_finished() {
195 match thread.join() {
198 Ok(()) => panic!("worker thread should not exit, tx may be dropped"),
200 Err(e) => std::panic::resume_unwind(e),
202 }
203 }
204 }
205 unreachable!("this method should panic")
206 }
207
208 pub fn unwrap<T, E>(&self, result: Result<T, E>) -> T {
213 match result {
214 Ok(result) => result,
215 Err(_) => self.propagate_worker_panic(),
216 }
217 }
218}
219
220#[cfg(test)]
221mod tests {
222 use std::sync::Arc;
223 use std::time::Duration;
224
225 use futures::channel::mpsc::UnboundedSender;
226 use futures::channel::mpsc::{self};
227 use futures::StreamExt;
228
229 use super::*;
230
231 fn new_core(worker_threads: usize) -> Arc<MonoiofsCore> {
232 Arc::new(MonoiofsCore::new(PathBuf::new(), worker_threads, 1024))
233 }
234
235 async fn dispatch_simple(core: Arc<MonoiofsCore>) {
236 let result = core.dispatch(|| async { 42 }).await;
237 assert_eq!(result, 42);
238 let bytes: Vec<u8> = vec![1, 2, 3, 4, 5, 6, 7, 8];
239 let bytes_clone = bytes.clone();
240 let result = core.dispatch(move || async move { bytes }).await;
241 assert_eq!(result, bytes_clone);
242 }
243
244 async fn dispatch_concurrent(core: Arc<MonoiofsCore>) {
245 let (tx, mut rx) = mpsc::unbounded();
246
247 fn spawn_task(core: Arc<MonoiofsCore>, tx: UnboundedSender<u64>, sleep_millis: u64) {
248 tokio::spawn(async move {
249 let result = core
250 .dispatch(move || async move {
251 monoio::time::sleep(Duration::from_millis(sleep_millis)).await;
252 sleep_millis
253 })
254 .await;
255 assert_eq!(result, sleep_millis);
256 tx.unbounded_send(result).unwrap();
257 });
258 }
259
260 spawn_task(core.clone(), tx.clone(), 200);
261 spawn_task(core.clone(), tx.clone(), 20);
262 drop(tx);
263 let first = rx.next().await;
264 let second = rx.next().await;
265 let third = rx.next().await;
266 assert_eq!(first, Some(20));
267 assert_eq!(second, Some(200));
268 assert_eq!(third, None);
269 }
270
271 async fn dispatch_panic(core: Arc<MonoiofsCore>) {
272 core.dispatch(|| async { panic!("BOOM") }).await;
273 }
274
275 #[tokio::test]
276 async fn test_monoio_single_thread_dispatch() {
277 let core = new_core(1);
278 assert_eq!(core.threads.lock().unwrap().len(), 1);
279 dispatch_simple(core).await;
280 }
281
282 #[tokio::test]
283 async fn test_monoio_single_thread_dispatch_concurrent() {
284 let core = new_core(1);
285 dispatch_concurrent(core).await;
286 }
287
288 #[tokio::test]
289 #[should_panic(expected = "BOOM")]
290 async fn test_monoio_single_thread_dispatch_panic() {
291 let core = new_core(1);
292 dispatch_panic(core).await;
293 }
294
295 #[tokio::test]
296 async fn test_monoio_multi_thread_dispatch() {
297 let core = new_core(4);
298 assert_eq!(core.threads.lock().unwrap().len(), 4);
299 dispatch_simple(core).await;
300 }
301
302 #[tokio::test]
303 async fn test_monoio_multi_thread_dispatch_concurrent() {
304 let core = new_core(4);
305 dispatch_concurrent(core).await;
306 }
307
308 #[tokio::test]
309 #[should_panic(expected = "BOOM")]
310 async fn test_monoio_multi_thread_dispatch_panic() {
311 let core = new_core(4);
312 dispatch_panic(core).await;
313 }
314}