opendal/services/hdfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::io;
21use std::io::SeekFrom;
22use std::path::PathBuf;
23use std::sync::Arc;
24
25use log::debug;
26use uuid::Uuid;
27
28use super::delete::HdfsDeleter;
29use super::lister::HdfsLister;
30use super::reader::HdfsReader;
31use super::writer::HdfsWriter;
32use crate::raw::*;
33use crate::services::HdfsConfig;
34use crate::*;
35
36impl Configurator for HdfsConfig {
37    type Builder = HdfsBuilder;
38    fn into_builder(self) -> Self::Builder {
39        HdfsBuilder { config: self }
40    }
41}
42
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct HdfsBuilder {
    /// HDFS settings collected by the setter methods and consumed by `build`.
    config: HdfsConfig,
}
48
49impl Debug for HdfsBuilder {
50    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
51        f.debug_struct("HdfsBuilder")
52            .field("config", &self.config)
53            .finish()
54    }
55}
56
57impl HdfsBuilder {
58    /// Set root of this backend.
59    ///
60    /// All operations will happen under this root.
61    pub fn root(mut self, root: &str) -> Self {
62        self.config.root = if root.is_empty() {
63            None
64        } else {
65            Some(root.to_string())
66        };
67
68        self
69    }
70
71    /// Set name_node of this backend.
72    ///
73    /// Valid format including:
74    ///
75    /// - `default`: using the default setting based on hadoop config.
76    /// - `hdfs://127.0.0.1:9000`: connect to hdfs cluster.
77    pub fn name_node(mut self, name_node: &str) -> Self {
78        if !name_node.is_empty() {
79            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
80            self.config.name_node = Some(name_node.trim_end_matches('/').to_string())
81        }
82
83        self
84    }
85
86    /// Set kerberos_ticket_cache_path of this backend
87    ///
88    /// This should be configured when kerberos is enabled.
89    pub fn kerberos_ticket_cache_path(mut self, kerberos_ticket_cache_path: &str) -> Self {
90        if !kerberos_ticket_cache_path.is_empty() {
91            self.config.kerberos_ticket_cache_path = Some(kerberos_ticket_cache_path.to_string())
92        }
93        self
94    }
95
96    /// Set user of this backend
97    pub fn user(mut self, user: &str) -> Self {
98        if !user.is_empty() {
99            self.config.user = Some(user.to_string())
100        }
101        self
102    }
103
104    /// Enable append capacity of this backend.
105    ///
106    /// This should be disabled when HDFS runs in non-distributed mode.
107    pub fn enable_append(mut self, enable_append: bool) -> Self {
108        self.config.enable_append = enable_append;
109        self
110    }
111
112    /// Set temp dir for atomic write.
113    ///
114    /// # Notes
115    ///
116    /// - When append is enabled, we will not use atomic write
117    ///   to avoid data loss and performance issue.
118    pub fn atomic_write_dir(mut self, dir: &str) -> Self {
119        self.config.atomic_write_dir = if dir.is_empty() {
120            None
121        } else {
122            Some(String::from(dir))
123        };
124        self
125    }
126}
127
128impl Builder for HdfsBuilder {
129    const SCHEME: Scheme = Scheme::Hdfs;
130    type Config = HdfsConfig;
131
132    fn build(self) -> Result<impl Access> {
133        debug!("backend build started: {:?}", &self);
134
135        let name_node = match &self.config.name_node {
136            Some(v) => v,
137            None => {
138                return Err(Error::new(ErrorKind::ConfigInvalid, "name node is empty")
139                    .with_context("service", Scheme::Hdfs))
140            }
141        };
142
143        let root = normalize_root(&self.config.root.unwrap_or_default());
144        debug!("backend use root {}", root);
145
146        let mut builder = hdrs::ClientBuilder::new(name_node);
147        if let Some(ticket_cache_path) = &self.config.kerberos_ticket_cache_path {
148            builder = builder.with_kerberos_ticket_cache_path(ticket_cache_path.as_str());
149        }
150        if let Some(user) = &self.config.user {
151            builder = builder.with_user(user.as_str());
152        }
153
154        let client = builder.connect().map_err(new_std_io_error)?;
155
156        // Create root dir if not exist.
157        if let Err(e) = client.metadata(&root) {
158            if e.kind() == io::ErrorKind::NotFound {
159                debug!("root {} is not exist, creating now", root);
160
161                client.create_dir(&root).map_err(new_std_io_error)?
162            }
163        }
164
165        let atomic_write_dir = self.config.atomic_write_dir;
166
167        // If atomic write dir is not exist, we must create it.
168        if let Some(d) = &atomic_write_dir {
169            if let Err(e) = client.metadata(d) {
170                if e.kind() == io::ErrorKind::NotFound {
171                    client.create_dir(d).map_err(new_std_io_error)?
172                }
173            }
174        }
175
176        Ok(HdfsBackend {
177            info: {
178                let am = AccessorInfo::default();
179                am.set_scheme(Scheme::Hdfs)
180                    .set_root(&root)
181                    .set_native_capability(Capability {
182                        stat: true,
183                        stat_has_content_length: true,
184                        stat_has_last_modified: true,
185
186                        read: true,
187
188                        write: true,
189                        write_can_append: self.config.enable_append,
190
191                        create_dir: true,
192                        delete: true,
193
194                        list: true,
195                        list_has_content_length: true,
196                        list_has_last_modified: true,
197
198                        rename: true,
199                        blocking: true,
200
201                        shared: true,
202
203                        ..Default::default()
204                    });
205
206                am.into()
207            },
208            root,
209            atomic_write_dir,
210            client: Arc::new(client),
211        })
212    }
213}
214
215#[inline]
216fn tmp_file_of(path: &str) -> String {
217    let name = get_basename(path);
218    let uuid = Uuid::new_v4().to_string();
219
220    format!("{name}.{uuid}")
221}
222
/// Backend for hdfs services.
#[derive(Debug, Clone)]
pub struct HdfsBackend {
    /// Accessor metadata handed out by `info()`.
    pub info: Arc<AccessorInfo>,
    /// Root path that incoming paths are joined onto before reaching the client.
    pub root: String,
    /// Optional directory holding temporary files for non-append writes.
    atomic_write_dir: Option<String>,
    /// Shared `hdrs` client used for every filesystem call.
    pub client: Arc<hdrs::Client>,
}
231
/// hdrs::Client is thread-safe.
// SAFETY: soundness rests on the statement above that `hdrs::Client` can be
// moved to another thread.
// NOTE(review): this cannot be verified from this file — confirm against the
// hdrs documentation, and check whether these manual impls are still needed.
unsafe impl Send for HdfsBackend {}
// SAFETY: same argument as for `Send`; assumes `hdrs::Client` can be shared
// between threads by reference — TODO confirm.
unsafe impl Sync for HdfsBackend {}
235
236impl Access for HdfsBackend {
237    type Reader = HdfsReader<hdrs::AsyncFile>;
238    type Writer = HdfsWriter<hdrs::AsyncFile>;
239    type Lister = Option<HdfsLister>;
240    type Deleter = oio::OneShotDeleter<HdfsDeleter>;
241    type BlockingReader = HdfsReader<hdrs::File>;
242    type BlockingWriter = HdfsWriter<hdrs::File>;
243    type BlockingLister = Option<HdfsLister>;
244    type BlockingDeleter = oio::OneShotDeleter<HdfsDeleter>;
245
246    fn info(&self) -> Arc<AccessorInfo> {
247        self.info.clone()
248    }
249
250    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
251        let p = build_rooted_abs_path(&self.root, path);
252
253        self.client.create_dir(&p).map_err(new_std_io_error)?;
254
255        Ok(RpCreateDir::default())
256    }
257
258    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
259        let p = build_rooted_abs_path(&self.root, path);
260
261        let meta = self.client.metadata(&p).map_err(new_std_io_error)?;
262
263        let mode = if meta.is_dir() {
264            EntryMode::DIR
265        } else if meta.is_file() {
266            EntryMode::FILE
267        } else {
268            EntryMode::Unknown
269        };
270        let mut m = Metadata::new(mode);
271        m.set_content_length(meta.len());
272        m.set_last_modified(meta.modified().into());
273
274        Ok(RpStat::new(m))
275    }
276
277    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
278        let p = build_rooted_abs_path(&self.root, path);
279
280        let client = self.client.clone();
281        let mut f = client
282            .open_file()
283            .read(true)
284            .async_open(&p)
285            .await
286            .map_err(new_std_io_error)?;
287
288        if args.range().offset() != 0 {
289            use futures::AsyncSeekExt;
290
291            f.seek(SeekFrom::Start(args.range().offset()))
292                .await
293                .map_err(new_std_io_error)?;
294        }
295
296        Ok((
297            RpRead::new(),
298            HdfsReader::new(f, args.range().size().unwrap_or(u64::MAX) as _),
299        ))
300    }
301
302    async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
303        let target_path = build_rooted_abs_path(&self.root, path);
304        let mut initial_size = 0;
305        let target_exists = match self.client.metadata(&target_path) {
306            Ok(meta) => {
307                initial_size = meta.len();
308                true
309            }
310            Err(err) => {
311                if err.kind() != io::ErrorKind::NotFound {
312                    return Err(new_std_io_error(err));
313                }
314                false
315            }
316        };
317
318        let should_append = op.append() && target_exists;
319        let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
320            // If the target file exists, we should append to the end of it directly.
321            if should_append {
322                None
323            } else {
324                Some(build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path)))
325            }
326        });
327
328        if !target_exists {
329            let parent = get_parent(&target_path);
330            self.client.create_dir(parent).map_err(new_std_io_error)?;
331        }
332        if !should_append {
333            initial_size = 0;
334        }
335
336        let mut open_options = self.client.open_file();
337        open_options.create(true);
338        if should_append {
339            open_options.append(true);
340        } else {
341            open_options.write(true);
342        }
343
344        let f = open_options
345            .async_open(tmp_path.as_ref().unwrap_or(&target_path))
346            .await
347            .map_err(new_std_io_error)?;
348
349        Ok((
350            RpWrite::new(),
351            HdfsWriter::new(
352                target_path,
353                tmp_path,
354                f,
355                Arc::clone(&self.client),
356                target_exists,
357                initial_size,
358            ),
359        ))
360    }
361
362    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
363        Ok((
364            RpDelete::default(),
365            oio::OneShotDeleter::new(HdfsDeleter::new(Arc::new(self.clone()))),
366        ))
367    }
368
369    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
370        let p = build_rooted_abs_path(&self.root, path);
371
372        let f = match self.client.read_dir(&p) {
373            Ok(f) => f,
374            Err(e) => {
375                return if e.kind() == io::ErrorKind::NotFound {
376                    Ok((RpList::default(), None))
377                } else {
378                    Err(new_std_io_error(e))
379                }
380            }
381        };
382
383        let rd = HdfsLister::new(&self.root, f, path);
384
385        Ok((RpList::default(), Some(rd)))
386    }
387
388    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
389        let from_path = build_rooted_abs_path(&self.root, from);
390        self.client.metadata(&from_path).map_err(new_std_io_error)?;
391
392        let to_path = build_rooted_abs_path(&self.root, to);
393        let result = self.client.metadata(&to_path);
394        match result {
395            Err(err) => {
396                // Early return if other error happened.
397                if err.kind() != io::ErrorKind::NotFound {
398                    return Err(new_std_io_error(err));
399                }
400
401                let parent = PathBuf::from(&to_path)
402                    .parent()
403                    .ok_or_else(|| {
404                        Error::new(
405                            ErrorKind::Unexpected,
406                            "path should have parent but not, it must be malformed",
407                        )
408                        .with_context("input", &to_path)
409                    })?
410                    .to_path_buf();
411
412                self.client
413                    .create_dir(&parent.to_string_lossy())
414                    .map_err(new_std_io_error)?;
415            }
416            Ok(metadata) => {
417                if metadata.is_file() {
418                    self.client
419                        .remove_file(&to_path)
420                        .map_err(new_std_io_error)?;
421                } else {
422                    return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
423                        .with_context("input", &to_path));
424                }
425            }
426        }
427
428        self.client
429            .rename_file(&from_path, &to_path)
430            .map_err(new_std_io_error)?;
431
432        Ok(RpRename::new())
433    }
434
435    fn blocking_create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
436        let p = build_rooted_abs_path(&self.root, path);
437
438        self.client.create_dir(&p).map_err(new_std_io_error)?;
439
440        Ok(RpCreateDir::default())
441    }
442
443    fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
444        let p = build_rooted_abs_path(&self.root, path);
445
446        let meta = self.client.metadata(&p).map_err(new_std_io_error)?;
447
448        let mode = if meta.is_dir() {
449            EntryMode::DIR
450        } else if meta.is_file() {
451            EntryMode::FILE
452        } else {
453            EntryMode::Unknown
454        };
455        let mut m = Metadata::new(mode);
456        m.set_content_length(meta.len());
457        m.set_last_modified(meta.modified().into());
458
459        Ok(RpStat::new(m))
460    }
461
462    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
463        let p = build_rooted_abs_path(&self.root, path);
464
465        let mut f = self
466            .client
467            .open_file()
468            .read(true)
469            .open(&p)
470            .map_err(new_std_io_error)?;
471
472        if args.range().offset() != 0 {
473            use std::io::Seek;
474
475            f.seek(SeekFrom::Start(args.range().offset()))
476                .map_err(new_std_io_error)?;
477        }
478
479        Ok((
480            RpRead::new(),
481            HdfsReader::new(f, args.range().size().unwrap_or(u64::MAX) as _),
482        ))
483    }
484
485    fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
486        let target_path = build_rooted_abs_path(&self.root, path);
487        let mut initial_size = 0;
488        let target_exists = match self.client.metadata(&target_path) {
489            Ok(meta) => {
490                initial_size = meta.len();
491                true
492            }
493            Err(err) => {
494                if err.kind() != io::ErrorKind::NotFound {
495                    return Err(new_std_io_error(err));
496                }
497                false
498            }
499        };
500
501        let should_append = op.append() && target_exists;
502        let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
503            // If the target file exists, we should append to the end of it directly.
504            if should_append {
505                None
506            } else {
507                Some(build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path)))
508            }
509        });
510
511        if !target_exists {
512            let parent = get_parent(&target_path);
513            self.client.create_dir(parent).map_err(new_std_io_error)?;
514        }
515        if !should_append {
516            initial_size = 0;
517        }
518
519        let mut open_options = self.client.open_file();
520        open_options.create(true);
521        if should_append {
522            open_options.append(true);
523        } else {
524            open_options.write(true);
525        }
526
527        let f = open_options
528            .open(tmp_path.as_ref().unwrap_or(&target_path))
529            .map_err(new_std_io_error)?;
530
531        Ok((
532            RpWrite::new(),
533            HdfsWriter::new(
534                target_path,
535                tmp_path,
536                f,
537                Arc::clone(&self.client),
538                target_exists,
539                initial_size,
540            ),
541        ))
542    }
543
544    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
545        Ok((
546            RpDelete::default(),
547            oio::OneShotDeleter::new(HdfsDeleter::new(Arc::new(self.clone()))),
548        ))
549    }
550
551    fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, Self::BlockingLister)> {
552        let p = build_rooted_abs_path(&self.root, path);
553
554        let f = match self.client.read_dir(&p) {
555            Ok(f) => f,
556            Err(e) => {
557                return if e.kind() == io::ErrorKind::NotFound {
558                    Ok((RpList::default(), None))
559                } else {
560                    Err(new_std_io_error(e))
561                }
562            }
563        };
564
565        let rd = HdfsLister::new(&self.root, f, path);
566
567        Ok((RpList::default(), Some(rd)))
568    }
569
570    fn blocking_rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
571        let from_path = build_rooted_abs_path(&self.root, from);
572        self.client.metadata(&from_path).map_err(new_std_io_error)?;
573
574        let to_path = build_rooted_abs_path(&self.root, to);
575        let result = self.client.metadata(&to_path);
576        match result {
577            Err(err) => {
578                // Early return if other error happened.
579                if err.kind() != io::ErrorKind::NotFound {
580                    return Err(new_std_io_error(err));
581                }
582
583                let parent = PathBuf::from(&to_path)
584                    .parent()
585                    .ok_or_else(|| {
586                        Error::new(
587                            ErrorKind::Unexpected,
588                            "path should have parent but not, it must be malformed",
589                        )
590                        .with_context("input", &to_path)
591                    })?
592                    .to_path_buf();
593
594                self.client
595                    .create_dir(&parent.to_string_lossy())
596                    .map_err(new_std_io_error)?;
597            }
598            Ok(metadata) => {
599                if metadata.is_file() {
600                    self.client
601                        .remove_file(&to_path)
602                        .map_err(new_std_io_error)?;
603                } else {
604                    return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
605                        .with_context("input", &to_path));
606                }
607            }
608        }
609
610        self.client
611            .rename_file(&from_path, &to_path)
612            .map_err(new_std_io_error)?;
613
614        Ok(RpRename::new())
615    }
616}