opendal/services/monoiofs/
core.rs1use std::mem;
19use std::path::PathBuf;
20use std::sync::Arc;
21use std::sync::Mutex;
22use std::time::Duration;
23
24use flume::Receiver;
25use flume::Sender;
26use futures::channel::oneshot;
27use futures::Future;
28use monoio::FusionDriver;
29use monoio::RuntimeBuilder;
30
31use crate::raw::*;
32use crate::*;
33
/// Buffer size in bytes (2 MiB); used as the initial capacity of the
/// buffers handed out by `MonoiofsCore::buf_pool`.
pub const BUFFER_SIZE: usize = 2 * 1024 * 1024;

/// A boxed, sendable, run-once closure. Values of this type travel over the
/// task channel and are invoked on a worker thread.
type TaskSpawner = Box<dyn FnOnce() + Send>;
38
39#[derive(Debug)]
40pub struct MonoiofsCore {
41 pub info: Arc<AccessorInfo>,
42
43 root: PathBuf,
44 tx: Sender<TaskSpawner>,
46 threads: Mutex<Vec<std::thread::JoinHandle<()>>>,
48 pub buf_pool: oio::PooledBuf,
49}
50
51impl MonoiofsCore {
52 pub fn new(root: PathBuf, worker_threads: usize, io_uring_entries: u32) -> Self {
53 let (tx, rx) = flume::unbounded();
58 let threads = (0..worker_threads)
59 .map(move |i| {
60 let rx = rx.clone();
61 std::thread::Builder::new()
62 .name(format!("monoiofs-worker-{i}"))
63 .spawn(move || Self::worker_entrypoint(rx, io_uring_entries))
64 .expect("spawn worker thread should success")
65 })
66 .collect();
67 let threads = Mutex::new(threads);
68
69 Self {
70 info: {
71 let am = AccessorInfo::default();
72 am.set_scheme(Scheme::Monoiofs)
73 .set_root(&root.to_string_lossy())
74 .set_native_capability(Capability {
75 stat: true,
76 stat_has_content_length: true,
77 stat_has_last_modified: true,
78
79 read: true,
80
81 write: true,
82 write_can_append: true,
83
84 delete: true,
85 rename: true,
86 create_dir: true,
87 copy: true,
88 shared: true,
89 ..Default::default()
90 });
91 am.into()
92 },
93 root,
94 tx,
95 threads,
96 buf_pool: oio::PooledBuf::new(16).with_initial_capacity(BUFFER_SIZE),
97 }
98 }
99
100 pub fn prepare_path(&self, path: &str) -> PathBuf {
102 self.root.join(path.trim_end_matches('/'))
103 }
104
105 pub async fn prepare_write_path(&self, path: &str) -> Result<PathBuf> {
107 let path = self.prepare_path(path);
108 let parent = path
109 .parent()
110 .ok_or_else(|| {
111 Error::new(
112 ErrorKind::Unexpected,
113 "path should have parent but not, it must be malformed",
114 )
115 .with_context("input", path.to_string_lossy())
116 })?
117 .to_path_buf();
118 self.dispatch(move || monoio::fs::create_dir_all(parent))
119 .await
120 .map_err(new_std_io_error)?;
121 Ok(path)
122 }
123
124 fn worker_entrypoint(rx: Receiver<TaskSpawner>, io_uring_entries: u32) {
126 let mut rt = RuntimeBuilder::<FusionDriver>::new()
127 .enable_all()
128 .with_entries(io_uring_entries)
129 .build()
130 .expect("monoio runtime initialize should success");
131 rt.block_on(async {
134 while let Ok(spawner) = rx.recv_async().await {
135 spawner();
136 }
137 })
138 }
139
140 pub async fn dispatch<F, Fut, T>(&self, f: F) -> T
143 where
144 F: FnOnce() -> Fut + 'static + Send,
145 Fut: Future<Output = T>,
146 T: 'static + Send,
147 {
148 let (tx, rx) = oneshot::channel();
150 let result = self
151 .tx
152 .send_async(Box::new(move || {
153 monoio::spawn(async move {
156 let _ = tx.send(f().await);
159 });
160 }))
161 .await;
162 self.unwrap(result);
163 self.unwrap(rx.await)
164 }
165
166 pub async fn spawn<F, Fut, T>(&self, f: F)
168 where
169 F: FnOnce() -> Fut + 'static + Send,
170 Fut: Future<Output = T> + 'static,
171 T: 'static,
172 {
173 let result = self
174 .tx
175 .send_async(Box::new(move || {
176 monoio::spawn(f());
179 }))
180 .await;
181 self.unwrap(result);
182 }
183
184 pub fn propagate_worker_panic(&self) -> ! {
189 let mut guard = self.threads.lock().expect("worker thread has panicked");
190 std::thread::sleep(Duration::from_millis(100));
192 let threads = mem::take(&mut *guard);
193 for thread in threads {
195 if thread.is_finished() {
196 match thread.join() {
199 Ok(()) => panic!("worker thread should not exit, tx may be dropped"),
201 Err(e) => std::panic::resume_unwind(e),
203 }
204 }
205 }
206 unreachable!("this method should panic")
207 }
208
209 pub fn unwrap<T, E>(&self, result: Result<T, E>) -> T {
214 match result {
215 Ok(result) => result,
216 Err(_) => self.propagate_worker_panic(),
217 }
218 }
219}
220
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use futures::channel::mpsc;
    use futures::channel::mpsc::UnboundedSender;
    use futures::StreamExt;

    use super::*;

    /// Builds a shared core with an empty root, `worker_threads` workers and
    /// 1024 io_uring entries.
    fn new_core(worker_threads: usize) -> Arc<MonoiofsCore> {
        let core = MonoiofsCore::new(PathBuf::new(), worker_threads, 1024);
        Arc::new(core)
    }

    /// Dispatches two trivial tasks and checks that their outputs come back
    /// unchanged.
    async fn dispatch_simple(core: Arc<MonoiofsCore>) {
        let answer = core.dispatch(|| async { 42 }).await;
        assert_eq!(answer, 42);

        let payload: Vec<u8> = (1..=8).collect();
        let expected = payload.clone();
        let echoed = core.dispatch(move || async move { payload }).await;
        assert_eq!(echoed, expected);
    }

    /// Dispatches a 200 ms and a 20 ms sleeping task and checks that the
    /// shorter one reports first, followed by the longer one, followed by the
    /// end of the stream.
    async fn dispatch_concurrent(core: Arc<MonoiofsCore>) {
        /// Spawns a tokio task that dispatches a sleep of `sleep_millis` and
        /// reports the slept duration through `tx`.
        fn spawn_task(core: Arc<MonoiofsCore>, tx: UnboundedSender<u64>, sleep_millis: u64) {
            tokio::spawn(async move {
                let slept = core
                    .dispatch(move || async move {
                        monoio::time::sleep(Duration::from_millis(sleep_millis)).await;
                        sleep_millis
                    })
                    .await;
                assert_eq!(slept, sleep_millis);
                tx.unbounded_send(slept).unwrap();
            });
        }

        let (tx, mut rx) = mpsc::unbounded();
        spawn_task(core.clone(), tx.clone(), 200);
        spawn_task(core.clone(), tx.clone(), 20);
        // Drop the last local sender so the stream ends after both tasks.
        drop(tx);

        let mut received = Vec::with_capacity(3);
        for _ in 0..3 {
            received.push(rx.next().await);
        }
        assert_eq!(received, [Some(20), Some(200), None]);
    }

    /// Dispatches a task that panics with "BOOM".
    async fn dispatch_panic(core: Arc<MonoiofsCore>) {
        core.dispatch(|| async { panic!("BOOM") }).await;
    }

    #[tokio::test]
    async fn test_monoio_single_thread_dispatch() {
        let core = new_core(1);
        let worker_count = core.threads.lock().unwrap().len();
        assert_eq!(worker_count, 1);
        dispatch_simple(core).await;
    }

    #[tokio::test]
    async fn test_monoio_single_thread_dispatch_concurrent() {
        dispatch_concurrent(new_core(1)).await;
    }

    #[tokio::test]
    #[should_panic(expected = "BOOM")]
    async fn test_monoio_single_thread_dispatch_panic() {
        dispatch_panic(new_core(1)).await;
    }

    #[tokio::test]
    async fn test_monoio_multi_thread_dispatch() {
        let core = new_core(4);
        let worker_count = core.threads.lock().unwrap().len();
        assert_eq!(worker_count, 4);
        dispatch_simple(core).await;
    }

    #[tokio::test]
    async fn test_monoio_multi_thread_dispatch_concurrent() {
        dispatch_concurrent(new_core(4)).await;
    }

    #[tokio::test]
    #[should_panic(expected = "BOOM")]
    async fn test_monoio_multi_thread_dispatch_panic() {
        dispatch_panic(new_core(4)).await;
    }
}