use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
use futures::AsyncWriteExt;
use log::debug;
use serde::Deserialize;
use uuid::Uuid;
use super::lister::HdfsLister;
use super::writer::HdfsWriter;
use crate::raw::*;
use crate::services::hdfs::reader::HdfsReader;
use crate::*;
/// Configuration for the HDFS service.
///
/// Because of `#[serde(default)]`, any field missing from the deserialized
/// input falls back to its `Default` value (`None` / `false`).
#[derive(Default, Deserialize, Clone)]
#[serde(default)]
#[non_exhaustive]
pub struct HdfsConfig {
/// Root path of the backend. `HdfsBuilder::build` normalizes it and creates
/// the directory when the name node reports it as missing.
pub root: Option<String>,
/// Name node address handed to `hdrs::ClientBuilder::new`. `build` fails with
/// `ErrorKind::ConfigInvalid` when this is unset.
pub name_node: Option<String>,
/// Kerberos ticket cache path, forwarded to the `hdrs` client builder when set.
pub kerberos_ticket_cache_path: Option<String>,
/// User name, forwarded to the `hdrs` client builder when set.
pub user: Option<String>,
/// Whether the backend reports the `write_can_append` capability.
pub enable_append: bool,
/// Directory in which temporary files (`<basename>.<uuid>`) are opened for
/// writes. Presumably `HdfsWriter` moves them onto the target when the write
/// completes — confirm against the writer module.
pub atomic_write_dir: Option<String>,
}
impl Debug for HdfsConfig {
    /// Formats every configuration field and finishes with the
    /// non-exhaustive marker (`..`).
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("HdfsConfig");
        out.field("root", &self.root);
        out.field("name_node", &self.name_node);
        out.field(
            "kerberos_ticket_cache_path",
            &self.kerberos_ticket_cache_path,
        );
        out.field("user", &self.user);
        out.field("enable_append", &self.enable_append);
        out.field("atomic_write_dir", &self.atomic_write_dir);
        out.finish_non_exhaustive()
    }
}
// The rustdoc for this builder is pulled in from the sibling `docs.md` file.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct HdfsBuilder {
// Settings collected by the setter methods and read by `Builder::build`.
config: HdfsConfig,
}
impl Debug for HdfsBuilder {
    /// Formats the builder as a struct wrapping its `config`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("HdfsBuilder");
        out.field("config", &self.config);
        out.finish()
    }
}
impl HdfsBuilder {
    /// Set the root path of the backend.
    ///
    /// Passing an empty string clears a previously configured root.
    pub fn root(&mut self, root: &str) -> &mut Self {
        self.config.root = match root {
            "" => None,
            value => Some(value.to_string()),
        };
        self
    }

    /// Set the name node address.
    ///
    /// Trailing `/` characters are stripped before storing. An empty string
    /// is ignored and leaves the current value untouched.
    pub fn name_node(&mut self, name_node: &str) -> &mut Self {
        if name_node.is_empty() {
            return self;
        }
        let trimmed = name_node.trim_end_matches('/');
        self.config.name_node = Some(trimmed.to_string());
        self
    }

    /// Set the kerberos ticket cache path.
    ///
    /// An empty string is ignored and leaves the current value untouched.
    pub fn kerberos_ticket_cache_path(&mut self, kerberos_ticket_cache_path: &str) -> &mut Self {
        if kerberos_ticket_cache_path.is_empty() {
            return self;
        }
        self.config.kerberos_ticket_cache_path = Some(kerberos_ticket_cache_path.to_string());
        self
    }

    /// Set the user used when connecting.
    ///
    /// An empty string is ignored and leaves the current value untouched.
    pub fn user(&mut self, user: &str) -> &mut Self {
        if user.is_empty() {
            return self;
        }
        self.config.user = Some(user.to_string());
        self
    }

    /// Enable or disable the append capability reported by the backend.
    pub fn enable_append(&mut self, enable_append: bool) -> &mut Self {
        self.config.enable_append = enable_append;
        self
    }

