opendal/services/hdfs/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::io;
21use std::io::SeekFrom;
22use std::path::PathBuf;
23use std::sync::Arc;
24
25use log::debug;
26
27use super::delete::HdfsDeleter;
28use super::lister::HdfsLister;
29use super::reader::HdfsReader;
30use super::writer::HdfsWriter;
31use crate::raw::*;
32use crate::services::HdfsConfig;
33use crate::*;
34
35impl Configurator for HdfsConfig {
36 type Builder = HdfsBuilder;
37 fn into_builder(self) -> Self::Builder {
38 HdfsBuilder { config: self }
39 }
40}
41
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct HdfsBuilder {
    /// Accumulated HDFS settings; populated by the setter methods below and
    /// consumed by [`Builder::build`].
    config: HdfsConfig,
}
47
48impl Debug for HdfsBuilder {
49 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
50 f.debug_struct("HdfsBuilder")
51 .field("config", &self.config)
52 .finish()
53 }
54}
55
56impl HdfsBuilder {
57 pub fn root(mut self, root: &str) -> Self {
61 self.config.root = if root.is_empty() {
62 None
63 } else {
64 Some(root.to_string())
65 };
66
67 self
68 }
69
70 pub fn name_node(mut self, name_node: &str) -> Self {
77 if !name_node.is_empty() {
78 self.config.name_node = Some(name_node.trim_end_matches('/').to_string())
80 }
81
82 self
83 }
84
85 pub fn kerberos_ticket_cache_path(mut self, kerberos_ticket_cache_path: &str) -> Self {
89 if !kerberos_ticket_cache_path.is_empty() {
90 self.config.kerberos_ticket_cache_path = Some(kerberos_ticket_cache_path.to_string())
91 }
92 self
93 }
94
95 pub fn user(mut self, user: &str) -> Self {
97 if !user.is_empty() {
98 self.config.user = Some(user.to_string())
99 }
100 self
101 }
102
103 pub fn enable_append(mut self, enable_append: bool) -> Self {
107 self.config.enable_append = enable_append;
108 self
109 }
110
111 pub fn atomic_write_dir(mut self, dir: &str) -> Self {
118 self.config.atomic_write_dir = if dir.is_empty() {
119 None
120 } else {
121 Some(String::from(dir))
122 };
123 self
124 }
125}
126
impl Builder for HdfsBuilder {
    const SCHEME: Scheme = Scheme::Hdfs;
    type Config = HdfsConfig;

