use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use chrono::DateTime;
use log::debug;
use super::core::*;
use super::lister::FsLister;
use super::reader::FsReader;
use super::writer::FsWriter;
use crate::raw::*;
use crate::*;
#[doc = include_str!("docs.md")]
#[derive(Default, Debug)]
pub struct FsBuilder {
root: Option<PathBuf>,
atomic_write_dir: Option<PathBuf>,
}
impl FsBuilder {
pub fn root(&mut self, root: &str) -> &mut Self {
self.root = if root.is_empty() {
None
} else {
Some(PathBuf::from(root))
};
self
}
pub fn atomic_write_dir(&mut self, dir: &str) -> &mut Self {
self.atomic_write_dir = if dir.is_empty() {
None
} else {
Some(PathBuf::from(dir))
};
self
}
}
impl Builder for FsBuilder {
const SCHEME: Scheme = Scheme::Fs;
type Accessor = FsBackend;
fn from_map(map: HashMap<String, String>) -> Self {
let mut builder = FsBuilder::default();
map.get("root").map(|v| builder.root(v));
map.get("atomic_write_dir")
.map(|v| builder.atomic_write_dir(v));
builder
}
fn build(&mut self) -> Result<Self::Accessor> {
debug!("backend build started: {:?}", &self);
let root = match self.root.take() {
Some(root) => Ok(root),
None => Err(Error::new(
ErrorKind::ConfigInvalid,
"root is not specified",
)),
}?;
debug!("backend use root {}", root.to_string_lossy());
if let Err(e) = std::fs::metadata(&root) {
if e.kind() == std::io::ErrorKind::NotFound {
std::fs::create_dir_all(&root).map_err(|e| {
Error::new(ErrorKind::Unexpected, "create root dir failed")
.with_operation("Builder::build")
.with_context("root", root.to_string_lossy())
.set_source(e)
})?;
}
}
let atomic_write_dir = self.atomic_write_dir.take();
if let Some(d) = &atomic_write_dir {
if let Err(e) = std::fs::metadata(d) {
if e.kind() == std::io::ErrorKind::NotFound {
std::fs::create_dir_all(d).map_err(|e| {
Error::new(ErrorKind::Unexpected, "create atomic write dir failed")
.with_operation("Builder::build")
.with_context("atomic_write_dir", d.to_string_lossy())
.set_source(e)
})?;
}
}
}
let root = root.canonicalize().map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"canonicalize of root directory failed",
)
.with_operation("Builder::build")
.with_context("root", root.to_string_lossy())
.set_source(e)
})?;
let atomic_write_dir = atomic_write_dir
.map(|p| {
p.canonicalize().map(Some).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"canonicalize of atomic_write_dir directory failed",
)
.with_operation("Builder::build")
.with_context("root", root.to_string_lossy())
.set_source(e)
})
})
.unwrap_or(Ok(None))?;
debug!("backend build finished: {:?}", &self);
Ok(FsBackend {
core: Arc::new(FsCore {
root,
atomic_write_dir,
buf_pool: oio::PooledBuf::new(16).with_initial_capacity(256 * 1024),
}),
})
}
}
#[derive(Debug, Clone)]
pub struct FsBackend {
core: Arc<FsCore>,
}
impl Access for FsBackend {
type Reader = FsReader;
type Writer = FsWriter<tokio::fs::File>;
type Lister = Option<FsLister<tokio::fs::ReadDir>>;
type BlockingReader = FsReader;
type BlockingWriter = FsWriter<std::fs::File>;
type BlockingLister = Option<FsLister<std::fs::ReadDir>>;
fn info(&self) -> AccessorInfo {
let mut am = AccessorInfo::default();
am.set_scheme(Scheme::Fs)
.set_root(&self.core.root.to_string_lossy())
.set_native_capability(Capability {
stat: true,
read: true,
write: true,
write_can_empty: true,
write_can_append: true,
write_can_multi: true,
create_dir: true,
delete: true,
list: true,
copy: true,
rename: true,
blocking: true,
..Default::default()
});
am
}
async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
let p = self.core.root.join(path.trim_end_matches('/'));
tokio::fs::create_dir_all(&p)
.await
.map_err(new_std_io_error)?;
Ok(RpCreateDir::default())
}
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let p = self.core.root.join(path.trim_end_matches('/'));
let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?;
let mode = if meta.is_dir() {
EntryMode::DIR
} else if meta.is_file() {
EntryMode::FILE
} else {
EntryMode::Unknown
};
let m = Metadata::new(mode)
.with_content_length(meta.len())
.with_last_modified(
meta.modified()
.map(DateTime::from)
.map_err(new_std_io_error)?,
);
Ok(RpStat::new(m))
}
async fn read(&self, path: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
let p = self.core.root.join(path.trim_end_matches('/'));
let f = tokio::fs::OpenOptions::new()
.read(true)
.open(&p)
.await
.map_err(new_std_io_error)?;
let r = FsReader::new(self.core.clone(), f.into_std().await);
Ok((RpRead::new(), r))
}
async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.core.atomic_write_dir {
let target_path = self
.core
.ensure_write_abs_path(&self.core.root, path)
.await?;
let tmp_path = self
.core
.ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))
.await?;
if op.append()
&& tokio::fs::try_exists(&target_path)
.await
.map_err(new_std_io_error)?
{
(target_path, None)
} else {
(target_path, Some(tmp_path))
}
} else {
let p = self
.core
.ensure_write_abs_path(&self.core.root, path)
.await?;
(p, None)
};
let mut open_options = tokio::fs::OpenOptions::new();
open_options.create(true).write(true);
if op.append() {
open_options.append(true);
} else {
open_options.truncate(true);
}
let f = open_options
.open(tmp_path.as_ref().unwrap_or(&target_path))
.await
.map_err(new_std_io_error)?;
Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
}
async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
let p = self.core.root.join(path.trim_end_matches('/'));
let meta = tokio::fs::metadata(&p).await;
match meta {
Ok(meta) => {
if meta.is_dir() {
tokio::fs::remove_dir(&p).await.map_err(new_std_io_error)?;
} else {
tokio::fs::remove_file(&p).await.map_err(new_std_io_error)?;
}
Ok(RpDelete::default())
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(RpDelete::default()),
Err(err) => Err(new_std_io_error(err)),
}
}
async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
let p = self.core.root.join(path.trim_end_matches('/'));
let f = match tokio::fs::read_dir(&p).await {
Ok(rd) => rd,
Err(e) => {
return if e.kind() == std::io::ErrorKind::NotFound {
Ok((RpList::default(), None))
} else {
Err(new_std_io_error(e))
};
}
};
let rd = FsLister::new(&self.core.root, f);
Ok((RpList::default(), Some(rd)))
}
async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
let from = self.core.root.join(from.trim_end_matches('/'));
tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
let to = self
.core
.ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
.await?;
tokio::fs::copy(from, to).await.map_err(new_std_io_error)?;
Ok(RpCopy::default())
}
async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
let from = self.core.root.join(from.trim_end_matches('/'));
tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
let to = self
.core
.ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
.await?;
tokio::fs::rename(from, to)
.await
.map_err(new_std_io_error)?;
Ok(RpRename::default())
}
fn blocking_create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
let p = self.core.root.join(path.trim_end_matches('/'));
std::fs::create_dir_all(p).map_err(new_std_io_error)?;
Ok(RpCreateDir::default())
}
fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let p = self.core.root.join(path.trim_end_matches('/'));
let meta = std::fs::metadata(p).map_err(new_std_io_error)?;
let mode = if meta.is_dir() {
EntryMode::DIR
} else if meta.is_file() {
EntryMode::FILE
} else {
EntryMode::Unknown
};
let m = Metadata::new(mode)
.with_content_length(meta.len())
.with_last_modified(
meta.modified()
.map(DateTime::from)
.map_err(new_std_io_error)?,
);
Ok(RpStat::new(m))
}
fn blocking_read(&self, path: &str, _: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
let p = self.core.root.join(path.trim_end_matches('/'));
let f = std::fs::OpenOptions::new()
.read(true)
.open(p)
.map_err(new_std_io_error)?;
let r = FsReader::new(self.core.clone(), f);
Ok((RpRead::new(), r))
}
fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.core.atomic_write_dir {
let target_path = self
.core
.blocking_ensure_write_abs_path(&self.core.root, path)?;
let tmp_path = self
.core
.blocking_ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))?;
if op.append()
&& Path::new(&target_path)
.try_exists()
.map_err(new_std_io_error)?
{
(target_path, None)
} else {
(target_path, Some(tmp_path))
}
} else {
let p = self
.core
.blocking_ensure_write_abs_path(&self.core.root, path)?;
(p, None)
};
let mut f = std::fs::OpenOptions::new();
f.create(true).write(true);
if op.append() {
f.append(true);
} else {
f.truncate(true);
}
let f = f
.open(tmp_path.as_ref().unwrap_or(&target_path))
.map_err(new_std_io_error)?;
Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
}
fn blocking_delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
let p = self.core.root.join(path.trim_end_matches('/'));
let meta = std::fs::metadata(&p);
match meta {
Ok(meta) => {
if meta.is_dir() {
std::fs::remove_dir(&p).map_err(new_std_io_error)?;
} else {
std::fs::remove_file(&p).map_err(new_std_io_error)?;
}
Ok(RpDelete::default())
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(RpDelete::default()),
Err(err) => Err(new_std_io_error(err)),
}
}
fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, Self::BlockingLister)> {
let p = self.core.root.join(path.trim_end_matches('/'));
let f = match std::fs::read_dir(p) {
Ok(rd) => rd,
Err(e) => {
return if e.kind() == std::io::ErrorKind::NotFound {
Ok((RpList::default(), None))
} else {
Err(new_std_io_error(e))
};
}
};
let rd = FsLister::new(&self.core.root, f);
Ok((RpList::default(), Some(rd)))
}
fn blocking_copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
let from = self.core.root.join(from.trim_end_matches('/'));
std::fs::metadata(&from).map_err(new_std_io_error)?;
let to = self
.core
.blocking_ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))?;
std::fs::copy(from, to).map_err(new_std_io_error)?;
Ok(RpCopy::default())
}
fn blocking_rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
let from = self.core.root.join(from.trim_end_matches('/'));
std::fs::metadata(&from).map_err(new_std_io_error)?;
let to = self
.core
.blocking_ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))?;
std::fs::rename(from, to).map_err(new_std_io_error)?;
Ok(RpRename::default())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_tmp_file_of() {
let cases = vec![
("hello.txt", "hello.txt"),
("/tmp/opendal.log", "opendal.log"),
("/abc/def/hello.parquet", "hello.parquet"),
];
for (path, expected_prefix) in cases {
let tmp_file = tmp_file_of(path);
assert!(tmp_file.len() > expected_prefix.len());
assert!(tmp_file.starts_with(expected_prefix));
}
}
}