opendal/services/hdfs/backend.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt::Debug;
use std::fmt::Formatter;
use std::io;
use std::io::SeekFrom;
use std::path::PathBuf;
use std::sync::Arc;

use log::debug;

use super::delete::HdfsDeleter;
use super::lister::HdfsLister;
use super::reader::HdfsReader;
use super::writer::HdfsWriter;
use super::DEFAULT_SCHEME;
use crate::raw::*;
use crate::services::HdfsConfig;
use crate::*;

impl Configurator for HdfsConfig {
    type Builder = HdfsBuilder;
    fn into_builder(self) -> Self::Builder {
        HdfsBuilder { config: self }
    }
}

#[doc = include_str!("docs.md")]
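/// # Example
///
/// A minimal usage sketch via the exported `Hdfs` builder alias and the
/// standard `Operator` entry point (the name node address and root below
/// are placeholders, not defaults):
///
/// ```no_run
/// use opendal::services::Hdfs;
/// use opendal::Operator;
///
/// fn example() -> opendal::Result<()> {
///     // Connect to a name node and scope all operations under a root.
///     let builder = Hdfs::default()
///         .name_node("hdfs://127.0.0.1:9000")
///         .root("/tmp/opendal");
///     let _op = Operator::new(builder)?.finish();
///     Ok(())
/// }
/// ```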
#[derive(Default)]
pub struct HdfsBuilder {
    config: HdfsConfig,
}

impl Debug for HdfsBuilder {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HdfsBuilder")
            .field("config", &self.config)
            .finish()
    }
}

impl HdfsBuilder {
    /// Set root of this backend.
    ///
    /// All operations will happen under this root.
    pub fn root(mut self, root: &str) -> Self {
        self.config.root = if root.is_empty() {
            None
        } else {
            Some(root.to_string())
        };

        self
    }

    /// Set name_node of this backend.
    ///
    /// Valid formats include:
    ///
    /// - `default`: use the default setting from the hadoop config.
    /// - `hdfs://127.0.0.1:9000`: connect to a specific hdfs cluster.
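    ///
    /// For example (a sketch; the address is a placeholder):
    ///
    /// ```no_run
    /// use opendal::services::Hdfs;
    ///
    /// // Point the builder at a specific cluster rather than `default`.
    /// let builder = Hdfs::default().name_node("hdfs://127.0.0.1:9000");
    /// ```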
    pub fn name_node(mut self, name_node: &str) -> Self {
        if !name_node.is_empty() {
            self.config.name_node = Some(name_node.to_string())
        }

        self
    }

    /// Set kerberos_ticket_cache_path of this backend.
    ///
    /// This should be configured when kerberos is enabled.
    pub fn kerberos_ticket_cache_path(mut self, kerberos_ticket_cache_path: &str) -> Self {
        if !kerberos_ticket_cache_path.is_empty() {
            self.config.kerberos_ticket_cache_path = Some(kerberos_ticket_cache_path.to_string())
        }
        self
    }

    /// Set user of this backend.
    pub fn user(mut self, user: &str) -> Self {
        if !user.is_empty() {
            self.config.user = Some(user.to_string())
        }
        self
    }

    /// Enable the append capability of this backend.
    ///
    /// This should be disabled when HDFS runs in non-distributed mode.
    pub fn enable_append(mut self, enable_append: bool) -> Self {
        self.config.enable_append = enable_append;
        self
    }

    /// Set the temp dir for atomic writes.
    ///
    /// # Notes
    ///
    /// - When append is enabled, atomic writes are skipped
    ///   to avoid data loss and performance issues.
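    ///
    /// For example (a sketch; the directory below is a placeholder):
    ///
    /// ```no_run
    /// use opendal::services::Hdfs;
    ///
    /// // Writes land in the temp dir first, then get renamed into place.
    /// let builder = Hdfs::default()
    ///     .name_node("hdfs://127.0.0.1:9000")
    ///     .atomic_write_dir("/tmp/.opendal_tmp");
    /// ```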
    pub fn atomic_write_dir(mut self, dir: &str) -> Self {
        self.config.atomic_write_dir = if dir.is_empty() {
            None
        } else {
            Some(String::from(dir))
        };
        self
    }
}

impl Builder for HdfsBuilder {
    type Config = HdfsConfig;

    fn build(self) -> Result<impl Access> {
        debug!("backend build started: {:?}", &self);

        let name_node = match &self.config.name_node {
            Some(v) => v,
            None => {
                return Err(Error::new(ErrorKind::ConfigInvalid, "name node is empty")
                    .with_context("service", Scheme::Hdfs))
            }
        };

        let root = normalize_root(&self.config.root.unwrap_or_default());
        debug!("backend uses root {root}");

        let mut builder = hdrs::ClientBuilder::new(name_node);
        if let Some(ticket_cache_path) = &self.config.kerberos_ticket_cache_path {
            builder = builder.with_kerberos_ticket_cache_path(ticket_cache_path.as_str());
        }
        if let Some(user) = &self.config.user {
            builder = builder.with_user(user.as_str());
        }

        let client = builder.connect().map_err(new_std_io_error)?;

        // Create the root dir if it does not exist.
        if let Err(e) = client.metadata(&root) {
            if e.kind() == io::ErrorKind::NotFound {
                debug!("root {root} does not exist, creating now");

                client.create_dir(&root).map_err(new_std_io_error)?
            }
        }

        let atomic_write_dir = self.config.atomic_write_dir;

        // If the atomic write dir does not exist, create it as well.
        if let Some(d) = &atomic_write_dir {
            if let Err(e) = client.metadata(d) {
                if e.kind() == io::ErrorKind::NotFound {
                    client.create_dir(d).map_err(new_std_io_error)?
                }
            }
        }

        Ok(HdfsBackend {
            info: {
                let am = AccessorInfo::default();
                am.set_scheme(DEFAULT_SCHEME)
                    .set_root(&root)
                    .set_native_capability(Capability {
                        stat: true,

                        read: true,

                        write: true,
                        write_can_append: self.config.enable_append,

                        create_dir: true,
                        delete: true,

                        list: true,

                        rename: true,

                        shared: true,

                        ..Default::default()
                    });

                am.into()
            },
            root,
            atomic_write_dir,
            client: Arc::new(client),
        })
    }
}

/// Backend for hdfs services.
#[derive(Debug, Clone)]
pub struct HdfsBackend {
    pub info: Arc<AccessorInfo>,
    pub root: String,
    atomic_write_dir: Option<String>,
    pub client: Arc<hdrs::Client>,
}

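// SAFETY: `hdrs::Client` wraps a raw handle from the underlying C library
// (libhdfs), which keeps the compiler from auto-deriving Send/Sync; since the
// client is documented as thread-safe, we assert both traits manually.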
/// hdrs::Client is thread-safe.
unsafe impl Send for HdfsBackend {}
unsafe impl Sync for HdfsBackend {}

impl Access for HdfsBackend {
    type Reader = HdfsReader<hdrs::AsyncFile>;
    type Writer = HdfsWriter<hdrs::AsyncFile>;
    type Lister = Option<HdfsLister>;
    type Deleter = oio::OneShotDeleter<HdfsDeleter>;

    fn info(&self) -> Arc<AccessorInfo> {
        self.info.clone()
    }

    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
        let p = build_rooted_abs_path(&self.root, path);

        self.client.create_dir(&p).map_err(new_std_io_error)?;

        Ok(RpCreateDir::default())
    }

    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
        let p = build_rooted_abs_path(&self.root, path);

        let meta = self.client.metadata(&p).map_err(new_std_io_error)?;

        let mode = if meta.is_dir() {
            EntryMode::DIR
        } else if meta.is_file() {
            EntryMode::FILE
        } else {
            EntryMode::Unknown
        };
        let mut m = Metadata::new(mode);
        m.set_content_length(meta.len());
        m.set_last_modified(meta.modified().into());

        Ok(RpStat::new(m))
    }

    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        let p = build_rooted_abs_path(&self.root, path);

        let client = self.client.clone();
        let mut f = client
            .open_file()
            .read(true)
            .async_open(&p)
            .await
            .map_err(new_std_io_error)?;

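        // Honor the requested byte range: seek to its offset before handing
        // the file to the reader, which is capped at the range size below.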
        if args.range().offset() != 0 {
            use futures::AsyncSeekExt;

            f.seek(SeekFrom::Start(args.range().offset()))
                .await
                .map_err(new_std_io_error)?;
        }

        Ok((
            RpRead::new(),
            HdfsReader::new(f, args.range().size().unwrap_or(u64::MAX) as _),
        ))
    }

    async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        let target_path = build_rooted_abs_path(&self.root, path);
        let mut initial_size = 0;
        let target_exists = match self.client.metadata(&target_path) {
            Ok(meta) => {
                initial_size = meta.len();
                true
            }
            Err(err) => {
                if err.kind() != io::ErrorKind::NotFound {
                    return Err(new_std_io_error(err));
                }
                false
            }
        };

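        // Pick a write strategy:
        // - Appending to an existing target: open it in append mode and keep
        //   its current length as the initial size.
        // - Otherwise, if an atomic write dir is configured, write to a tmp
        //   path there and let HdfsWriter swap it into place on close.
        // - Failing both, open the target for writing from the start.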
        let should_append = op.append() && target_exists;
        let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
            // When appending to an existing target, skip the tmp path and
            // write to the end of the file directly.
            (!should_append).then_some(build_rooted_abs_path(
                atomic_write_dir,
                &build_tmp_path_of(path),
            ))
        });

        if !target_exists {
            let parent = get_parent(&target_path);
            self.client.create_dir(parent).map_err(new_std_io_error)?;
        }
        if !should_append {
            initial_size = 0;
        }

        let mut open_options = self.client.open_file();
        open_options.create(true);
        if should_append {
            open_options.append(true);
        } else {
            open_options.write(true);
        }

        let f = open_options
            .async_open(tmp_path.as_ref().unwrap_or(&target_path))
            .await
            .map_err(new_std_io_error)?;

        Ok((
            RpWrite::new(),
            HdfsWriter::new(
                target_path,
                tmp_path,
                f,
                Arc::clone(&self.client),
                target_exists,
                initial_size,
            ),
        ))
    }

    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        Ok((
            RpDelete::default(),
            oio::OneShotDeleter::new(HdfsDeleter::new(Arc::new(self.clone()))),
        ))
    }

    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
        let p = build_rooted_abs_path(&self.root, path);

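        // A missing directory yields an empty listing: return a `None` lister
        // instead of surfacing the NotFound error.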
        let f = match self.client.read_dir(&p) {
            Ok(f) => f,
            Err(e) => {
                return if e.kind() == io::ErrorKind::NotFound {
                    Ok((RpList::default(), None))
                } else {
                    Err(new_std_io_error(e))
                }
            }
        };

        let rd = HdfsLister::new(&self.root, f, path);

        Ok((RpList::default(), Some(rd)))
    }

    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
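        // Rename strategy: stat the source so a missing `from` fails early;
        // if the target is absent, make sure its parent dir exists; if the
        // target is an existing file, remove it so the rename can succeed;
        // an existing directory target is rejected.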
        let from_path = build_rooted_abs_path(&self.root, from);
        self.client.metadata(&from_path).map_err(new_std_io_error)?;

        let to_path = build_rooted_abs_path(&self.root, to);
        let result = self.client.metadata(&to_path);
        match result {
            Err(err) => {
                // Early return if any other error happened.
                if err.kind() != io::ErrorKind::NotFound {
                    return Err(new_std_io_error(err));
                }

                let parent = PathBuf::from(&to_path)
                    .parent()
                    .ok_or_else(|| {
                        Error::new(
                            ErrorKind::Unexpected,
                            "path should have a parent but does not; it must be malformed",
                        )
                        .with_context("input", &to_path)
                    })?
                    .to_path_buf();

                self.client
                    .create_dir(&parent.to_string_lossy())
                    .map_err(new_std_io_error)?;
            }
            Ok(metadata) => {
                if metadata.is_file() {
                    self.client
                        .remove_file(&to_path)
                        .map_err(new_std_io_error)?;
                } else {
                    return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
                        .with_context("input", &to_path));
                }
            }
        }

        self.client
            .rename_file(&from_path, &to_path)
            .map_err(new_std_io_error)?;

        Ok(RpRename::new())
    }
}