    /// Build the HDFS backend: validate the config, connect through `hdrs`,
    /// and lazily create the root (and optional atomic-write dir) on the
    /// cluster if they do not exist yet.
    fn build(self) -> Result<impl Access> {
        debug!("backend build started: {:?}", &self);

        // The name node address is the only mandatory setting.
        let name_node = match &self.config.name_node {
            Some(v) => v,
            None => {
                return Err(Error::new(ErrorKind::ConfigInvalid, "name node is empty")
                    .with_context("service", Scheme::Hdfs))
            }
        };

        // Missing root defaults to "" which normalizes to "/".
        let root = normalize_root(&self.config.root.unwrap_or_default());
        debug!("backend use root {root}");

        let mut builder = hdrs::ClientBuilder::new(name_node);
        if let Some(ticket_cache_path) = &self.config.kerberos_ticket_cache_path {
            builder = builder.with_kerberos_ticket_cache_path(ticket_cache_path.as_str());
        }
        if let Some(user) = &self.config.user {
            builder = builder.with_user(user.as_str());
        }

        let client = builder.connect().map_err(new_std_io_error)?;

        // Create the root if it does not exist. Non-NotFound metadata errors
        // are deliberately swallowed here (best-effort); they will resurface
        // on the first real operation against the cluster.
        if let Err(e) = client.metadata(&root) {
            if e.kind() == io::ErrorKind::NotFound {
                debug!("root {root} is not exist, creating now");

                client.create_dir(&root).map_err(new_std_io_error)?
            }
        }

        let atomic_write_dir = self.config.atomic_write_dir;

        // Same lazy, best-effort creation for the atomic-write staging dir.
        if let Some(d) = &atomic_write_dir {
            if let Err(e) = client.metadata(d) {
                if e.kind() == io::ErrorKind::NotFound {
                    client.create_dir(d).map_err(new_std_io_error)?
                }
            }
        }

        Ok(HdfsBackend {
            info: {
                let am = AccessorInfo::default();
                am.set_scheme(Scheme::Hdfs)
                    .set_root(&root)
                    .set_native_capability(Capability {
                        stat: true,

                        read: true,

                        write: true,
                        // Appends are opt-in; see `HdfsBuilder::enable_append`.
                        write_can_append: self.config.enable_append,

                        create_dir: true,
                        delete: true,

                        list: true,

                        rename: true,

                        shared: true,

                        ..Default::default()
                    });

                am.into()
            },
            root,
            atomic_write_dir,
            client: Arc::new(client),
        })
    }
}
208
/// Backend for the HDFS service, wrapping a connected [`hdrs::Client`].
#[derive(Debug, Clone)]
pub struct HdfsBackend {
    pub info: Arc<AccessorInfo>,
    /// Normalized absolute root; every request path is resolved against it.
    pub root: String,
    /// When set, writes are staged as a tmp file here and renamed on close.
    atomic_write_dir: Option<String>,
    pub client: Arc<hdrs::Client>,
}
217
// SAFETY(review): these impls assert that `hdrs::Client` is safe to share
// across threads even though it is not auto-Send/Sync. That property is not
// visible from this file — presumably the underlying libhdfs handle is
// thread-safe; confirm against the `hdrs` crate documentation.
unsafe impl Send for HdfsBackend {}
unsafe impl Sync for HdfsBackend {}
221
impl Access for HdfsBackend {
    type Reader = HdfsReader<hdrs::AsyncFile>;
    type Writer = HdfsWriter<hdrs::AsyncFile>;
    // `None` lister means the listed path did not exist; see `list` below.
    type Lister = Option<HdfsLister>;
    type Deleter = oio::OneShotDeleter<HdfsDeleter>;

    fn info(&self) -> Arc<AccessorInfo> {
        self.info.clone()
    }

    /// Create a directory at `path` (resolved against the backend root).
    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
        let p = build_rooted_abs_path(&self.root, path);

        self.client.create_dir(&p).map_err(new_std_io_error)?;

