opendal/services/hdfs/backend.rs

use std::io;
use std::sync::Arc;

use log::debug;

use super::HDFS_SCHEME;
use super::config::HdfsConfig;
use super::core::HdfsCore;
use super::deleter::HdfsDeleter;
use super::lister::HdfsLister;
use super::reader::HdfsReader;
use super::writer::HdfsWriter;
use crate::raw::*;
use crate::*;

#[doc = include_str!("docs.md")]
#[derive(Debug, Default)]
pub struct HdfsBuilder {
    pub(super) config: HdfsConfig,
}

impl HdfsBuilder {
    /// Set the root of this backend.
    ///
    /// All operations will happen under this root.
    pub fn root(mut self, root: &str) -> Self {
        self.config.root = if root.is_empty() {
            None
        } else {
            Some(root.to_string())
        };

        self
    }

    /// Set the name node of this backend.
    ///
    /// Required: `build` fails with `ConfigInvalid` when this is unset.
    pub fn name_node(mut self, name_node: &str) -> Self {
        if !name_node.is_empty() {
            self.config.name_node = Some(name_node.to_string())
        }

        self
    }

    /// Set the path to the Kerberos ticket cache used when connecting.
    pub fn kerberos_ticket_cache_path(mut self, kerberos_ticket_cache_path: &str) -> Self {
        if !kerberos_ticket_cache_path.is_empty() {
            self.config.kerberos_ticket_cache_path = Some(kerberos_ticket_cache_path.to_string())
        }
        self
    }

    /// Set the user used to connect to HDFS.
    pub fn user(mut self, user: &str) -> Self {
        if !user.is_empty() {
            self.config.user = Some(user.to_string())
        }
        self
    }

    /// Enable or disable append support, surfaced as the `write_can_append` capability.
    pub fn enable_append(mut self, enable_append: bool) -> Self {
        self.config.enable_append = enable_append;
        self
    }

    /// Set the directory used to stage files for atomic writes.
    pub fn atomic_write_dir(mut self, dir: &str) -> Self {
        self.config.atomic_write_dir = if dir.is_empty() {
            None
        } else {
            Some(String::from(dir))
        };
        self
    }
}
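
// Example (a minimal sketch, not part of this file's docs): with the usual
// OpenDAL entry points, the builder above is typically consumed as
//
//     let builder = HdfsBuilder::default()
//         .name_node("hdfs://127.0.0.1:9000")
//         .root("/tmp/opendal");
//     let op = opendal::Operator::new(builder)?.finish();
//
// where "hdfs://127.0.0.1:9000" is a placeholder name node address.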

impl Builder for HdfsBuilder {
    type Config = HdfsConfig;

    fn build(self) -> Result<impl Access> {
        debug!("backend build started: {:?}", &self);

        let name_node = match &self.config.name_node {
            Some(v) => v,
            None => {
                return Err(Error::new(ErrorKind::ConfigInvalid, "name node is empty")
                    .with_context("service", HDFS_SCHEME));
            }
        };

        let root = normalize_root(&self.config.root.unwrap_or_default());
        debug!("backend use root {root}");

        let mut builder = hdrs::ClientBuilder::new(name_node);
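        // Apply the optional Kerberos ticket cache and user before connecting.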
        if let Some(ticket_cache_path) = &self.config.kerberos_ticket_cache_path {
            builder = builder.with_kerberos_ticket_cache_path(ticket_cache_path.as_str());
        }
        if let Some(user) = &self.config.user {
            builder = builder.with_user(user.as_str());
        }

        let client = builder.connect().map_err(new_std_io_error)?;

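        // Create the root dir if it does not exist yet.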
        if let Err(e) = client.metadata(&root) {
            if e.kind() == io::ErrorKind::NotFound {
                debug!("root {root} does not exist, creating now");

                client.create_dir(&root).map_err(new_std_io_error)?
            }
        }

        let atomic_write_dir = self.config.atomic_write_dir;

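        // Likewise, create the atomic write dir if it is configured but missing.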
        if let Some(d) = &atomic_write_dir {
            if let Err(e) = client.metadata(d) {
                if e.kind() == io::ErrorKind::NotFound {
                    client.create_dir(d).map_err(new_std_io_error)?
                }
            }
        }

        Ok(HdfsBackend {
            core: Arc::new(HdfsCore {
                info: {
                    let am = AccessorInfo::default();
                    am.set_scheme(HDFS_SCHEME)
                        .set_root(&root)
                        .set_native_capability(Capability {
                            stat: true,

                            read: true,

                            write: true,
                            write_can_append: self.config.enable_append,

                            create_dir: true,
                            delete: true,

                            list: true,

                            rename: true,

                            shared: true,

                            ..Default::default()
                        });

                    am.into()
                },
                root,
                atomic_write_dir,
                client: Arc::new(client),
            }),
        })
    }
}

/// Backend for the HDFS service.
#[derive(Debug, Clone)]
pub struct HdfsBackend {
    core: Arc<HdfsCore>,
}

impl Access for HdfsBackend {
    type Reader = HdfsReader<hdrs::AsyncFile>;
    type Writer = HdfsWriter<hdrs::AsyncFile>;
    type Lister = Option<HdfsLister>;
    type Deleter = oio::OneShotDeleter<HdfsDeleter>;

    fn info(&self) -> Arc<AccessorInfo> {
        self.core.info.clone()
    }

    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
        self.core.hdfs_create_dir(path)?;
        Ok(RpCreateDir::default())
    }

    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
        let m = self.core.hdfs_stat(path)?;
        Ok(RpStat::new(m))
    }

    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        let f = self.core.hdfs_read(path, &args).await?;

        Ok((
            RpRead::new(),
            HdfsReader::new(f, args.range().size().unwrap_or(u64::MAX) as _),
        ))
    }

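    // hdfs_write returns the target path, an optional tmp path (used when an
    // atomic write dir is configured), the open file, whether the target
    // already exists, and its initial size.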
    async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        let (target_path, tmp_path, f, target_exists, initial_size) =
            self.core.hdfs_write(path, &op).await?;

        Ok((
            RpWrite::new(),
            HdfsWriter::new(
                target_path,
                tmp_path,
                f,
                Arc::clone(&self.core.client),
                target_exists,
                initial_size,
            ),
        ))
    }

    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        Ok((
            RpDelete::default(),
            oio::OneShotDeleter::new(HdfsDeleter::new(Arc::clone(&self.core))),
        ))
    }

    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
        match self.core.hdfs_list(path)? {
            Some(f) => {
                let rd = HdfsLister::new(&self.core.root, f, path);
                Ok((RpList::default(), Some(rd)))
            }
            None => Ok((RpList::default(), None)),
        }
    }

    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
        self.core.hdfs_rename(from, to)?;
        Ok(RpRename::new())
    }
}