use std::fmt::Debug;
use std::fmt::Formatter;
use std::io;
use std::io::SeekFrom;
use std::path::PathBuf;
use std::sync::Arc;

use log::debug;
use uuid::Uuid;

use super::delete::HdfsDeleter;
use super::lister::HdfsLister;
use super::reader::HdfsReader;
use super::writer::HdfsWriter;
use crate::raw::*;
use crate::services::HdfsConfig;
use crate::*;

impl Configurator for HdfsConfig {
    type Builder = HdfsBuilder;
    fn into_builder(self) -> Self::Builder {
        HdfsBuilder { config: self }
    }
}

#[doc = include_str!("docs.md")]
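/// A minimal construction sketch (hedged: the name node address and root below
/// are illustrative placeholders, and this assumes the `services-hdfs` feature
/// is enabled so that `opendal::services::Hdfs` re-exports this builder):
///
/// ```no_run
/// use opendal::services::Hdfs;
/// use opendal::Operator;
///
/// fn main() -> opendal::Result<()> {
///     // Configure the backend, then finish it into an Operator.
///     let builder = Hdfs::default()
///         .name_node("hdfs://127.0.0.1:8020")
///         .root("/tmp/opendal");
///     let _op = Operator::new(builder)?.finish();
///     Ok(())
/// }
/// ```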
#[derive(Default)]
pub struct HdfsBuilder {
    config: HdfsConfig,
}

impl Debug for HdfsBuilder {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HdfsBuilder")
            .field("config", &self.config)
            .finish()
    }
}

impl HdfsBuilder {
    /// Set the root of this backend.
    ///
    /// All operations will happen under this root.
    pub fn root(mut self, root: &str) -> Self {
        self.config.root = if root.is_empty() {
            None
        } else {
            Some(root.to_string())
        };

        self
    }

    /// Set the name node of this backend, e.g. `hdfs://127.0.0.1:9000`.
    pub fn name_node(mut self, name_node: &str) -> Self {
        if !name_node.is_empty() {
            // Trim the trailing `/` so that `hdfs://127.0.0.1:9000/` is accepted as well.
            self.config.name_node = Some(name_node.trim_end_matches('/').to_string())
        }

        self
    }

    /// Set the path to the Kerberos ticket cache used when connecting to the name node.
    pub fn kerberos_ticket_cache_path(mut self, kerberos_ticket_cache_path: &str) -> Self {
        if !kerberos_ticket_cache_path.is_empty() {
            self.config.kerberos_ticket_cache_path = Some(kerberos_ticket_cache_path.to_string())
        }
        self
    }

    /// Set the user used to connect to hdfs.
    pub fn user(mut self, user: &str) -> Self {
        if !user.is_empty() {
            self.config.user = Some(user.to_string())
        }
        self
    }

    /// Enable the append capability of this backend.
    ///
    /// This controls the `write_can_append` capability reported by the backend.
    pub fn enable_append(mut self, enable_append: bool) -> Self {
        self.config.enable_append = enable_append;
        self
    }

    /// Set the temporary directory for atomic writes.
    ///
    /// When set, non-append writes are staged as a uniquely named file under
    /// this directory instead of being written to the target path directly.
    pub fn atomic_write_dir(mut self, dir: &str) -> Self {
        self.config.atomic_write_dir = if dir.is_empty() {
            None
        } else {
            Some(String::from(dir))
        };
        self
    }
}
impl Builder for HdfsBuilder {
    const SCHEME: Scheme = Scheme::Hdfs;
    type Config = HdfsConfig;

    fn build(self) -> Result<impl Access> {
        debug!("backend build started: {:?}", &self);

        let name_node = match &self.config.name_node {
            Some(v) => v,
            None => {
                return Err(Error::new(ErrorKind::ConfigInvalid, "name node is empty")
                    .with_context("service", Scheme::Hdfs))
            }
        };

        let root = normalize_root(&self.config.root.unwrap_or_default());
        debug!("backend use root {}", root);

        let mut builder = hdrs::ClientBuilder::new(name_node);
        if let Some(ticket_cache_path) = &self.config.kerberos_ticket_cache_path {
            builder = builder.with_kerberos_ticket_cache_path(ticket_cache_path.as_str());
        }
        if let Some(user) = &self.config.user {
            builder = builder.with_user(user.as_str());
        }

        let client = builder.connect().map_err(new_std_io_error)?;

        // Create the root dir if it doesn't exist yet.
        if let Err(e) = client.metadata(&root) {
            if e.kind() == io::ErrorKind::NotFound {
                debug!("root {} does not exist, creating now", root);

                client.create_dir(&root).map_err(new_std_io_error)?
            }
        }

        let atomic_write_dir = self.config.atomic_write_dir;

        // Create the atomic_write_dir if it doesn't exist yet.
        if let Some(d) = &atomic_write_dir {
            if let Err(e) = client.metadata(d) {
                if e.kind() == io::ErrorKind::NotFound {
                    client.create_dir(d).map_err(new_std_io_error)?
                }
            }
        }

        Ok(HdfsBackend {
            info: {
                let am = AccessorInfo::default();
                am.set_scheme(Scheme::Hdfs)
                    .set_root(&root)
                    .set_native_capability(Capability {
                        stat: true,
                        stat_has_content_length: true,
                        stat_has_last_modified: true,

                        read: true,

                        write: true,
                        write_can_append: self.config.enable_append,

                        create_dir: true,
                        delete: true,

                        list: true,
                        list_has_content_length: true,
                        list_has_last_modified: true,

                        rename: true,
                        blocking: true,

                        shared: true,

                        ..Default::default()
                    });

                am.into()
            },
            root,
            atomic_write_dir,
            client: Arc::new(client),
        })
    }
}

/// Build the name of a temporary file: `{basename}.{random uuid}`.
#[inline]
fn tmp_file_of(path: &str) -> String {
    let name = get_basename(path);
    let uuid = Uuid::new_v4().to_string();

    format!("{name}.{uuid}")
}

/// Backend for hdfs services.
#[derive(Debug, Clone)]
pub struct HdfsBackend {
    pub info: Arc<AccessorInfo>,
    pub root: String,
    atomic_write_dir: Option<String>,
    pub client: Arc<hdrs::Client>,
}

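// SAFETY: `hdrs::Client` is expected to be thread-safe (an assumption carried
// from the hdrs crate rather than verified here), and `HdfsBackend` only ever
// uses it through immutable references, so it should be sound to mark the
// backend as `Send` and `Sync`.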
unsafe impl Send for HdfsBackend {}
unsafe impl Sync for HdfsBackend {}

impl Access for HdfsBackend {
    type Reader = HdfsReader<hdrs::AsyncFile>;
    type Writer = HdfsWriter<hdrs::AsyncFile>;
    type Lister = Option<HdfsLister>;
    type Deleter = oio::OneShotDeleter<HdfsDeleter>;
    type BlockingReader = HdfsReader<hdrs::File>;
    type BlockingWriter = HdfsWriter<hdrs::File>;
    type BlockingLister = Option<HdfsLister>;
    type BlockingDeleter = oio::OneShotDeleter<HdfsDeleter>;

    fn info(&self) -> Arc<AccessorInfo> {
        self.info.clone()
    }

    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
        let p = build_rooted_abs_path(&self.root, path);

        self.client.create_dir(&p).map_err(new_std_io_error)?;

        Ok(RpCreateDir::default())
    }

    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
        let p = build_rooted_abs_path(&self.root, path);

        let meta = self.client.metadata(&p).map_err(new_std_io_error)?;

        let mode = if meta.is_dir() {
            EntryMode::DIR
        } else if meta.is_file() {
            EntryMode::FILE
        } else {
            EntryMode::Unknown
        };
        let mut m = Metadata::new(mode);
        m.set_content_length(meta.len());
        m.set_last_modified(meta.modified().into());

        Ok(RpStat::new(m))
    }

    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        let p = build_rooted_abs_path(&self.root, path);

        let client = self.client.clone();
        let mut f = client
            .open_file()
            .read(true)
            .async_open(&p)
            .await
            .map_err(new_std_io_error)?;

        // Seek to the start of the requested range before handing the file
        // over to the reader.
        if args.range().offset() != 0 {
            use futures::AsyncSeekExt;

            f.seek(SeekFrom::Start(args.range().offset()))
                .await
                .map_err(new_std_io_error)?;
        }

        Ok((
            RpRead::new(),
            HdfsReader::new(f, args.range().size().unwrap_or(u64::MAX) as _),
        ))
    }

    async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        let target_path = build_rooted_abs_path(&self.root, path);
        let mut initial_size = 0;
        let target_exists = match self.client.metadata(&target_path) {
            Ok(meta) => {
                initial_size = meta.len();
                true
            }
            Err(err) => {
                if err.kind() != io::ErrorKind::NotFound {
                    return Err(new_std_io_error(err));
                }
                false
            }
        };

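        // Write strategy: appends to an existing file must go to the target
        // path directly, while ordinary writes may be staged through a
        // temporary file in `atomic_write_dir` (when configured), which
        // HdfsWriter is expected to rename into place when the write
        // completes.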
        let should_append = op.append() && target_exists;
        let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
            // Appends must operate on the target file directly.
            if should_append {
                None
            } else {
                Some(build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path)))
            }
        });

        if !target_exists {
            let parent = get_parent(&target_path);
            self.client.create_dir(parent).map_err(new_std_io_error)?;
        }
        if !should_append {
            // The target will be truncated or replaced, so the writer starts
            // from size zero.
            initial_size = 0;
        }

        let mut open_options = self.client.open_file();
        open_options.create(true);
        if should_append {
            open_options.append(true);
        } else {
            open_options.write(true);
        }

        let f = open_options
            .async_open(tmp_path.as_ref().unwrap_or(&target_path))
            .await
            .map_err(new_std_io_error)?;

        Ok((
            RpWrite::new(),
            HdfsWriter::new(
                target_path,
                tmp_path,
                f,
                Arc::clone(&self.client),
                target_exists,
                initial_size,
            ),
        ))
    }

    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        Ok((
            RpDelete::default(),
            oio::OneShotDeleter::new(HdfsDeleter::new(Arc::new(self.clone()))),
        ))
    }

    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
        let p = build_rooted_abs_path(&self.root, path);

        let f = match self.client.read_dir(&p) {
            Ok(f) => f,
            Err(e) => {
                return if e.kind() == io::ErrorKind::NotFound {
                    Ok((RpList::default(), None))
                } else {
                    Err(new_std_io_error(e))
                }
            }
        };

        let rd = HdfsLister::new(&self.root, f, path);

        Ok((RpList::default(), Some(rd)))
    }

    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
        let from_path = build_rooted_abs_path(&self.root, from);
        self.client.metadata(&from_path).map_err(new_std_io_error)?;

        let to_path = build_rooted_abs_path(&self.root, to);
        let result = self.client.metadata(&to_path);
        match result {
            Err(err) => {
                if err.kind() != io::ErrorKind::NotFound {
                    return Err(new_std_io_error(err));
                }

                // The destination doesn't exist yet, so make sure its parent
                // directory does before renaming.
                let parent = PathBuf::from(&to_path)
                    .parent()
                    .ok_or_else(|| {
                        Error::new(
                            ErrorKind::Unexpected,
                            "path should have a parent but does not; it must be malformed",
                        )
                        .with_context("input", &to_path)
                    })?
                    .to_path_buf();

                self.client
                    .create_dir(&parent.to_string_lossy())
                    .map_err(new_std_io_error)?;
            }
            Ok(metadata) => {
                // Remove an existing file at the destination so the rename can
                // take its place; an existing directory is an error.
                if metadata.is_file() {
                    self.client
                        .remove_file(&to_path)
                        .map_err(new_std_io_error)?;
                } else {
                    return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
                        .with_context("input", &to_path));
                }
            }
        }

        self.client
            .rename_file(&from_path, &to_path)
            .map_err(new_std_io_error)?;

        Ok(RpRename::new())
    }

    fn blocking_create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
        let p = build_rooted_abs_path(&self.root, path);

        self.client.create_dir(&p).map_err(new_std_io_error)?;

        Ok(RpCreateDir::default())
    }

    fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
        let p = build_rooted_abs_path(&self.root, path);

        let meta = self.client.metadata(&p).map_err(new_std_io_error)?;

        let mode = if meta.is_dir() {
            EntryMode::DIR
        } else if meta.is_file() {
            EntryMode::FILE
        } else {
            EntryMode::Unknown
        };
        let mut m = Metadata::new(mode);
        m.set_content_length(meta.len());
        m.set_last_modified(meta.modified().into());

        Ok(RpStat::new(m))
    }

    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
        let p = build_rooted_abs_path(&self.root, path);

        let mut f = self
            .client
            .open_file()
            .read(true)
            .open(&p)
            .map_err(new_std_io_error)?;

        if args.range().offset() != 0 {
            use std::io::Seek;

            f.seek(SeekFrom::Start(args.range().offset()))
                .map_err(new_std_io_error)?;
        }

        Ok((
            RpRead::new(),
            HdfsReader::new(f, args.range().size().unwrap_or(u64::MAX) as _),
        ))
    }

    fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
        let target_path = build_rooted_abs_path(&self.root, path);
        let mut initial_size = 0;
        let target_exists = match self.client.metadata(&target_path) {
            Ok(meta) => {
                initial_size = meta.len();
                true
            }
            Err(err) => {
                if err.kind() != io::ErrorKind::NotFound {
                    return Err(new_std_io_error(err));
                }
                false
            }
        };

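        // Same write strategy as the async `write` above: append directly to
        // the target, otherwise optionally stage through `atomic_write_dir`.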
        let should_append = op.append() && target_exists;
        let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
            if should_append {
                None
            } else {
                Some(build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path)))
            }
        });

        if !target_exists {
            let parent = get_parent(&target_path);
            self.client.create_dir(parent).map_err(new_std_io_error)?;
        }
        if !should_append {
            initial_size = 0;
        }

        let mut open_options = self.client.open_file();
        open_options.create(true);
        if should_append {
            open_options.append(true);
        } else {
            open_options.write(true);
        }

        let f = open_options
            .open(tmp_path.as_ref().unwrap_or(&target_path))
            .map_err(new_std_io_error)?;

        Ok((
            RpWrite::new(),
            HdfsWriter::new(
                target_path,
                tmp_path,
                f,
                Arc::clone(&self.client),
                target_exists,
                initial_size,
            ),
        ))
    }

    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
        Ok((
            RpDelete::default(),
            oio::OneShotDeleter::new(HdfsDeleter::new(Arc::new(self.clone()))),
        ))
    }

    fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, Self::BlockingLister)> {
        let p = build_rooted_abs_path(&self.root, path);

        let f = match self.client.read_dir(&p) {
            Ok(f) => f,
            Err(e) => {
                return if e.kind() == io::ErrorKind::NotFound {
                    Ok((RpList::default(), None))
                } else {
                    Err(new_std_io_error(e))
                }
            }
        };

        let rd = HdfsLister::new(&self.root, f, path);

        Ok((RpList::default(), Some(rd)))
    }

    fn blocking_rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
        let from_path = build_rooted_abs_path(&self.root, from);
        self.client.metadata(&from_path).map_err(new_std_io_error)?;

        let to_path = build_rooted_abs_path(&self.root, to);
        let result = self.client.metadata(&to_path);
        match result {
            Err(err) => {
                if err.kind() != io::ErrorKind::NotFound {
                    return Err(new_std_io_error(err));
                }

                let parent = PathBuf::from(&to_path)
                    .parent()
                    .ok_or_else(|| {
                        Error::new(
                            ErrorKind::Unexpected,
                            "path should have a parent but does not; it must be malformed",
                        )
                        .with_context("input", &to_path)
                    })?
                    .to_path_buf();

                self.client
                    .create_dir(&parent.to_string_lossy())
                    .map_err(new_std_io_error)?;
            }
            Ok(metadata) => {
                if metadata.is_file() {
                    self.client
                        .remove_file(&to_path)
                        .map_err(new_std_io_error)?;
                } else {
                    return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
                        .with_context("input", &to_path));
                }
            }
        }

        self.client
            .rename_file(&from_path, &to_path)
            .map_err(new_std_io_error)?;

        Ok(RpRename::new())
    }
}