1use std::ffi::OsStr;
19use std::num::NonZeroU32;
20use std::path::PathBuf;
21use std::sync::Arc;
22use std::time::Duration;
23use std::time::SystemTime;
24
25use bytes::Bytes;
26use fuse3::path::prelude::*;
27use fuse3::Errno;
28use fuse3::Result;
29use futures_util::stream;
30use futures_util::stream::BoxStream;
31use futures_util::StreamExt;
32use opendal::raw::normalize_path;
33use opendal::EntryMode;
34use opendal::ErrorKind;
35use opendal::Metadata;
36use opendal::Operator;
37use sharded_slab::Slab;
38use tokio::sync::Mutex;
39
40use super::file::FileKey;
41use super::file::InnerWriter;
42use super::file::OpenedFile;
43
/// Validity period placed in the `ttl` fields of entry and attribute replies.
const TTL: Duration = Duration::from_secs(1);

/// State behind the FUSE path filesystem: an OpenDAL operator plus the table of opened files.
pub struct Filesystem {
    /// Operator every storage request is issued through.
    op: Operator,
    /// Group id passed to the attribute builders for every reply.
    gid: u32,
    /// User id passed to the attribute builders for every reply.
    uid: u32,

    /// Files opened through `open`/`create`; the slab key is turned into the FUSE file handle.
    opened_files: Slab<OpenedFile>,
}
92
93impl Filesystem {
94 pub fn new(op: Operator, uid: u32, gid: u32) -> Self {
96 Self {
97 op,
98 uid,
99 gid,
100 opened_files: Slab::new(),
101 }
102 }
103
104 fn check_flags(&self, flags: u32) -> Result<(bool, bool, bool)> {
105 let is_trunc = flags & libc::O_TRUNC as u32 != 0 || flags & libc::O_CREAT as u32 != 0;
106 let is_append = flags & libc::O_APPEND as u32 != 0;
107
108 let mode = flags & libc::O_ACCMODE as u32;
109 let is_read = mode == libc::O_RDONLY as u32 || mode == libc::O_RDWR as u32;
110 let is_write = mode == libc::O_WRONLY as u32 || mode == libc::O_RDWR as u32 || is_append;
111 if !is_read && !is_write {
112 Err(Errno::from(libc::EINVAL))?;
113 }
114 if (is_write && !is_trunc && !is_append) || is_trunc && !is_write {
117 Err(Errno::from(libc::EINVAL))?;
118 }
119
120 let capability = self.op.info().full_capability();
121 if is_read && !capability.read {
122 Err(Errno::from(libc::EACCES))?;
123 }
124 if is_trunc && !capability.write {
125 Err(Errno::from(libc::EACCES))?;
126 }
127 if is_append && !capability.write_can_append {
128 Err(Errno::from(libc::EACCES))?;
129 }
130
131 log::trace!(
132 "check_flags: is_read={is_read}, is_write={is_write}, is_trunc={is_trunc}, is_append={is_append}"
133 );
134 Ok((is_read, is_trunc, is_append))
135 }
136
137 fn get_opened_file(
139 &self,
140 key: FileKey,
141 path: Option<&OsStr>,
142 ) -> Result<sharded_slab::Entry<OpenedFile>> {
143 let file = self
144 .opened_files
145 .get(key.0)
146 .ok_or(Errno::from(libc::ENOENT))?;
147
148 if matches!(path, Some(path) if path != file.path) {
149 log::trace!(
150 "get_opened_file: path not match: path={:?}, file={:?}",
151 path,
152 file.path
153 );
154 Err(Errno::from(libc::EBADF))?;
155 }
156
157 Ok(file)
158 }
159}
160
161impl PathFilesystem for Filesystem {
162 async fn init(&self, _req: Request) -> Result<ReplyInit> {
164 Ok(ReplyInit {
165 max_write: NonZeroU32::new(16 * 1024).unwrap(),
166 })
167 }
168
    /// Called when the filesystem is torn down; this implementation does nothing.
    async fn destroy(&self, _req: Request) {}
171
172 async fn lookup(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<ReplyEntry> {
173 log::debug!("lookup(parent={parent:?}, name={name:?})");
174
175 let path = PathBuf::from(parent).join(name);
176 let metadata = self
177 .op
178 .stat(&path.to_string_lossy())
179 .await
180 .map_err(opendal_error2errno)?;
181
182 let now = SystemTime::now();
183 let attr = metadata2file_attr(&metadata, now, self.uid, self.gid);
184
185 Ok(ReplyEntry { ttl: TTL, attr })
186 }
187
188 async fn getattr(
189 &self,
190 _req: Request,
191 path: Option<&OsStr>,
192 fh: Option<u64>,
193 flags: u32,
194 ) -> Result<ReplyAttr> {
195 log::debug!("getattr(path={path:?}, fh={fh:?}, flags={flags:?})");
196
197 let fh_path = fh.and_then(|fh| {
198 self.opened_files
199 .get(FileKey::try_from(fh).ok()?.0)
200 .map(|f| f.path.clone())
201 });
202
203 let file_path = match (path.map(Into::into), fh_path) {
204 (Some(a), Some(b)) => {
205 if a != b {
206 Err(Errno::from(libc::EBADF))?;
207 }
208 Some(a)
209 }
210 (a, b) => a.or(b),
211 };
212
213 let metadata = self
214 .op
215 .stat(&file_path.unwrap_or_default().to_string_lossy())
216 .await
217 .map_err(opendal_error2errno)?;
218
219 let now = SystemTime::now();
220 let attr = metadata2file_attr(&metadata, now, self.uid, self.gid);
221
222 Ok(ReplyAttr { ttl: TTL, attr })
223 }
224
225 async fn setattr(
226 &self,
227 _req: Request,
228 path: Option<&OsStr>,
229 fh: Option<u64>,
230 set_attr: SetAttr,
231 ) -> Result<ReplyAttr> {
232 log::debug!("setattr(path={path:?}, fh={fh:?}, set_attr={set_attr:?})");
233
234 self.getattr(_req, path, fh, 0).await
235 }
236
237 async fn symlink(
238 &self,
239 _req: Request,
240 parent: &OsStr,
241 name: &OsStr,
242 link_path: &OsStr,
243 ) -> Result<ReplyEntry> {
244 log::debug!("symlink(parent={parent:?}, name={name:?}, link_path={link_path:?})");
245 Err(libc::EOPNOTSUPP.into())
246 }
247
248 async fn mknod(
249 &self,
250 _req: Request,
251 parent: &OsStr,
252 name: &OsStr,
253 mode: u32,
254 _rdev: u32,
255 ) -> Result<ReplyEntry> {
256 log::debug!("mknod(parent={parent:?}, name={name:?}, mode=0o{mode:o})");
257 Err(libc::EOPNOTSUPP.into())
258 }
259
260 async fn mkdir(
261 &self,
262 _req: Request,
263 parent: &OsStr,
264 name: &OsStr,
265 mode: u32,
266 _umask: u32,
267 ) -> Result<ReplyEntry> {
268 log::debug!("mkdir(parent={parent:?}, name={name:?}, mode=0o{mode:o})");
269
270 let mut path = PathBuf::from(parent).join(name);
271 path.push(""); self.op
273 .create_dir(&path.to_string_lossy())
274 .await
275 .map_err(opendal_error2errno)?;
276
277 let now = SystemTime::now();
278 let attr = dummy_file_attr(FileType::Directory, now, self.uid, self.gid);
279
280 Ok(ReplyEntry { ttl: TTL, attr })
281 }
282
283 async fn unlink(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> {
284 log::debug!("unlink(parent={parent:?}, name={name:?})");
285
286 let path = PathBuf::from(parent).join(name);
287 self.op
288 .delete(&path.to_string_lossy())
289 .await
290 .map_err(opendal_error2errno)?;
291
292 Ok(())
293 }
294
295 async fn rmdir(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> {
296 log::debug!("rmdir(parent={parent:?}, name={name:?})");
297
298 let path = PathBuf::from(parent).join(name);
299 self.op
300 .delete(&path.to_string_lossy())
301 .await
302 .map_err(opendal_error2errno)?;
303
304 Ok(())
305 }
306
307 async fn rename(
308 &self,
309 _req: Request,
310 origin_parent: &OsStr,
311 origin_name: &OsStr,
312 parent: &OsStr,
313 name: &OsStr,
314 ) -> Result<()> {
315 log::debug!(
316 "rename(p={origin_parent:?}, name={origin_name:?}, newp={parent:?}, newname={name:?})"
317 );
318
319 if !self.op.info().full_capability().rename {
320 return Err(Errno::from(libc::ENOTSUP))?;
321 }
322
323 let origin_path = PathBuf::from(origin_parent).join(origin_name);
324 let path = PathBuf::from(parent).join(name);
325
326 self.op
327 .rename(&origin_path.to_string_lossy(), &path.to_string_lossy())
328 .await
329 .map_err(opendal_error2errno)?;
330
331 Ok(())
332 }
333
334 async fn link(
335 &self,
336 _req: Request,
337 path: &OsStr,
338 new_parent: &OsStr,
339 new_name: &OsStr,
340 ) -> Result<ReplyEntry> {
341 log::debug!("link(path={path:?}, new_parent={new_parent:?}, new_name={new_name:?})");
342 Err(libc::EOPNOTSUPP.into())
343 }
344
345 async fn opendir(&self, _req: Request, path: &OsStr, flags: u32) -> Result<ReplyOpen> {
346 log::debug!("opendir(path={path:?}, flags=0x{flags:x})");
347 Ok(ReplyOpen { fh: 0, flags })
348 }
349
350 async fn open(&self, _req: Request, path: &OsStr, flags: u32) -> Result<ReplyOpen> {
351 log::debug!("open(path={path:?}, flags=0x{flags:x})");
352
353 let (is_read, is_trunc, is_append) = self.check_flags(flags)?;
354 if flags & libc::O_CREAT as u32 != 0 {
355 self.op
356 .write(&path.to_string_lossy(), Bytes::new())
357 .await
358 .map_err(opendal_error2errno)?;
359 }
360
361 let inner_writer = if is_trunc || is_append {
362 let writer = self
363 .op
364 .writer_with(&path.to_string_lossy())
365 .append(is_append)
366 .await
367 .map_err(opendal_error2errno)?;
368 let written = if is_append {
369 self.op
370 .stat(&path.to_string_lossy())
371 .await
372 .map_err(opendal_error2errno)?
373 .content_length()
374 } else {
375 0
376 };
377 Some(Arc::new(Mutex::new(InnerWriter { writer, written })))
378 } else {
379 None
380 };
381
382 let key = self
383 .opened_files
384 .insert(OpenedFile {
385 path: path.into(),
386 is_read,
387 inner_writer,
388 })
389 .ok_or(Errno::from(libc::EBUSY))?;
390
391 Ok(ReplyOpen {
392 fh: FileKey(key).to_fh(),
393 flags,
394 })
395 }
396
397 async fn read(
398 &self,
399 _req: Request,
400 path: Option<&OsStr>,
401 fh: u64,
402 offset: u64,
403 size: u32,
404 ) -> Result<ReplyData> {
405 log::debug!("read(path={path:?}, fh={fh}, offset={offset}, size={size})");
406
407 let file_path = {
408 let file = self.get_opened_file(FileKey::try_from(fh)?, path)?;
409 if !file.is_read {
410 Err(Errno::from(libc::EACCES))?;
411 }
412 file.path.to_string_lossy().to_string()
413 };
414
415 let data = self
416 .op
417 .read_with(&file_path)
418 .range(offset..)
419 .await
420 .map_err(opendal_error2errno)?;
421
422 Ok(ReplyData {
423 data: data.to_bytes(),
424 })
425 }
426
427 async fn write(
428 &self,
429 _req: Request,
430 path: Option<&OsStr>,
431 fh: u64,
432 offset: u64,
433 data: &[u8],
434 _write_flags: u32,
435 flags: u32,
436 ) -> Result<ReplyWrite> {
437 log::debug!(
438 "write(path={:?}, fh={}, offset={}, data_len={}, flags=0x{:x})",
439 path,
440 fh,
441 offset,
442 data.len(),
443 flags
444 );
445
446 let Some(inner_writer) = ({
447 self.get_opened_file(FileKey::try_from(fh)?, path)?
448 .inner_writer
449 .clone()
450 }) else {
451 Err(Errno::from(libc::EACCES))?
452 };
453
454 let mut inner = inner_writer.lock().await;
455 if offset != inner.written {
457 Err(Errno::from(libc::EINVAL))?;
458 }
459
460 inner
461 .writer
462 .write_from(data)
463 .await
464 .map_err(opendal_error2errno)?;
465 inner.written += data.len() as u64;
466
467 Ok(ReplyWrite {
468 written: data.len() as _,
469 })
470 }
471
472 async fn release(
473 &self,
474 _req: Request,
475 path: Option<&OsStr>,
476 fh: u64,
477 flags: u32,
478 lock_owner: u64,
479 flush: bool,
480 ) -> Result<()> {
481 log::debug!(
482 "release(path={path:?}, fh={fh}, flags=0x{flags:x}, lock_owner={lock_owner}, flush={flush})"
483 );
484
485 let _ = self.opened_files.take(FileKey::try_from(fh)?.0);
487 Ok(())
488 }
489
    /// Finishes the file opened as `fh`: removes it from the open-file table and, when it
    /// has a writer, closes that writer.
    ///
    /// Because the entry is taken out of the table here, the handle no longer resolves
    /// afterwards; a second `flush` for the same handle fails with `EBADF`.
    ///
    /// # Errors
    /// - Whatever the handle conversion rejects.
    /// - `EBADF` when `fh` has no entry, or when the file has no writer and `path` is given
    ///   but differs from the path recorded for the entry.
    /// - The errno mapped from the OpenDAL error when closing the writer fails.
    async fn flush(
        &self,
        _req: Request,
        path: Option<&OsStr>,
        fh: u64,
        lock_owner: u64,
    ) -> Result<()> {
        log::debug!("flush(path={path:?}, fh={fh}, lock_owner={lock_owner})");

        // Take ownership of the entry; it is gone from the table from here on.
        let file = self
            .opened_files
            .take(FileKey::try_from(fh)?.0)
            .ok_or(Errno::from(libc::EBADF))?;

        // Files with a writer return right after closing it; the path check below is skipped.
        if let Some(inner_writer) = file.inner_writer {
            let mut lock = inner_writer.lock().await;
            let res = lock.writer.close().await.map_err(opendal_error2errno);
            return res.map(|_| ());
        }

        // Writer-less files only get their path validated.
        if matches!(path, Some(ref p) if p != &file.path) {
            Err(Errno::from(libc::EBADF))?;
        }

        Ok(())
    }
520
    /// Boxed stream of directory entries produced by `readdir`.
    type DirEntryStream<'a> = BoxStream<'a, Result<DirectoryEntry>>;
522
523 async fn readdir<'a>(
524 &'a self,
525 _req: Request,
526 path: &'a OsStr,
527 fh: u64,
528 offset: i64,
529 ) -> Result<ReplyDirectory<Self::DirEntryStream<'a>>> {
530 log::debug!("readdir(path={path:?}, fh={fh}, offset={offset})");
531
532 let mut current_dir = PathBuf::from(path);
533 current_dir.push(""); let path = current_dir.to_string_lossy().to_string();
535 let children = self
536 .op
537 .lister(¤t_dir.to_string_lossy())
538 .await
539 .map_err(opendal_error2errno)?
540 .filter_map(move |entry| {
541 let dir = normalize_path(path.as_str());
542 async move {
543 match entry {
544 Ok(e) if e.path() == dir => None,
545 _ => Some(entry),
546 }
547 }
548 })
549 .enumerate()
550 .map(|(i, entry)| {
551 entry
552 .map(|e| DirectoryEntry {
553 kind: entry_mode2file_type(e.metadata().mode()),
554 name: e.name().trim_matches('/').into(),
555 offset: (i + 3) as i64,
556 })
557 .map_err(opendal_error2errno)
558 });
559
560 let relative_paths = stream::iter([
561 Result::Ok(DirectoryEntry {
562 kind: FileType::Directory,
563 name: ".".into(),
564 offset: 1,
565 }),
566 Result::Ok(DirectoryEntry {
567 kind: FileType::Directory,
568 name: "..".into(),
569 offset: 2,
570 }),
571 ]);
572
573 Ok(ReplyDirectory {
574 entries: relative_paths.chain(children).skip(offset as usize).boxed(),
575 })
576 }
577
578 async fn access(&self, _req: Request, path: &OsStr, mask: u32) -> Result<()> {
579 log::debug!("access(path={path:?}, mask=0x{mask:x})");
580
581 self.op
582 .stat(&path.to_string_lossy())
583 .await
584 .map_err(opendal_error2errno)?;
585
586 Ok(())
587 }
588
589 async fn create(
590 &self,
591 _req: Request,
592 parent: &OsStr,
593 name: &OsStr,
594 mode: u32,
595 flags: u32,
596 ) -> Result<ReplyCreated> {
597 log::debug!("create(parent={parent:?}, name={name:?}, mode=0o{mode:o}, flags=0x{flags:x})");
598
599 let (is_read, is_trunc, is_append) = self.check_flags(flags | libc::O_CREAT as u32)?;
600
601 let path = PathBuf::from(parent).join(name);
602
603 let inner_writer = if is_trunc || is_append {
604 let writer = self
605 .op
606 .writer_with(&path.to_string_lossy())
607 .chunk(4 * 1024 * 1024)
608 .append(is_append)
609 .await
610 .map_err(opendal_error2errno)?;
611 Some(Arc::new(Mutex::new(InnerWriter { writer, written: 0 })))
612 } else {
613 None
614 };
615
616 let now = SystemTime::now();
617 let attr = dummy_file_attr(FileType::RegularFile, now, self.uid, self.gid);
618
619 let key = self
620 .opened_files
621 .insert(OpenedFile {
622 path: path.into(),
623 is_read,
624 inner_writer,
625 })
626 .ok_or(Errno::from(libc::EBUSY))?;
627
628 Ok(ReplyCreated {
629 ttl: TTL,
630 attr,
631 generation: 0,
632 fh: FileKey(key).to_fh(),
633 flags,
634 })
635 }
636
    /// Boxed stream of directory entries with attributes, produced by `readdirplus`.
    type DirEntryPlusStream<'a> = BoxStream<'a, Result<DirectoryEntryPlus>>;
638
639 async fn readdirplus<'a>(
640 &'a self,
641 _req: Request,
642 parent: &'a OsStr,
643 fh: u64,
644 offset: u64,
645 _lock_owner: u64,
646 ) -> Result<ReplyDirectoryPlus<Self::DirEntryPlusStream<'a>>> {
647 log::debug!("readdirplus(parent={parent:?}, fh={fh}, offset={offset})");
648
649 let now = SystemTime::now();
650 let mut current_dir = PathBuf::from(parent);
651 current_dir.push(""); let uid = self.uid;
653 let gid = self.gid;
654
655 let path = current_dir.to_string_lossy().to_string();
656 let children = self
657 .op
658 .lister_with(&path)
659 .await
660 .map_err(opendal_error2errno)?
661 .filter_map(move |entry| {
662 let dir = normalize_path(path.as_str());
663 async move {
664 match entry {
665 Ok(e) if e.path() == dir => None,
666 _ => Some(entry),
667 }
668 }
669 })
670 .enumerate()
671 .map(move |(i, entry)| {
672 entry
673 .map(|e| {
674 let metadata = e.metadata();
675 DirectoryEntryPlus {
676 kind: entry_mode2file_type(metadata.mode()),
677 name: e.name().trim_matches('/').into(),
678 offset: (i + 3) as i64,
679 attr: metadata2file_attr(metadata, now, uid, gid),
680 entry_ttl: TTL,
681 attr_ttl: TTL,
682 }
683 })
684 .map_err(opendal_error2errno)
685 });
686
687 let relative_path_attr = dummy_file_attr(FileType::Directory, now, uid, gid);
688 let relative_paths = stream::iter([
689 Result::Ok(DirectoryEntryPlus {
690 kind: FileType::Directory,
691 name: ".".into(),
692 offset: 1,
693 attr: relative_path_attr,
694 entry_ttl: TTL,
695 attr_ttl: TTL,
696 }),
697 Result::Ok(DirectoryEntryPlus {
698 kind: FileType::Directory,
699 name: "..".into(),
700 offset: 2,
701 attr: relative_path_attr,
702 entry_ttl: TTL,
703 attr_ttl: TTL,
704 }),
705 ]);
706
707 Ok(ReplyDirectoryPlus {
708 entries: relative_paths.chain(children).skip(offset as usize).boxed(),
709 })
710 }
711
712 async fn rename2(
713 &self,
714 req: Request,
715 origin_parent: &OsStr,
716 origin_name: &OsStr,
717 parent: &OsStr,
718 name: &OsStr,
719 _flags: u32,
720 ) -> Result<()> {
721 log::debug!(
722 "rename2(origin_parent={origin_parent:?}, origin_name={origin_name:?}, parent={parent:?}, name={name:?})"
723 );
724 self.rename(req, origin_parent, origin_name, parent, name)
725 .await
726 }
727
728 async fn copy_file_range(
729 &self,
730 req: Request,
731 from_path: Option<&OsStr>,
732 fh_in: u64,
733 offset_in: u64,
734 to_path: Option<&OsStr>,
735 fh_out: u64,
736 offset_out: u64,
737 length: u64,
738 flags: u64,
739 ) -> Result<ReplyCopyFileRange> {
740 log::debug!(
741 "copy_file_range(from_path={from_path:?}, fh_in={fh_in}, offset_in={offset_in}, to_path={to_path:?}, fh_out={fh_out}, offset_out={offset_out}, length={length}, flags={flags})"
742 );
743 let data = self
744 .read(req, from_path, fh_in, offset_in, length as _)
745 .await?;
746
747 let ReplyWrite { written } = self
748 .write(req, to_path, fh_out, offset_out, &data.data, 0, flags as _)
749 .await?;
750
751 Ok(ReplyCopyFileRange {
752 copied: u64::from(written),
753 })
754 }
755
756 async fn statfs(&self, _req: Request, path: &OsStr) -> Result<ReplyStatFs> {
757 log::debug!("statfs(path={path:?})");
758 Ok(ReplyStatFs {
759 blocks: 1,
760 bfree: 0,
761 bavail: 0,
762 files: 1,
763 ffree: 0,
764 bsize: 4096,
765 namelen: u32::MAX,
766 frsize: 0,
767 })
768 }
769}
770
771const fn entry_mode2file_type(mode: EntryMode) -> FileType {
772 match mode {
773 EntryMode::DIR => FileType::Directory,
774 _ => FileType::RegularFile,
775 }
776}
777
778fn metadata2file_attr(metadata: &Metadata, atime: SystemTime, uid: u32, gid: u32) -> FileAttr {
779 let last_modified = metadata.last_modified().map(|t| t.into()).unwrap_or(atime);
780 let kind = entry_mode2file_type(metadata.mode());
781 FileAttr {
782 size: metadata.content_length(),
783 mtime: last_modified,
784 ctime: last_modified,
785 ..dummy_file_attr(kind, atime, uid, gid)
786 }
787}
788
789const fn dummy_file_attr(kind: FileType, now: SystemTime, uid: u32, gid: u32) -> FileAttr {
790 FileAttr {
791 size: 0,
792 blocks: 0,
793 atime: now,
794 mtime: now,
795 ctime: now,
796 kind,
797 perm: fuse3::perm_from_mode_and_kind(kind, 0o775),
798 nlink: 0,
799 uid,
800 gid,
801 rdev: 0,
802 blksize: 4096,
803 #[cfg(target_os = "macos")]
804 crtime: now,
805 #[cfg(target_os = "macos")]
806 flags: 0,
807 }
808}
809
810fn opendal_error2errno(err: opendal::Error) -> fuse3::Errno {
811 log::trace!("opendal_error2errno: {err:?}");
812 match err.kind() {
813 ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP),
814 ErrorKind::IsADirectory => Errno::from(libc::EISDIR),
815 ErrorKind::NotFound => Errno::from(libc::ENOENT),
816 ErrorKind::PermissionDenied => Errno::from(libc::EACCES),
817 ErrorKind::AlreadyExists => Errno::from(libc::EEXIST),
818 ErrorKind::NotADirectory => Errno::from(libc::ENOTDIR),
819 ErrorKind::RangeNotSatisfied => Errno::from(libc::EINVAL),
820 ErrorKind::RateLimited => Errno::from(libc::EBUSY),
821 _ => Errno::from(libc::ENOENT),
822 }
823}