1use std::collections::HashMap;
19use std::ffi::CStr;
20use std::io::Read;
21use std::io::Write;
22use std::mem::size_of;
23use std::sync::Mutex;
24use std::time::Duration;
25
26use log::debug;
27use opendal::Buffer;
28use opendal::ErrorKind;
29use opendal::Operator;
30use sharded_slab::Slab;
31use tokio::runtime::Builder;
32use tokio::runtime::Runtime;
33use vm_memory::ByteValued;
34
35use crate::buffer::BufferWrapper;
36use crate::error::*;
37use crate::filesystem_message::*;
38use crate::virtiofs_util::Reader;
39use crate::virtiofs_util::Writer;
40
/// Major FUSE protocol version; INIT requests with a different major are refused.
const KERNEL_VERSION: u32 = 7;
/// Minor FUSE protocol version reported back in the INIT reply.
const KERNEL_MINOR_VERSION: u32 = 38;
/// Lowest minor FUSE protocol version accepted from the peer during INIT.
const MIN_KERNEL_MINOR_VERSION: u32 = 27;
/// Extra bytes allowed on top of `MAX_BUFFER_SIZE` when checking a message's declared length.
const BUFFER_HEADER_SIZE: u32 = 0x1000;
/// Largest payload size in bytes; also reported as `max_write` in the INIT reply.
const MAX_BUFFER_SIZE: u32 = 1024 * 1024;
/// Validity period reported for entries and attributes in replies.
const DEFAULT_TTL: Duration = Duration::from_secs(1);
/// Permission bits OR-ed into the mode of every file and directory.
const DEFAULT_OPENED_FILE_MODE: u32 = 0o755;
55
/// Kind of filesystem object an `OpenedFile` describes.
///
/// The enum is fieldless, so the common traits are derived to make values
/// cheap to copy, comparable and printable in diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FileType {
    /// A directory.
    Dir,
    /// A regular file.
    File,
}
60
61struct InnerWriter {
62 writer: opendal::Writer,
63 written: u64,
64}
65
66#[derive(Clone)]
67struct OpenedFile {
68 path: String,
69 metadata: Attr,
70}
71
72impl OpenedFile {
73 fn new(file_type: FileType, path: &str, uid: u32, gid: u32) -> OpenedFile {
74 let mut attr: Attr = unsafe { std::mem::zeroed() };
75 attr.uid = uid;
76 attr.gid = gid;
77 match file_type {
78 FileType::Dir => {
79 attr.nlink = 2;
80 attr.mode = libc::S_IFDIR | DEFAULT_OPENED_FILE_MODE;
81 }
82 FileType::File => {
83 attr.nlink = 1;
84 attr.mode = libc::S_IFREG | DEFAULT_OPENED_FILE_MODE;
85 }
86 }
87 OpenedFile {
88 path: path.to_string(),
89 metadata: attr,
90 }
91 }
92}
93
94fn opendal_error2error(error: opendal::Error) -> Error {
95 match error.kind() {
96 ErrorKind::Unsupported => Error::from(libc::EOPNOTSUPP),
97 ErrorKind::IsADirectory => Error::from(libc::EISDIR),
98 ErrorKind::NotFound => Error::from(libc::ENOENT),
99 ErrorKind::PermissionDenied => Error::from(libc::EACCES),
100 ErrorKind::AlreadyExists => Error::from(libc::EEXIST),
101 ErrorKind::NotADirectory => Error::from(libc::ENOTDIR),
102 ErrorKind::RangeNotSatisfied => Error::from(libc::EINVAL),
103 ErrorKind::RateLimited => Error::from(libc::EBUSY),
104 _ => Error::from(libc::ENOENT),
105 }
106}
107
108fn opendal_metadata2opened_file(
109 path: &str,
110 metadata: &opendal::Metadata,
111 uid: u32,
112 gid: u32,
113) -> OpenedFile {
114 let file_type = match metadata.mode() {
115 opendal::EntryMode::DIR => FileType::Dir,
116 _ => FileType::File,
117 };
118 OpenedFile::new(file_type, path, uid, gid)
119}
120
121pub struct Filesystem {
124 rt: Runtime,
125 core: Operator,
126 uid: u32,
127 gid: u32,
128 opened_files: Slab<OpenedFile>,
129 opened_files_map: Mutex<HashMap<String, u64>>,
132 opened_files_writer: tokio::sync::Mutex<HashMap<String, InnerWriter>>,
133}
134
135impl Filesystem {
136 pub fn new(core: Operator) -> Filesystem {
137 let rt = Builder::new_multi_thread()
138 .worker_threads(4)
139 .enable_all()
140 .build()
141 .unwrap();
142
143 Filesystem {
145 rt,
146 core,
147 uid: 1000,
148 gid: 1000,
149 opened_files: Slab::new(),
150 opened_files_map: Mutex::new(HashMap::new()),
151 opened_files_writer: tokio::sync::Mutex::new(HashMap::new()),
152 }
153 }
154
155 pub fn handle_message(&self, mut r: Reader, w: Writer) -> Result<usize> {
156 let in_header: InHeader = r.read_obj().map_err(|e| {
157 new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
158 })?;
159 if in_header.len > (MAX_BUFFER_SIZE + BUFFER_HEADER_SIZE) {
160 return Filesystem::reply_error(in_header.unique, w);
162 }
163 if let Ok(opcode) = Opcode::try_from(in_header.opcode) {
164 match opcode {
165 Opcode::Init => self.init(in_header, r, w),
166 Opcode::Destroy => self.destroy(in_header, r, w),
167 Opcode::Lookup => self.lookup(in_header, r, w),
168 Opcode::Getattr => self.getattr(in_header, r, w),
169 Opcode::Setattr => self.setattr(in_header, r, w),
170 Opcode::Create => self.create(in_header, r, w),
171 Opcode::Unlink => self.unlink(in_header, r, w),
172 Opcode::Release => self.release(in_header, r, w),
173 Opcode::Flush => self.flush(in_header, r, w),
174 Opcode::Forget => self.forget(in_header, r),
175 Opcode::Open => self.open(in_header, r, w),
176 Opcode::Read => self.read(in_header, r, w),
177 Opcode::Write => self.write(in_header, r, w),
178 }
179 } else {
180 Filesystem::reply_error(in_header.unique, w)
181 }
182 }
183}
184
185impl Filesystem {
186 fn reply_ok<T: ByteValued>(
187 out: Option<T>,
188 data: Option<&[u8]>,
189 unique: u64,
190 mut w: Writer,
191 ) -> Result<usize> {
192 let mut len = size_of::<OutHeader>();
193 if out.is_some() {
194 len += size_of::<T>();
195 }
196 if let Some(data) = data {
197 len += data.len();
198 }
199 let header = OutHeader {
200 unique,
201 error: 0, len: len as u32,
203 };
204 w.write_all(header.as_slice()).map_err(|e| {
205 new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into()))
206 })?;
207 if let Some(out) = out {
208 w.write_all(out.as_slice()).map_err(|e| {
209 new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into()))
210 })?;
211 }
212 if let Some(data) = data {
213 w.write_all(data).map_err(|e| {
214 new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into()))
215 })?;
216 }
217 Ok(w.bytes_written())
218 }
219
220 fn reply_error(unique: u64, mut w: Writer) -> Result<usize> {
221 let header = OutHeader {
222 unique,
223 error: libc::EIO, len: size_of::<OutHeader>() as u32,
225 };
226 w.write_all(header.as_slice()).map_err(|e| {
227 new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into()))
228 })?;
229 Ok(w.bytes_written())
230 }
231
232 fn bytes_to_str(buf: &[u8]) -> Result<&str> {
233 Filesystem::bytes_to_cstr(buf)?.to_str().map_err(|e| {
234 new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
235 })
236 }
237
238 fn bytes_to_cstr(buf: &[u8]) -> Result<&CStr> {
239 CStr::from_bytes_with_nul(buf).map_err(|e| {
240 new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
241 })
242 }
243
244 fn check_flags(&self, flags: u32) -> Result<(bool, bool)> {
245 let is_trunc = flags & libc::O_TRUNC as u32 != 0 || flags & libc::O_CREAT as u32 != 0;
246 let is_append = flags & libc::O_APPEND as u32 != 0;
247
248 let mode = flags & libc::O_ACCMODE as u32;
249 let is_write = mode == libc::O_WRONLY as u32 || mode == libc::O_RDWR as u32 || is_append;
250
251 let capability = self.core.info().full_capability();
252 if is_trunc && !capability.write {
253 Err(Error::from(libc::EACCES))?;
254 }
255 if is_append && !capability.write_can_append {
256 Err(Error::from(libc::EACCES))?;
257 }
258
259 Ok((is_write, is_append))
260 }
261}
262
263impl Filesystem {
264 fn init(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result<usize> {
265 let InitIn { major, minor, .. } = r.read_obj().map_err(|e| {
266 new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
267 })?;
268
269 if major != KERNEL_VERSION || minor < MIN_KERNEL_MINOR_VERSION {
270 return Filesystem::reply_error(in_header.unique, w);
271 }
272
273 let mut attr = OpenedFile::new(FileType::Dir, "/", self.uid, self.gid);
274 attr.metadata.ino = 1;
275 self.opened_files
278 .insert(attr.clone())
279 .expect("failed to allocate inode");
280 self.opened_files
281 .insert(attr.clone())
282 .expect("failed to allocate inode");
283 let mut opened_files_map = self.opened_files_map.lock().unwrap();
284 opened_files_map.insert("/".to_string(), 1);
285
286 let out = InitOut {
287 major: KERNEL_VERSION,
288 minor: KERNEL_MINOR_VERSION,
289 max_write: MAX_BUFFER_SIZE,
290 ..Default::default()
291 };
292 Filesystem::reply_ok(Some(out), None, in_header.unique, w)
293 }
294
295 fn destroy(&self, _in_header: InHeader, _r: Reader, _w: Writer) -> Result<usize> {
296 Ok(0)
298 }
299
300 fn flush(&self, _in_header: InHeader, _r: Reader, _w: Writer) -> Result<usize> {
301 Ok(0)
303 }
304
305 fn forget(&self, _in_header: InHeader, _r: Reader) -> Result<usize> {
306 Ok(0)
308 }
309
310 fn lookup(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result<usize> {
311 let name_len = in_header.len as usize - size_of::<InHeader>();
312 let mut buf = vec![0; name_len];
313 r.read_exact(&mut buf).map_err(|e| {
314 new_unexpected_error("failed to decode protocol messages", Some(e.into()))
315 })?;
316 let name = match Filesystem::bytes_to_str(buf.as_ref()) {
317 Ok(name) => name,
318 Err(_) => return Filesystem::reply_error(in_header.unique, w),
319 };
320
321 debug!("lookup: parent inode={} name={}", in_header.nodeid, name);
322
323 let parent_path = match self
324 .opened_files
325 .get(in_header.nodeid as usize)
326 .map(|f| f.path.clone())
327 {
328 Some(path) => path,
329 None => return Filesystem::reply_error(in_header.unique, w),
330 };
331
332 let path = format!("{}/{}", parent_path, name);
333 let metadata = match self.rt.block_on(self.do_get_metadata(&path)) {
334 Ok(metadata) => metadata,
335 Err(_) => return Filesystem::reply_error(in_header.unique, w),
336 };
337
338 let out = EntryOut {
339 nodeid: metadata.metadata.ino,
340 entry_valid: DEFAULT_TTL.as_secs(),
341 attr_valid: DEFAULT_TTL.as_secs(),
342 entry_valid_nsec: DEFAULT_TTL.subsec_nanos(),
343 attr_valid_nsec: DEFAULT_TTL.subsec_nanos(),
344 attr: metadata.metadata,
345 ..Default::default()
346 };
347 Filesystem::reply_ok(Some(out), None, in_header.unique, w)
348 }
349
350 fn getattr(&self, in_header: InHeader, _r: Reader, w: Writer) -> Result<usize> {
351 debug!("getattr: inode={}", in_header.nodeid);
352
353 let path = match self
354 .opened_files
355 .get(in_header.nodeid as usize)
356 .map(|f| f.path.clone())
357 {
358 Some(path) => path,
359 None => return Filesystem::reply_error(in_header.unique, w),
360 };
361
362 let metadata = match self.rt.block_on(self.do_get_metadata(&path)) {
363 Ok(metadata) => metadata,
364 Err(_) => return Filesystem::reply_error(in_header.unique, w),
365 };
366
367 let out = AttrOut {
368 attr_valid: DEFAULT_TTL.as_secs(),
369 attr_valid_nsec: DEFAULT_TTL.subsec_nanos(),
370 attr: metadata.metadata,
371 ..Default::default()
372 };
373 Filesystem::reply_ok(Some(out), None, in_header.unique, w)
374 }
375
376 fn setattr(&self, in_header: InHeader, _r: Reader, w: Writer) -> Result<usize> {
377 debug!("setattr: inode={}", in_header.nodeid);
378
379 self.getattr(in_header, _r, w)
381 }
382
383 fn create(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result<usize> {
384 let CreateIn { flags, .. } = r.read_obj().map_err(|e| {
385 new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
386 })?;
387
388 let name_len = in_header.len as usize - size_of::<InHeader>() - size_of::<CreateIn>();
389 let mut buf = vec![0; name_len];
390 r.read_exact(&mut buf).map_err(|e| {
391 new_unexpected_error("failed to decode protocol messages", Some(e.into()))
392 })?;
393 let name = match Filesystem::bytes_to_str(buf.as_ref()) {
394 Ok(name) => name,
395 Err(_) => return Filesystem::reply_error(in_header.unique, w),
396 };
397
398 debug!("create: parent inode={} name={}", in_header.nodeid, name);
399
400 let parent_path = match self
401 .opened_files
402 .get(in_header.nodeid as usize)
403 .map(|f| f.path.clone())
404 {
405 Some(path) => path,
406 None => return Filesystem::reply_error(in_header.unique, w),
407 };
408
409 let path = format!("{}/{}", parent_path, name);
410 let mut attr = OpenedFile::new(FileType::File, &path, self.uid, self.gid);
411 let inode = self
412 .opened_files
413 .insert(attr.clone())
414 .expect("failed to allocate inode");
415 attr.metadata.ino = inode as u64;
416 let mut opened_files_map = self.opened_files_map.lock().unwrap();
417 opened_files_map.insert(path.to_string(), inode as u64);
418
419 match self.rt.block_on(self.do_set_writer(&path, flags)) {
420 Ok(writer) => writer,
421 Err(_) => return Filesystem::reply_error(in_header.unique, w),
422 };
423
424 let entry_out = EntryOut {
425 nodeid: attr.metadata.ino,
426 entry_valid: DEFAULT_TTL.as_secs(),
427 attr_valid: DEFAULT_TTL.as_secs(),
428 entry_valid_nsec: DEFAULT_TTL.subsec_nanos(),
429 attr_valid_nsec: DEFAULT_TTL.subsec_nanos(),
430 attr: attr.metadata,
431 ..Default::default()
432 };
433 let open_out = OpenOut {
434 ..Default::default()
435 };
436 Filesystem::reply_ok(
437 Some(entry_out),
438 Some(open_out.as_slice()),
439 in_header.unique,
440 w,
441 )
442 }
443
444 fn unlink(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result<usize> {
445 let name_len = in_header.len as usize - size_of::<InHeader>();
446 let mut buf = vec![0; name_len];
447 r.read_exact(&mut buf).map_err(|e| {
448 new_unexpected_error("failed to decode protocol messages", Some(e.into()))
449 })?;
450 let name = match Filesystem::bytes_to_str(buf.as_ref()) {
451 Ok(name) => name,
452 Err(_) => return Filesystem::reply_error(in_header.unique, w),
453 };
454
455 debug!("unlink: parent inode={} name={}", in_header.nodeid, name);
456
457 let parent_path = match self
458 .opened_files
459 .get(in_header.nodeid as usize)
460 .map(|f| f.path.clone())
461 {
462 Some(path) => path,
463 None => return Filesystem::reply_error(in_header.unique, w),
464 };
465
466 let path = format!("{}/{}", parent_path, name);
467 if self.rt.block_on(self.do_delete(&path)).is_err() {
468 return Filesystem::reply_error(in_header.unique, w);
469 }
470
471 let mut opened_files_map = self.opened_files_map.lock().unwrap();
472 opened_files_map.remove(&path);
473
474 Filesystem::reply_ok(None::<u8>, None, in_header.unique, w)
475 }
476
477 fn release(&self, in_header: InHeader, _r: Reader, w: Writer) -> Result<usize> {
478 debug!("release: inode={}", in_header.nodeid);
479
480 let path = match self
481 .opened_files
482 .get(in_header.nodeid as usize)
483 .map(|f| f.path.clone())
484 {
485 Some(path) => path,
486 None => return Filesystem::reply_error(in_header.unique, w),
487 };
488
489 if self.rt.block_on(self.do_release_writer(&path)).is_err() {
490 return Filesystem::reply_error(in_header.unique, w);
491 }
492
493 Filesystem::reply_ok(None::<u8>, None, in_header.unique, w)
494 }
495
496 fn open(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result<usize> {
497 debug!("open: inode={}", in_header.nodeid);
498
499 let OpenIn { flags, .. } = r.read_obj().map_err(|e| {
500 new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
501 })?;
502
503 let path = match self
504 .opened_files
505 .get(in_header.nodeid as usize)
506 .map(|f| f.path.clone())
507 {
508 Some(path) => path,
509 None => return Filesystem::reply_error(in_header.unique, w),
510 };
511
512 match self.rt.block_on(self.do_set_writer(&path, flags)) {
513 Ok(writer) => writer,
514 Err(_) => return Filesystem::reply_error(in_header.unique, w),
515 };
516
517 let out = OpenOut {
518 ..Default::default()
519 };
520 Filesystem::reply_ok(Some(out), None, in_header.unique, w)
521 }
522
523 fn read(&self, in_header: InHeader, mut r: Reader, mut w: Writer) -> Result<usize> {
524 let path = match self
525 .opened_files
526 .get(in_header.nodeid as usize)
527 .map(|f| f.path.clone())
528 {
529 Some(path) => path,
530 None => return Filesystem::reply_error(in_header.unique, w),
531 };
532
533 let ReadIn { offset, size, .. } = r.read_obj().map_err(|e| {
534 new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
535 })?;
536
537 debug!(
538 "read: inode={} offset={} size={}",
539 in_header.nodeid, offset, size
540 );
541
542 let data = match self.rt.block_on(self.do_read(&path, offset)) {
543 Ok(data) => data,
544 Err(_) => return Filesystem::reply_error(in_header.unique, w),
545 };
546 let len = data.len();
547 let buffer = BufferWrapper::new(data);
548
549 let mut data_writer = w.split_at(size_of::<OutHeader>()).unwrap();
550 data_writer.write_from_at(&buffer, len).map_err(|e| {
551 new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into()))
552 })?;
553
554 let out = OutHeader {
555 len: (size_of::<OutHeader>() + len) as u32,
556 error: 0,
557 unique: in_header.unique,
558 };
559 w.write_all(out.as_slice()).map_err(|e| {
560 new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into()))
561 })?;
562 Ok(out.len as usize)
563 }
564
565 fn write(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result<usize> {
566 debug!("write: inode={}", in_header.nodeid);
567
568 let path = match self
569 .opened_files
570 .get(in_header.nodeid as usize)
571 .map(|f| f.path.clone())
572 {
573 Some(path) => path,
574 None => return Filesystem::reply_error(in_header.unique, w),
575 };
576
577 let WriteIn { offset, size, .. } = r.read_obj().map_err(|e| {
578 new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
579 })?;
580
581 let buffer = BufferWrapper::new(Buffer::new());
582 r.read_to_at(&buffer, size as usize).map_err(|e| {
583 new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
584 })?;
585 let buffer = buffer.get_buffer();
586
587 match self.rt.block_on(self.do_write(&path, offset, buffer)) {
588 Ok(writer) => writer,
589 Err(_) => return Filesystem::reply_error(in_header.unique, w),
590 };
591
592 let out = WriteOut {
593 size,
594 ..Default::default()
595 };
596 Filesystem::reply_ok(Some(out), None, in_header.unique, w)
597 }
598}
599
600impl Filesystem {
601 async fn do_get_metadata(&self, path: &str) -> Result<OpenedFile> {
602 let metadata = self.core.stat(path).await.map_err(opendal_error2error)?;
603 let mut attr = opendal_metadata2opened_file(path, &metadata, self.uid, self.gid);
604 attr.metadata.size = metadata.content_length();
605 let mut opened_files_map = self.opened_files_map.lock().unwrap();
606 if let Some(inode) = opened_files_map.get(path) {
607 attr.metadata.ino = *inode;
608 } else {
609 let inode = self
610 .opened_files
611 .insert(attr.clone())
612 .expect("failed to allocate inode");
613 attr.metadata.ino = inode as u64;
614 opened_files_map.insert(path.to_string(), inode as u64);
615 }
616
617 Ok(attr)
618 }
619
620 async fn do_set_writer(&self, path: &str, flags: u32) -> Result<()> {
621 let (is_write, is_append) = self.check_flags(flags)?;
622 if !is_write {
623 return Ok(());
624 }
625
626 let writer = self
627 .core
628 .writer_with(path)
629 .append(is_append)
630 .await
631 .map_err(opendal_error2error)?;
632 let written = if is_append {
633 self.core
634 .stat(path)
635 .await
636 .map_err(opendal_error2error)?
637 .content_length()
638 } else {
639 0
640 };
641
642 let inner_writer = InnerWriter { writer, written };
643 let mut opened_file_writer = self.opened_files_writer.lock().await;
644 opened_file_writer.insert(path.to_string(), inner_writer);
645
646 Ok(())
647 }
648
649 async fn do_release_writer(&self, path: &str) -> Result<()> {
650 let mut opened_file_writer = self.opened_files_writer.lock().await;
651 let inner_writer = opened_file_writer
652 .get_mut(path)
653 .ok_or(Error::from(libc::EINVAL))?;
654 inner_writer
655 .writer
656 .close()
657 .await
658 .map_err(opendal_error2error)?;
659 opened_file_writer.remove(path);
660
661 Ok(())
662 }
663
664 async fn do_delete(&self, path: &str) -> Result<()> {
665 self.core.delete(path).await.map_err(opendal_error2error)?;
666
667 Ok(())
668 }
669
670 async fn do_read(&self, path: &str, offset: u64) -> Result<Buffer> {
671 let data = self
672 .core
673 .read_with(path)
674 .range(offset..)
675 .await
676 .map_err(opendal_error2error)?;
677
678 Ok(data)
679 }
680
681 async fn do_write(&self, path: &str, offset: u64, data: Buffer) -> Result<usize> {
682 let len = data.len();
683 let mut opened_file_writer = self.opened_files_writer.lock().await;
684 let inner_writer = opened_file_writer
685 .get_mut(path)
686 .ok_or(Error::from(libc::EINVAL))?;
687 if offset != inner_writer.written {
688 return Err(Error::from(libc::EINVAL));
689 }
690 inner_writer
691 .writer
692 .write_from(data)
693 .await
694 .map_err(opendal_error2error)?;
695 inner_writer.written += len as u64;
696
697 Ok(len)
698 }
699}