1use std::ffi::OsStr;
19use std::num::NonZeroU32;
20use std::path::PathBuf;
21use std::sync::Arc;
22use std::time::Duration;
23use std::time::SystemTime;
24
25use bytes::Bytes;
26use fuse3::path::prelude::*;
27use fuse3::Errno;
28use fuse3::Result;
29use futures_util::stream;
30use futures_util::stream::BoxStream;
31use futures_util::StreamExt;
32use opendal::raw::normalize_path;
33use opendal::EntryMode;
34use opendal::ErrorKind;
35use opendal::Metadata;
36use opendal::Operator;
37use sharded_slab::Slab;
38use tokio::sync::Mutex;
39
40use super::file::FileKey;
41use super::file::InnerWriter;
42use super::file::OpenedFile;
43
44const TTL: Duration = Duration::from_secs(1); pub struct Filesystem {
86 op: Operator,
87 gid: u32,
88 uid: u32,
89
90 opened_files: Slab<OpenedFile>,
91}
92
93impl Filesystem {
94 pub fn new(op: Operator, uid: u32, gid: u32) -> Self {
96 Self {
97 op,
98 uid,
99 gid,
100 opened_files: Slab::new(),
101 }
102 }
103
104 fn check_flags(&self, flags: u32) -> Result<(bool, bool, bool)> {
105 let is_trunc = flags & libc::O_TRUNC as u32 != 0 || flags & libc::O_CREAT as u32 != 0;
106 let is_append = flags & libc::O_APPEND as u32 != 0;
107
108 let mode = flags & libc::O_ACCMODE as u32;
109 let is_read = mode == libc::O_RDONLY as u32 || mode == libc::O_RDWR as u32;
110 let is_write = mode == libc::O_WRONLY as u32 || mode == libc::O_RDWR as u32 || is_append;
111 if !is_read && !is_write {
112 Err(Errno::from(libc::EINVAL))?;
113 }
114 if (is_write && !is_trunc && !is_append) || is_trunc && !is_write {
117 Err(Errno::from(libc::EINVAL))?;
118 }
119
120 let capability = self.op.info().full_capability();
121 if is_read && !capability.read {
122 Err(Errno::from(libc::EACCES))?;
123 }
124 if is_trunc && !capability.write {
125 Err(Errno::from(libc::EACCES))?;
126 }
127 if is_append && !capability.write_can_append {
128 Err(Errno::from(libc::EACCES))?;
129 }
130
131 log::trace!(
132 "check_flags: is_read={}, is_write={}, is_trunc={}, is_append={}",
133 is_read,
134 is_write,
135 is_trunc,
136 is_append
137 );
138 Ok((is_read, is_trunc, is_append))
139 }
140
141 fn get_opened_file(
143 &self,
144 key: FileKey,
145 path: Option<&OsStr>,
146 ) -> Result<sharded_slab::Entry<OpenedFile>> {
147 let file = self
148 .opened_files
149 .get(key.0)
150 .ok_or(Errno::from(libc::ENOENT))?;
151
152 if matches!(path, Some(path) if path != file.path) {
153 log::trace!(
154 "get_opened_file: path not match: path={:?}, file={:?}",
155 path,
156 file.path
157 );
158 Err(Errno::from(libc::EBADF))?;
159 }
160
161 Ok(file)
162 }
163}
164
165impl PathFilesystem for Filesystem {
166 async fn init(&self, _req: Request) -> Result<ReplyInit> {
168 Ok(ReplyInit {
169 max_write: NonZeroU32::new(16 * 1024).unwrap(),
170 })
171 }
172
173 async fn destroy(&self, _req: Request) {}
175
176 async fn lookup(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<ReplyEntry> {
177 log::debug!("lookup(parent={:?}, name={:?})", parent, name);
178
179 let path = PathBuf::from(parent).join(name);
180 let metadata = self
181 .op
182 .stat(&path.to_string_lossy())
183 .await
184 .map_err(opendal_error2errno)?;
185
186 let now = SystemTime::now();
187 let attr = metadata2file_attr(&metadata, now, self.uid, self.gid);
188
189 Ok(ReplyEntry { ttl: TTL, attr })
190 }
191
192 async fn getattr(
193 &self,
194 _req: Request,
195 path: Option<&OsStr>,
196 fh: Option<u64>,
197 flags: u32,
198 ) -> Result<ReplyAttr> {
199 log::debug!("getattr(path={:?}, fh={:?}, flags={:?})", path, fh, flags);
200
201 let fh_path = fh.and_then(|fh| {
202 self.opened_files
203 .get(FileKey::try_from(fh).ok()?.0)
204 .map(|f| f.path.clone())
205 });
206
207 let file_path = match (path.map(Into::into), fh_path) {
208 (Some(a), Some(b)) => {
209 if a != b {
210 Err(Errno::from(libc::EBADF))?;
211 }
212 Some(a)
213 }
214 (a, b) => a.or(b),
215 };
216
217 let metadata = self
218 .op
219 .stat(&file_path.unwrap_or_default().to_string_lossy())
220 .await
221 .map_err(opendal_error2errno)?;
222
223 let now = SystemTime::now();
224 let attr = metadata2file_attr(&metadata, now, self.uid, self.gid);
225
226 Ok(ReplyAttr { ttl: TTL, attr })
227 }
228
229 async fn setattr(
230 &self,
231 _req: Request,
232 path: Option<&OsStr>,
233 fh: Option<u64>,
234 set_attr: SetAttr,
235 ) -> Result<ReplyAttr> {
236 log::debug!(
237 "setattr(path={:?}, fh={:?}, set_attr={:?})",
238 path,
239 fh,
240 set_attr
241 );
242
243 self.getattr(_req, path, fh, 0).await
244 }
245
246 async fn symlink(
247 &self,
248 _req: Request,
249 parent: &OsStr,
250 name: &OsStr,
251 link_path: &OsStr,
252 ) -> Result<ReplyEntry> {
253 log::debug!(
254 "symlink(parent={:?}, name={:?}, link_path={:?})",
255 parent,
256 name,
257 link_path
258 );
259 Err(libc::EOPNOTSUPP.into())
260 }
261
262 async fn mknod(
263 &self,
264 _req: Request,
265 parent: &OsStr,
266 name: &OsStr,
267 mode: u32,
268 _rdev: u32,
269 ) -> Result<ReplyEntry> {
270 log::debug!(
271 "mknod(parent={:?}, name={:?}, mode=0o{:o})",
272 parent,
273 name,
274 mode
275 );
276 Err(libc::EOPNOTSUPP.into())
277 }
278
279 async fn mkdir(
280 &self,
281 _req: Request,
282 parent: &OsStr,
283 name: &OsStr,
284 mode: u32,
285 _umask: u32,
286 ) -> Result<ReplyEntry> {
287 log::debug!(
288 "mkdir(parent={:?}, name={:?}, mode=0o{:o})",
289 parent,
290 name,
291 mode
292 );
293
294 let mut path = PathBuf::from(parent).join(name);
295 path.push(""); self.op
297 .create_dir(&path.to_string_lossy())
298 .await
299 .map_err(opendal_error2errno)?;
300
301 let now = SystemTime::now();
302 let attr = dummy_file_attr(FileType::Directory, now, self.uid, self.gid);
303
304 Ok(ReplyEntry { ttl: TTL, attr })
305 }
306
307 async fn unlink(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> {
308 log::debug!("unlink(parent={:?}, name={:?})", parent, name);
309
310 let path = PathBuf::from(parent).join(name);
311 self.op
312 .delete(&path.to_string_lossy())
313 .await
314 .map_err(opendal_error2errno)?;
315
316 Ok(())
317 }
318
319 async fn rmdir(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> {
320 log::debug!("rmdir(parent={:?}, name={:?})", parent, name);
321
322 let path = PathBuf::from(parent).join(name);
323 self.op
324 .delete(&path.to_string_lossy())
325 .await
326 .map_err(opendal_error2errno)?;
327
328 Ok(())
329 }
330
331 async fn rename(
332 &self,
333 _req: Request,
334 origin_parent: &OsStr,
335 origin_name: &OsStr,
336 parent: &OsStr,
337 name: &OsStr,
338 ) -> Result<()> {
339 log::debug!(
340 "rename(p={:?}, name={:?}, newp={:?}, newname={:?})",
341 origin_parent,
342 origin_name,
343 parent,
344 name
345 );
346
347 if !self.op.info().full_capability().rename {
348 return Err(Errno::from(libc::ENOTSUP))?;
349 }
350
351 let origin_path = PathBuf::from(origin_parent).join(origin_name);
352 let path = PathBuf::from(parent).join(name);
353
354 self.op
355 .rename(&origin_path.to_string_lossy(), &path.to_string_lossy())
356 .await
357 .map_err(opendal_error2errno)?;
358
359 Ok(())
360 }
361
362 async fn link(
363 &self,
364 _req: Request,
365 path: &OsStr,
366 new_parent: &OsStr,
367 new_name: &OsStr,
368 ) -> Result<ReplyEntry> {
369 log::debug!(
370 "link(path={:?}, new_parent={:?}, new_name={:?})",
371 path,
372 new_parent,
373 new_name
374 );
375 Err(libc::EOPNOTSUPP.into())
376 }
377
378 async fn opendir(&self, _req: Request, path: &OsStr, flags: u32) -> Result<ReplyOpen> {
379 log::debug!("opendir(path={:?}, flags=0x{:x})", path, flags);
380 Ok(ReplyOpen { fh: 0, flags })
381 }
382
383 async fn open(&self, _req: Request, path: &OsStr, flags: u32) -> Result<ReplyOpen> {
384 log::debug!("open(path={:?}, flags=0x{:x})", path, flags);
385
386 let (is_read, is_trunc, is_append) = self.check_flags(flags)?;
387 if flags & libc::O_CREAT as u32 != 0 {
388 self.op
389 .write(&path.to_string_lossy(), Bytes::new())
390 .await
391 .map_err(opendal_error2errno)?;
392 }
393
394 let inner_writer = if is_trunc || is_append {
395 let writer = self
396 .op
397 .writer_with(&path.to_string_lossy())
398 .append(is_append)
399 .await
400 .map_err(opendal_error2errno)?;
401 let written = if is_append {
402 self.op
403 .stat(&path.to_string_lossy())
404 .await
405 .map_err(opendal_error2errno)?
406 .content_length()
407 } else {
408 0
409 };
410 Some(Arc::new(Mutex::new(InnerWriter { writer, written })))
411 } else {
412 None
413 };
414
415 let key = self
416 .opened_files
417 .insert(OpenedFile {
418 path: path.into(),
419 is_read,
420 inner_writer,
421 })
422 .ok_or(Errno::from(libc::EBUSY))?;
423
424 Ok(ReplyOpen {
425 fh: FileKey(key).to_fh(),
426 flags,
427 })
428 }
429
430 async fn read(
431 &self,
432 _req: Request,
433 path: Option<&OsStr>,
434 fh: u64,
435 offset: u64,
436 size: u32,
437 ) -> Result<ReplyData> {
438 log::debug!(
439 "read(path={:?}, fh={}, offset={}, size={})",
440 path,
441 fh,
442 offset,
443 size
444 );
445
446 let file_path = {
447 let file = self.get_opened_file(FileKey::try_from(fh)?, path)?;
448 if !file.is_read {
449 Err(Errno::from(libc::EACCES))?;
450 }
451 file.path.to_string_lossy().to_string()
452 };
453
454 let data = self
455 .op
456 .read_with(&file_path)
457 .range(offset..)
458 .await
459 .map_err(opendal_error2errno)?;
460
461 Ok(ReplyData {
462 data: data.to_bytes(),
463 })
464 }
465
466 async fn write(
467 &self,
468 _req: Request,
469 path: Option<&OsStr>,
470 fh: u64,
471 offset: u64,
472 data: &[u8],
473 _write_flags: u32,
474 flags: u32,
475 ) -> Result<ReplyWrite> {
476 log::debug!(
477 "write(path={:?}, fh={}, offset={}, data_len={}, flags=0x{:x})",
478 path,
479 fh,
480 offset,
481 data.len(),
482 flags
483 );
484
485 let Some(inner_writer) = ({
486 self.get_opened_file(FileKey::try_from(fh)?, path)?
487 .inner_writer
488 .clone()
489 }) else {
490 Err(Errno::from(libc::EACCES))?
491 };
492
493 let mut inner = inner_writer.lock().await;
494 if offset != inner.written {
496 Err(Errno::from(libc::EINVAL))?;
497 }
498
499 inner
500 .writer
501 .write_from(data)
502 .await
503 .map_err(opendal_error2errno)?;
504 inner.written += data.len() as u64;
505
506 Ok(ReplyWrite {
507 written: data.len() as _,
508 })
509 }
510
511 async fn release(
512 &self,
513 _req: Request,
514 path: Option<&OsStr>,
515 fh: u64,
516 flags: u32,
517 lock_owner: u64,
518 flush: bool,
519 ) -> Result<()> {
520 log::debug!(
521 "release(path={:?}, fh={}, flags=0x{:x}, lock_owner={}, flush={})",
522 path,
523 fh,
524 flags,
525 lock_owner,
526 flush
527 );
528
529 let _ = self.opened_files.take(FileKey::try_from(fh)?.0);
531 Ok(())
532 }
533
534 async fn flush(
539 &self,
540 _req: Request,
541 path: Option<&OsStr>,
542 fh: u64,
543 lock_owner: u64,
544 ) -> Result<()> {
545 log::debug!(
546 "flush(path={:?}, fh={}, lock_owner={})",
547 path,
548 fh,
549 lock_owner,
550 );
551
552 let file = self
553 .opened_files
554 .take(FileKey::try_from(fh)?.0)
555 .ok_or(Errno::from(libc::EBADF))?;
556
557 if let Some(inner_writer) = file.inner_writer {
558 let mut lock = inner_writer.lock().await;
559 let res = lock.writer.close().await.map_err(opendal_error2errno);
560 return res.map(|_| ());
561 }
562
563 if matches!(path, Some(ref p) if p != &file.path) {
564 Err(Errno::from(libc::EBADF))?;
565 }
566
567 Ok(())
568 }
569
570 type DirEntryStream<'a> = BoxStream<'a, Result<DirectoryEntry>>;
571
572 async fn readdir<'a>(
573 &'a self,
574 _req: Request,
575 path: &'a OsStr,
576 fh: u64,
577 offset: i64,
578 ) -> Result<ReplyDirectory<Self::DirEntryStream<'a>>> {
579 log::debug!("readdir(path={:?}, fh={}, offset={})", path, fh, offset);
580
581 let mut current_dir = PathBuf::from(path);
582 current_dir.push(""); let path = current_dir.to_string_lossy().to_string();
584 let children = self
585 .op
586 .lister(¤t_dir.to_string_lossy())
587 .await
588 .map_err(opendal_error2errno)?
589 .filter_map(move |entry| {
590 let dir = normalize_path(path.as_str());
591 async move {
592 match entry {
593 Ok(e) if e.path() == dir => None,
594 _ => Some(entry),
595 }
596 }
597 })
598 .enumerate()
599 .map(|(i, entry)| {
600 entry
601 .map(|e| DirectoryEntry {
602 kind: entry_mode2file_type(e.metadata().mode()),
603 name: e.name().trim_matches('/').into(),
604 offset: (i + 3) as i64,
605 })
606 .map_err(opendal_error2errno)
607 });
608
609 let relative_paths = stream::iter([
610 Result::Ok(DirectoryEntry {
611 kind: FileType::Directory,
612 name: ".".into(),
613 offset: 1,
614 }),
615 Result::Ok(DirectoryEntry {
616 kind: FileType::Directory,
617 name: "..".into(),
618 offset: 2,
619 }),
620 ]);
621
622 Ok(ReplyDirectory {
623 entries: relative_paths.chain(children).skip(offset as usize).boxed(),
624 })
625 }
626
627 async fn access(&self, _req: Request, path: &OsStr, mask: u32) -> Result<()> {
628 log::debug!("access(path={:?}, mask=0x{:x})", path, mask);
629
630 self.op
631 .stat(&path.to_string_lossy())
632 .await
633 .map_err(opendal_error2errno)?;
634
635 Ok(())
636 }
637
638 async fn create(
639 &self,
640 _req: Request,
641 parent: &OsStr,
642 name: &OsStr,
643 mode: u32,
644 flags: u32,
645 ) -> Result<ReplyCreated> {
646 log::debug!(
647 "create(parent={:?}, name={:?}, mode=0o{:o}, flags=0x{:x})",
648 parent,
649 name,
650 mode,
651 flags
652 );
653
654 let (is_read, is_trunc, is_append) = self.check_flags(flags | libc::O_CREAT as u32)?;
655
656 let path = PathBuf::from(parent).join(name);
657
658 let inner_writer = if is_trunc || is_append {
659 let writer = self
660 .op
661 .writer_with(&path.to_string_lossy())
662 .chunk(4 * 1024 * 1024)
663 .append(is_append)
664 .await
665 .map_err(opendal_error2errno)?;
666 Some(Arc::new(Mutex::new(InnerWriter { writer, written: 0 })))
667 } else {
668 None
669 };
670
671 let now = SystemTime::now();
672 let attr = dummy_file_attr(FileType::RegularFile, now, self.uid, self.gid);
673
674 let key = self
675 .opened_files
676 .insert(OpenedFile {
677 path: path.into(),
678 is_read,
679 inner_writer,
680 })
681 .ok_or(Errno::from(libc::EBUSY))?;
682
683 Ok(ReplyCreated {
684 ttl: TTL,
685 attr,
686 generation: 0,
687 fh: FileKey(key).to_fh(),
688 flags,
689 })
690 }
691
692 type DirEntryPlusStream<'a> = BoxStream<'a, Result<DirectoryEntryPlus>>;
693
694 async fn readdirplus<'a>(
695 &'a self,
696 _req: Request,
697 parent: &'a OsStr,
698 fh: u64,
699 offset: u64,
700 _lock_owner: u64,
701 ) -> Result<ReplyDirectoryPlus<Self::DirEntryPlusStream<'a>>> {
702 log::debug!(
703 "readdirplus(parent={:?}, fh={}, offset={})",
704 parent,
705 fh,
706 offset
707 );
708
709 let now = SystemTime::now();
710 let mut current_dir = PathBuf::from(parent);
711 current_dir.push(""); let uid = self.uid;
713 let gid = self.gid;
714
715 let path = current_dir.to_string_lossy().to_string();
716 let children = self
717 .op
718 .lister_with(&path)
719 .await
720 .map_err(opendal_error2errno)?
721 .filter_map(move |entry| {
722 let dir = normalize_path(path.as_str());
723 async move {
724 match entry {
725 Ok(e) if e.path() == dir => None,
726 _ => Some(entry),
727 }
728 }
729 })
730 .enumerate()
731 .map(move |(i, entry)| {
732 entry
733 .map(|e| {
734 let metadata = e.metadata();
735 DirectoryEntryPlus {
736 kind: entry_mode2file_type(metadata.mode()),
737 name: e.name().trim_matches('/').into(),
738 offset: (i + 3) as i64,
739 attr: metadata2file_attr(metadata, now, uid, gid),
740 entry_ttl: TTL,
741 attr_ttl: TTL,
742 }
743 })
744 .map_err(opendal_error2errno)
745 });
746
747 let relative_path_attr = dummy_file_attr(FileType::Directory, now, uid, gid);
748 let relative_paths = stream::iter([
749 Result::Ok(DirectoryEntryPlus {
750 kind: FileType::Directory,
751 name: ".".into(),
752 offset: 1,
753 attr: relative_path_attr,
754 entry_ttl: TTL,
755 attr_ttl: TTL,
756 }),
757 Result::Ok(DirectoryEntryPlus {
758 kind: FileType::Directory,
759 name: "..".into(),
760 offset: 2,
761 attr: relative_path_attr,
762 entry_ttl: TTL,
763 attr_ttl: TTL,
764 }),
765 ]);
766
767 Ok(ReplyDirectoryPlus {
768 entries: relative_paths.chain(children).skip(offset as usize).boxed(),
769 })
770 }
771
772 async fn rename2(
773 &self,
774 req: Request,
775 origin_parent: &OsStr,
776 origin_name: &OsStr,
777 parent: &OsStr,
778 name: &OsStr,
779 _flags: u32,
780 ) -> Result<()> {
781 log::debug!(
782 "rename2(origin_parent={:?}, origin_name={:?}, parent={:?}, name={:?})",
783 origin_parent,
784 origin_name,
785 parent,
786 name
787 );
788 self.rename(req, origin_parent, origin_name, parent, name)
789 .await
790 }
791
792 async fn copy_file_range(
793 &self,
794 req: Request,
795 from_path: Option<&OsStr>,
796 fh_in: u64,
797 offset_in: u64,
798 to_path: Option<&OsStr>,
799 fh_out: u64,
800 offset_out: u64,
801 length: u64,
802 flags: u64,
803 ) -> Result<ReplyCopyFileRange> {
804 log::debug!(
805 "copy_file_range(from_path={:?}, fh_in={}, offset_in={}, to_path={:?}, fh_out={}, offset_out={}, length={}, flags={})",
806 from_path,
807 fh_in,
808 offset_in,
809 to_path,
810 fh_out,
811 offset_out,
812 length,
813 flags
814 );
815 let data = self
816 .read(req, from_path, fh_in, offset_in, length as _)
817 .await?;
818
819 let ReplyWrite { written } = self
820 .write(req, to_path, fh_out, offset_out, &data.data, 0, flags as _)
821 .await?;
822
823 Ok(ReplyCopyFileRange {
824 copied: u64::from(written),
825 })
826 }
827
828 async fn statfs(&self, _req: Request, path: &OsStr) -> Result<ReplyStatFs> {
829 log::debug!("statfs(path={:?})", path);
830 Ok(ReplyStatFs {
831 blocks: 1,
832 bfree: 0,
833 bavail: 0,
834 files: 1,
835 ffree: 0,
836 bsize: 4096,
837 namelen: u32::MAX,
838 frsize: 0,
839 })
840 }
841}
842
843const fn entry_mode2file_type(mode: EntryMode) -> FileType {
844 match mode {
845 EntryMode::DIR => FileType::Directory,
846 _ => FileType::RegularFile,
847 }
848}
849
850fn metadata2file_attr(metadata: &Metadata, atime: SystemTime, uid: u32, gid: u32) -> FileAttr {
851 let last_modified = metadata.last_modified().map(|t| t.into()).unwrap_or(atime);
852 let kind = entry_mode2file_type(metadata.mode());
853 FileAttr {
854 size: metadata.content_length(),
855 mtime: last_modified,
856 ctime: last_modified,
857 ..dummy_file_attr(kind, atime, uid, gid)
858 }
859}
860
861const fn dummy_file_attr(kind: FileType, now: SystemTime, uid: u32, gid: u32) -> FileAttr {
862 FileAttr {
863 size: 0,
864 blocks: 0,
865 atime: now,
866 mtime: now,
867 ctime: now,
868 kind,
869 perm: fuse3::perm_from_mode_and_kind(kind, 0o775),
870 nlink: 0,
871 uid,
872 gid,
873 rdev: 0,
874 blksize: 4096,
875 #[cfg(target_os = "macos")]
876 crtime: now,
877 #[cfg(target_os = "macos")]
878 flags: 0,
879 }
880}
881
882fn opendal_error2errno(err: opendal::Error) -> fuse3::Errno {
883 log::trace!("opendal_error2errno: {:?}", err);
884 match err.kind() {
885 ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP),
886 ErrorKind::IsADirectory => Errno::from(libc::EISDIR),
887 ErrorKind::NotFound => Errno::from(libc::ENOENT),
888 ErrorKind::PermissionDenied => Errno::from(libc::EACCES),
889 ErrorKind::AlreadyExists => Errno::from(libc::EEXIST),
890 ErrorKind::NotADirectory => Errno::from(libc::ENOTDIR),
891 ErrorKind::RangeNotSatisfied => Errno::from(libc::EINVAL),
892 ErrorKind::RateLimited => Errno::from(libc::EBUSY),
893 _ => Errno::from(libc::ENOENT),
894 }
895}