        Ok(RpCreateDir::default())
    }

    /// Stat `path`, mapping HDFS file metadata to OpenDAL `Metadata`.
    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
        let p = build_rooted_abs_path(&self.root, path);

        let meta = self.client.metadata(&p).map_err(new_std_io_error)?;

        let mode = if meta.is_dir() {
            EntryMode::DIR
        } else if meta.is_file() {
            EntryMode::FILE
        } else {
            EntryMode::Unknown
        };
        let mut m = Metadata::new(mode);
        m.set_content_length(meta.len());
        m.set_last_modified(meta.modified().into());

        Ok(RpStat::new(m))
    }

    /// Open `path` for reading, honoring the requested byte range.
    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        let p = build_rooted_abs_path(&self.root, path);

        let client = self.client.clone();
        let mut f = client
            .open_file()
            .read(true)
            .async_open(&p)
            .await
            .map_err(new_std_io_error)?;

        // Seek only for non-zero offsets; a fresh handle already sits at 0.
        if args.range().offset() != 0 {
            use futures::AsyncSeekExt;

            f.seek(SeekFrom::Start(args.range().offset()))
                .await
                .map_err(new_std_io_error)?;
        }

        Ok((
            RpRead::new(),
            // Unbounded range reads to EOF via a u64::MAX limit.
            HdfsReader::new(f, args.range().size().unwrap_or(u64::MAX) as _),
        ))
    }

    /// Open `path` for writing.
    ///
    /// Appends only happen when requested AND the target already exists;
    /// otherwise the write may be staged in `atomic_write_dir` as a tmp file
    /// that `HdfsWriter` renames into place on close.
    async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        let target_path = build_rooted_abs_path(&self.root, path);
        let mut initial_size = 0;
        // Stat the target to learn whether it exists and how large it is.
        let target_exists = match self.client.metadata(&target_path) {
            Ok(meta) => {
                initial_size = meta.len();
                true
            }
            Err(err) => {
                if err.kind() != io::ErrorKind::NotFound {
                    return Err(new_std_io_error(err));
                }
                false
            }
        };

        let should_append = op.append() && target_exists;
        // Atomic staging is incompatible with appending in place, so the tmp
        // path is only computed for non-append writes.
        let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
            (!should_append).then_some(build_rooted_abs_path(
                atomic_write_dir,
                &build_tmp_path_of(path),
            ))
        });

        // Make sure the parent directory exists before creating a new file.
        if !target_exists {
            let parent = get_parent(&target_path);
            self.client.create_dir(parent).map_err(new_std_io_error)?;
        }
        // Non-append writes replace the content, so the writer starts at 0.
        if !should_append {
            initial_size = 0;
        }

        let mut open_options = self.client.open_file();
        open_options.create(true);
        if should_append {
            open_options.append(true);
        } else {
            open_options.write(true);
        }

        // Write to the tmp path when staging, otherwise directly to target.
        let f = open_options
            .async_open(tmp_path.as_ref().unwrap_or(&target_path))
            .await
            .map_err(new_std_io_error)?;

        Ok((
            RpWrite::new(),
            HdfsWriter::new(
                target_path,
                tmp_path,
                f,
                Arc::clone(&self.client),
                target_exists,
                initial_size,
            ),
        ))
    }

    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        Ok((
            RpDelete::default(),
            oio::OneShotDeleter::new(HdfsDeleter::new(Arc::new(self.clone()))),
        ))
    }

    /// List entries under `path`.
    ///
    /// A missing path is not an error: it yields a `None` lister, which the
    /// caller treats as an empty listing.
    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
        let p = build_rooted_abs_path(&self.root, path);

        let f = match self.client.read_dir(&p) {
            Ok(f) => f,
            Err(e) => {
                return if e.kind() == io::ErrorKind::NotFound {
                    Ok((RpList::default(), None))
                } else {
                    Err(new_std_io_error(e))
                }
            }
        };

        let rd = HdfsLister::new(&self.root, f, path);

        Ok((RpList::default(), Some(rd)))
    }

    /// Rename `from` to `to`.
    ///
    /// The source must exist. If the destination is missing, its parent
    /// directory is created first; if it is an existing file it is removed;
    /// if it is a directory the rename is rejected.
    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
        let from_path = build_rooted_abs_path(&self.root, from);
        // Fail fast when the source does not exist.
        self.client.metadata(&from_path).map_err(new_std_io_error)?;

        let to_path = build_rooted_abs_path(&self.root, to);
        let result = self.client.metadata(&to_path);
        match result {
            Err(err) => {
                if err.kind() != io::ErrorKind::NotFound {
                    return Err(new_std_io_error(err));
                }

                // Destination is missing: ensure its parent dir exists so the
                // rename below can succeed.
                let parent = PathBuf::from(&to_path)
                    .parent()
                    .ok_or_else(|| {
                        Error::new(
                            ErrorKind::Unexpected,
                            "path should have parent but not, it must be malformed",
                        )
                        .with_context("input", &to_path)
                    })?
                    .to_path_buf();

                self.client
                    .create_dir(&parent.to_string_lossy())
                    .map_err(new_std_io_error)?;
            }
            Ok(metadata) => {
                if metadata.is_file() {
                    // Overwrite semantics: drop the existing destination file.
                    self.client
                        .remove_file(&to_path)
                        .map_err(new_std_io_error)?;
                } else {
                    return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
                        .with_context("input", &to_path));
                }
            }
        }

        self.client
            .rename_file(&from_path, &to_path)
            .map_err(new_std_io_error)?;

        Ok(RpRename::new())
    }
}