use std::io::Cursor;
use std::sync::Arc;
use compio::dispatcher::Dispatcher;
use compio::fs::OpenOptions;
use super::core::CompfsCore;
use super::lister::CompfsLister;
use super::reader::CompfsReader;
use super::writer::CompfsWriter;
use crate::raw::*;
use crate::services::CompfsConfig;
use crate::*;
impl Configurator for CompfsConfig {
type Builder = CompfsBuilder;
fn into_builder(self) -> Self::Builder {
CompfsBuilder { config: self }
}
}
#[derive(Debug, Clone, Default)]
pub struct CompfsBuilder {
config: CompfsConfig,
}
impl CompfsBuilder {
pub fn root(mut self, root: &str) -> Self {
self.config.root = if root.is_empty() {
None
} else {
Some(root.to_string())
};
self
}
}
impl Builder for CompfsBuilder {
const SCHEME: Scheme = Scheme::Compfs;
type Config = CompfsConfig;
fn build(self) -> Result<impl Access> {
let root = match self.config.root {
Some(root) => Ok(root),
None => Err(Error::new(
ErrorKind::ConfigInvalid,
"root is not specified",
)),
}?;
if let Err(e) = std::fs::metadata(&root) {
if e.kind() == std::io::ErrorKind::NotFound {
std::fs::create_dir_all(&root).map_err(|e| {
Error::new(ErrorKind::Unexpected, "create root dir failed")
.with_operation("Builder::build")
.with_context("root", root.as_str())
.set_source(e)
})?;
}
}
let dispatcher = Dispatcher::new().map_err(|_| {
Error::new(
ErrorKind::Unexpected,
"failed to initiate compio dispatcher",
)
})?;
let core = CompfsCore {
root: root.into(),
dispatcher,
buf_pool: oio::PooledBuf::new(16),
};
Ok(CompfsBackend {
core: Arc::new(core),
})
}
}
#[derive(Debug)]
pub struct CompfsBackend {
core: Arc<CompfsCore>,
}
impl Access for CompfsBackend {
type Reader = CompfsReader;
type Writer = CompfsWriter;
type Lister = Option<CompfsLister>;
type BlockingReader = ();
type BlockingWriter = ();
type BlockingLister = ();
fn info(&self) -> Arc<AccessorInfo> {
let mut am = AccessorInfo::default();
am.set_scheme(Scheme::Compfs)
.set_root(&self.core.root.to_string_lossy())
.set_native_capability(Capability {
stat: true,
read: true,
write: true,
write_can_empty: true,
write_can_multi: true,
create_dir: true,
delete: true,
list: true,
copy: true,
rename: true,
shared: true,
..Default::default()
});
am.into()
}
async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
let path = self.core.prepare_path(path);
self.core
.exec(move || async move { compio::fs::create_dir_all(path).await })
.await?;
Ok(RpCreateDir::default())
}
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let path = self.core.prepare_path(path);
let meta = self
.core
.exec(move || async move { compio::fs::metadata(path).await })
.await?;
let ty = meta.file_type();
let mode = if ty.is_dir() {
EntryMode::DIR
} else if ty.is_file() {
EntryMode::FILE
} else {
EntryMode::Unknown
};
let last_mod = meta.modified().map_err(new_std_io_error)?.into();
let ret = Metadata::new(mode).with_last_modified(last_mod);
Ok(RpStat::new(ret))
}
async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
if path.ends_with('/') {
let path = self.core.prepare_path(path);
self.core
.exec(move || async move { compio::fs::remove_dir(path).await })
.await?;
} else {
let path = self.core.prepare_path(path);
self.core
.exec(move || async move { compio::fs::remove_file(path).await })
.await?;
}
Ok(RpDelete::default())
}
async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
let from = self.core.prepare_path(from);
let to = self.core.prepare_path(to);
self.core
.exec(move || async move {
let from = OpenOptions::new().read(true).open(from).await?;
let to = OpenOptions::new().write(true).create(true).open(to).await?;
let (mut from, mut to) = (Cursor::new(from), Cursor::new(to));
compio::io::copy(&mut from, &mut to).await?;
Ok(())
})
.await?;
Ok(RpCopy::default())
}
async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
let from = self.core.prepare_path(from);
let to = self.core.prepare_path(to);
self.core
.exec(move || async move { compio::fs::rename(from, to).await })
.await?;
Ok(RpRename::default())
}
async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
let path = self.core.prepare_path(path);
let file = self
.core
.exec(|| async move { compio::fs::OpenOptions::new().read(true).open(&path).await })
.await?;
let r = CompfsReader::new(self.core.clone(), file, op.range());
Ok((RpRead::new(), r))
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let path = self.core.prepare_path(path);
let append = args.append();
let file = self
.core
.exec(move || async move {
compio::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(!append)
.open(path)
.await
})
.await
.map(Cursor::new)?;
let w = CompfsWriter::new(self.core.clone(), file);
Ok((RpWrite::new(), w))
}
async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
let path = self.core.prepare_path(path);
let read_dir = match self
.core
.exec_blocking(move || std::fs::read_dir(path))
.await?
{
Ok(rd) => rd,
Err(e) => {
return if e.kind() == std::io::ErrorKind::NotFound {
Ok((RpList::default(), None))
} else {
Err(new_std_io_error(e))
};
}
};
let lister = CompfsLister::new(self.core.clone(), read_dir);
Ok((RpList::default(), Some(lister)))
}
}