    /// Set the directory used for temporary files during writes.
    ///
    /// Passing an empty string clears a previously configured directory.
    pub fn atomic_write_dir(&mut self, dir: &str) -> &mut Self {
        self.config.atomic_write_dir = match dir {
            "" => None,
            value => Some(value.to_string()),
        };
        self
    }
}
impl Builder for HdfsBuilder {
    const SCHEME: Scheme = Scheme::Hdfs;
    type Accessor = HdfsBackend;

    /// Build an `HdfsBuilder` from a string map by deserializing it into
    /// `HdfsConfig`.
    ///
    /// # Panics
    ///
    /// Panics when the map cannot be deserialized into `HdfsConfig`.
    fn from_map(map: HashMap<String, String>) -> Self {
        let config = HdfsConfig::deserialize(ConfigDeserializer::new(map))
            .expect("config deserialize must succeed");
        HdfsBuilder { config }
    }

    /// Connect to the name node and produce an `HdfsBackend`.
    ///
    /// The root directory and, when configured, the atomic write directory
    /// are created if the name node reports them as not found.
    ///
    /// The builder's configuration is only read, never drained, so calling
    /// `build` again on the same builder yields an equivalent backend.
    ///
    /// # Errors
    ///
    /// - `ErrorKind::ConfigInvalid` when no name node has been configured.
    /// - The converted I/O error when connecting or creating a directory fails.
    fn build(&mut self) -> Result<Self::Accessor> {
        debug!("backend build started: {:?}", &self);

        let name_node = match &self.config.name_node {
            Some(v) => v,
            None => {
                return Err(Error::new(ErrorKind::ConfigInvalid, "name node is empty")
                    .with_context("service", Scheme::Hdfs))
            }
        };

        // Borrow instead of `take()`: draining the option would make a second
        // `build()` silently fall back to the default root.
        let root = normalize_root(self.config.root.as_deref().unwrap_or_default());
        debug!("backend use root {}", root);

        let mut builder = hdrs::ClientBuilder::new(name_node);
        if let Some(ticket_cache_path) = &self.config.kerberos_ticket_cache_path {
            builder = builder.with_kerberos_ticket_cache_path(ticket_cache_path.as_str());
        }
        if let Some(user) = &self.config.user {
            builder = builder.with_user(user.as_str());
        }

        let client = builder.connect().map_err(new_std_io_error)?;

        // Create the root when it is missing. Other stat errors are not fatal
        // here; they will resurface on the first real operation.
        if let Err(e) = client.metadata(&root) {
            if e.kind() == io::ErrorKind::NotFound {
                debug!("root {} is not exist, creating now", root);
                client.create_dir(&root).map_err(new_std_io_error)?
            }
        }

        // Clone instead of `take()` for the same reason as `root` above.
        let atomic_write_dir = self.config.atomic_write_dir.clone();

        // Temporary files are opened inside this directory, so it has to exist.
        if let Some(d) = &atomic_write_dir {
            if let Err(e) = client.metadata(d) {
                if e.kind() == io::ErrorKind::NotFound {
                    client.create_dir(d).map_err(new_std_io_error)?
                }
            }
        }

        debug!("backend build finished: {:?}", &self);
        Ok(HdfsBackend {
            root,
            atomic_write_dir,
            client: Arc::new(client),
            enable_append: self.config.enable_append,
        })
    }
}
/// Build a temporary file name for `path`: its basename followed by `.` and a
/// freshly generated v4 UUID.
#[inline]
fn tmp_file_of(path: &str) -> String {
    let basename = get_basename(path);
    let suffix = Uuid::new_v4();
    format!("{}.{}", basename, suffix)
}
/// Backend for the HDFS service, produced by `HdfsBuilder::build`.
#[derive(Debug, Clone)]
pub struct HdfsBackend {
/// Normalized root; every operation path is resolved against it.
root: String,
/// Optional directory in which temporary write targets are opened.
atomic_write_dir: Option<String>,
/// Shared handle to the connected `hdrs` client.
client: Arc<hdrs::Client>,
/// Reported as the `write_can_append` capability.
enable_append: bool,
}
// SAFETY: NOTE(review) — these impls assert that `hdrs::Client` may be sent to
// and shared between threads. Nothing in this file establishes that; confirm
// against the `hdrs` crate before relying on it.
unsafe impl Send for HdfsBackend {}
unsafe impl Sync for HdfsBackend {}
impl Access for HdfsBackend {
type Reader = HdfsReader;
type Writer = HdfsWriter<hdrs::AsyncFile>;
type Lister = Option<HdfsLister>;
type BlockingReader = HdfsReader;
type BlockingWriter = HdfsWriter<hdrs::File>;
type BlockingLister = Option<HdfsLister>;
/// Describe this backend: scheme, root and natively supported capabilities.
///
/// `write_can_append` mirrors the `enable_append` setting; every capability
/// not listed keeps its default.
fn info(&self) -> AccessorInfo {
    let native = Capability {
        stat: true,
        read: true,
        write: true,
        write_can_append: self.enable_append,
        create_dir: true,
        delete: true,
        list: true,
        rename: true,
        blocking: true,
        ..Default::default()
    };

    let mut info = AccessorInfo::default();
    info.set_scheme(Scheme::Hdfs)
        .set_root(&self.root)
        .set_native_capability(native);
    info
}
async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
let p = build_rooted_abs_path(&self.root, path);
self.client.create_dir(&p).map_err(new_std_io_error)?;
Ok(RpCreateDir::default())
}
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let p = build_rooted_abs_path(&self.root, path);
let meta = self.client.metadata(&p).map_err(new_std_io_error)?;
let mode = if meta.is_dir() {
EntryMode::DIR
} else if meta.is_file() {
EntryMode::FILE
} else {
EntryMode::Unknown
};
let mut m = Metadata::new(mode);
m.set_content_length(meta.len());
m.set_last_modified(meta.modified().into());
Ok(RpStat::new(m))
}
async fn read(&self, path: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
let p = build_rooted_abs_path(&self.root, path);
let client = self.client.clone();
let f = match tokio::runtime::Handle::try_current() {
Ok(runtime) => runtime
.spawn_blocking(move || {
client
.open_file()
.read(true)
.open(&p)
.map_err(new_std_io_error)
})
.await
.map_err(|err| {
Error::new(ErrorKind::Unexpected, "tokio spawn io task failed").set_source(err)
})?,
Err(_) => Err(Error::new(
ErrorKind::Unexpected,
"no tokio runtime found, failed to run io task",
)),
}?;
Ok((RpRead::new(), HdfsReader::new(f)))
}
/// Open a writer for `path`.
///
/// When an atomic write directory is configured, data goes to a temporary
/// file inside it, unless this is an append to a target that already exists,
/// in which case the target is opened directly. If the target is missing,
/// its parent directory and an empty target file are created first.
async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
    let target_path = build_rooted_abs_path(&self.root, path);

    let tmp_path = match &self.atomic_write_dir {
        Some(dir) => {
            let candidate = build_rooted_abs_path(dir, &tmp_file_of(path));
            // Appending to an existing file must touch the file itself.
            if op.append() && self.client.metadata(&target_path).is_ok() {
                None
            } else {
                Some(candidate)
            }
        }
        None => None,
    };

    match self.client.metadata(&target_path) {
        Ok(_) => {}
        Err(err) if err.kind() == io::ErrorKind::NotFound => {
            self.client
                .create_dir(get_parent(&target_path))
                .map_err(new_std_io_error)?;

            let mut placeholder = self
                .client
                .open_file()
                .create(true)
                .write(true)
                .async_open(&target_path)
                .await
                .map_err(new_std_io_error)?;
            placeholder.close().await.map_err(new_std_io_error)?;
        }
        // Any other stat failure aborts the write.
        Err(err) => return Err(new_std_io_error(err)),
    }

    let mut open_options = self.client.open_file();
    open_options.create(true);
    match op.append() {
        true => open_options.append(true),
        false => open_options.write(true),
    };

    let destination = match &tmp_path {
        Some(tmp) => tmp,
        None => &target_path,
    };
    let f = open_options
        .async_open(destination)
        .await
        .map_err(new_std_io_error)?;

    let writer = HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client));
    Ok((RpWrite::new(), writer))
}
/// Delete the file or directory at `path`.
///
/// Deleting a path that does not exist succeeds.
async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
    let abs_path = build_rooted_abs_path(&self.root, path);

    let meta = match self.client.metadata(&abs_path) {
        Ok(meta) => meta,
        Err(err) if err.kind() == io::ErrorKind::NotFound => {
            return Ok(RpDelete::default());
        }
        Err(err) => return Err(new_std_io_error(err)),
    };

    let removal = match meta.is_dir() {
        true => self.client.remove_dir(&abs_path),
        false => self.client.remove_file(&abs_path),
    };
    removal.map_err(new_std_io_error)?;

    Ok(RpDelete::default())
}
/// List the directory at `path`.
///
/// A missing directory yields no lister (`None`) rather than an error.
async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
    let abs_path = build_rooted_abs_path(&self.root, path);

    match self.client.read_dir(&abs_path) {
        Ok(entries) => {
            let lister = HdfsLister::new(&self.root, entries);
            Ok((RpList::default(), Some(lister)))
        }
        Err(err) if err.kind() == io::ErrorKind::NotFound => Ok((RpList::default(), None)),
        Err(err) => Err(new_std_io_error(err)),
    }
}
/// Rename `from` to `to`.
///
/// The source must exist. If the destination is an existing file it is
/// removed first; if it exists but is not a file the call fails with
/// `IsADirectory`. If it does not exist, its parent directory is created.
async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
    let from_path = build_rooted_abs_path(&self.root, from);
    // Fail early when the source cannot be stat-ed.
    self.client.metadata(&from_path).map_err(new_std_io_error)?;

    let to_path = build_rooted_abs_path(&self.root, to);
    match self.client.metadata(&to_path) {
        Ok(existing) if existing.is_file() => {
            self.client
                .remove_file(&to_path)
                .map_err(new_std_io_error)?;
        }
        Ok(_) => {
            return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
                .with_context("input", &to_path));
        }
        Err(err) if err.kind() != io::ErrorKind::NotFound => {
            return Err(new_std_io_error(err));
        }
        Err(_) => {
            let destination = PathBuf::from(&to_path);
            let parent = destination.parent().ok_or_else(|| {
                Error::new(
                    ErrorKind::Unexpected,
                    "path should have parent but not, it must be malformed",
                )
                .with_context("input", &to_path)
            })?;
            self.client
                .create_dir(&parent.to_string_lossy())
                .map_err(new_std_io_error)?;
        }
    }

    self.client
        .rename_file(&from_path, &to_path)
        .map_err(new_std_io_error)?;
    Ok(RpRename::new())
}
fn blocking_create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
let p = build_rooted_abs_path(&self.root, path);
self.client.create_dir(&p).map_err(new_std_io_error)?;
Ok(RpCreateDir::default())
}
fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let p = build_rooted_abs_path(&self.root, path);
let meta = self.client.metadata(&p).map_err(new_std_io_error)?;
let mode = if meta.is_dir() {
EntryMode::DIR
} else if meta.is_file() {
EntryMode::FILE
} else {
EntryMode::Unknown
};
let mut m = Metadata::new(mode);
m.set_content_length(meta.len());
m.set_last_modified(meta.modified().into());
Ok(RpStat::new(m))
}
/// Blocking variant: open `path` for reading and wrap the file in an
/// `HdfsReader`.
fn blocking_read(&self, path: &str, _: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
    let abs_path = build_rooted_abs_path(&self.root, path);

    let mut options = self.client.open_file();
    options.read(true);
    let file = options.open(&abs_path).map_err(new_std_io_error)?;

    Ok((RpRead::new(), HdfsReader::new(file)))
}
/// Blocking variant: open a writer for `path`.
///
/// When an atomic write directory is configured, data goes to a temporary
/// file inside it, unless this is an append to a target that already exists,
/// in which case the target is opened directly. If the target is missing,
/// its parent directory and an empty target file are created first.
fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
    let target_path = build_rooted_abs_path(&self.root, path);

    let tmp_path = match &self.atomic_write_dir {
        Some(dir) => {
            let candidate = build_rooted_abs_path(dir, &tmp_file_of(path));
            // Appending to an existing file must touch the file itself.
            if op.append() && self.client.metadata(&target_path).is_ok() {
                None
            } else {
                Some(candidate)
            }
        }
        None => None,
    };

    match self.client.metadata(&target_path) {
        Ok(_) => {}
        Err(err) if err.kind() == io::ErrorKind::NotFound => {
            self.client
                .create_dir(get_parent(&target_path))
                .map_err(new_std_io_error)?;

            // The handle is dropped right away; only the file's existence matters.
            self.client
                .open_file()
                .create(true)
                .write(true)
                .open(&target_path)
                .map_err(new_std_io_error)?;
        }
        // Any other stat failure aborts the write.
        Err(err) => return Err(new_std_io_error(err)),
    }

    let mut open_options = self.client.open_file();
    open_options.create(true);
    match op.append() {
        true => open_options.append(true),
        false => open_options.write(true),
    };

    let destination = match &tmp_path {
        Some(tmp) => tmp,
        None => &target_path,
    };
    let f = open_options.open(destination).map_err(new_std_io_error)?;

    let writer = HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client));
    Ok((RpWrite::new(), writer))
}
/// Blocking variant: delete the file or directory at `path`.
///
/// Deleting a path that does not exist succeeds.
fn blocking_delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
    let abs_path = build_rooted_abs_path(&self.root, path);

    let meta = match self.client.metadata(&abs_path) {
        Ok(meta) => meta,
        Err(err) if err.kind() == io::ErrorKind::NotFound => {
            return Ok(RpDelete::default());
        }
        Err(err) => return Err(new_std_io_error(err)),
    };

    let removal = match meta.is_dir() {
        true => self.client.remove_dir(&abs_path),
        false => self.client.remove_file(&abs_path),
    };
    removal.map_err(new_std_io_error)?;

    Ok(RpDelete::default())
}
/// Blocking variant: list the directory at `path`.
///
/// A missing directory yields no lister (`None`) rather than an error.
fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, Self::BlockingLister)> {
    let abs_path = build_rooted_abs_path(&self.root, path);

    match self.client.read_dir(&abs_path) {
        Ok(entries) => {
            let lister = HdfsLister::new(&self.root, entries);
            Ok((RpList::default(), Some(lister)))
        }
        Err(err) if err.kind() == io::ErrorKind::NotFound => Ok((RpList::default(), None)),
        Err(err) => Err(new_std_io_error(err)),
    }
}
/// Blocking variant: rename `from` to `to`.
///
/// The source must exist. If the destination is an existing file it is
/// removed first; if it exists but is not a file the call fails with
/// `IsADirectory`. If it does not exist, its parent directory is created.
fn blocking_rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
    let from_path = build_rooted_abs_path(&self.root, from);
    // Fail early when the source cannot be stat-ed.
    self.client.metadata(&from_path).map_err(new_std_io_error)?;

    let to_path = build_rooted_abs_path(&self.root, to);
    match self.client.metadata(&to_path) {
        Ok(existing) if existing.is_file() => {
            self.client
                .remove_file(&to_path)
                .map_err(new_std_io_error)?;
        }
        Ok(_) => {
            return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
                .with_context("input", &to_path));
        }
        Err(err) if err.kind() != io::ErrorKind::NotFound => {
            return Err(new_std_io_error(err));
        }
        Err(_) => {
            let destination = PathBuf::from(&to_path);
            let parent = destination.parent().ok_or_else(|| {
                Error::new(
                    ErrorKind::Unexpected,
                    "path should have parent but not, it must be malformed",
                )
                .with_context("input", &to_path)
            })?;
            self.client
                .create_dir(&parent.to_string_lossy())
                .map_err(new_std_io_error)?;
        }
    }

    self.client
        .rename_file(&from_path, &to_path)
        .map_err(new_std_io_error)?;
    Ok(RpRename::new())
}
}