opendal/services/hdfs_native/backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::delete::HdfsNativeDeleter;
19use super::error::parse_hdfs_error;
20use super::lister::HdfsNativeLister;
21use super::reader::HdfsNativeReader;
22use super::writer::HdfsNativeWriter;
23use crate::raw::*;
24use crate::services::HdfsNativeConfig;
25use crate::*;
26use hdfs_native::HdfsError;
27use hdfs_native::WriteOptions;
28use log::debug;
29use std::fmt::Debug;
30use std::fmt::Formatter;
31use std::sync::Arc;
32
33/// [Hadoop Distributed File System (HDFS™)](https://hadoop.apache.org/) support.
34/// Using [Native Rust HDFS client](https://github.com/Kimahriman/hdfs-native).
35impl Configurator for HdfsNativeConfig {
36    type Builder = HdfsNativeBuilder;
37    fn into_builder(self) -> Self::Builder {
38        HdfsNativeBuilder { config: self }
39    }
40}
41
42#[doc = include_str!("docs.md")]
43#[derive(Default)]
44pub struct HdfsNativeBuilder {
45    config: HdfsNativeConfig,
46}
47
48impl Debug for HdfsNativeBuilder {
49    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
50        f.debug_struct("HdfsNativeBuilder")
51            .field("config", &self.config)
52            .finish()
53    }
54}
55
56impl HdfsNativeBuilder {
57    /// Set root of this backend.
58    ///
59    /// All operations will happen under this root.
60    pub fn root(mut self, root: &str) -> Self {
61        self.config.root = if root.is_empty() {
62            None
63        } else {
64            Some(root.to_string())
65        };
66
67        self
68    }
69
70    /// Set name_node of this backend.
71    ///
72    /// Valid format including:
73    ///
74    /// - `default`: using the default setting based on hadoop config.
75    /// - `hdfs://127.0.0.1:9000`: connect to hdfs cluster.
76    pub fn name_node(mut self, name_node: &str) -> Self {
77        if !name_node.is_empty() {
78            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
79            self.config.name_node = Some(name_node.trim_end_matches('/').to_string())
80        }
81
82        self
83    }
84
85    /// Enable append capacity of this backend.
86    ///
87    /// This should be disabled when HDFS runs in non-distributed mode.
88    pub fn enable_append(mut self, enable_append: bool) -> Self {
89        self.config.enable_append = enable_append;
90        self
91    }
92}
93
94impl Builder for HdfsNativeBuilder {
95    const SCHEME: Scheme = Scheme::HdfsNative;
96    type Config = HdfsNativeConfig;
97
98    fn build(self) -> Result<impl Access> {
99        debug!("backend build started: {:?}", &self);
100
101        let name_node = match &self.config.name_node {
102            Some(v) => v,
103            None => {
104                return Err(Error::new(ErrorKind::ConfigInvalid, "name_node is empty")
105                    .with_context("service", Scheme::HdfsNative));
106            }
107        };
108
109        let root = normalize_root(&self.config.root.unwrap_or_default());
110        debug!("backend use root {}", root);
111
112        let client = hdfs_native::Client::new(name_node).map_err(parse_hdfs_error)?;
113
114        // need to check if root dir exists, create if not
115        Ok(HdfsNativeBackend {
116            root,
117            client: Arc::new(client),
118            enable_append: self.config.enable_append,
119        })
120    }
121}
122
123// #[inline]
124// fn tmp_file_of(path: &str) -> String {
125//     let name = get_basename(path);
126//     let uuid = Uuid::new_v4().to_string();
127
128//     format!("{name}.{uuid}")
129// }
130
131/// Backend for hdfs-native services.
132#[derive(Debug, Clone)]
133pub struct HdfsNativeBackend {
134    pub root: String,
135    pub client: Arc<hdfs_native::Client>,
136    enable_append: bool,
137}
138
// SAFETY: relies on `hdfs_native::Client` being safe to share and send across
// threads; the other fields (`String`, `bool`) are already `Send + Sync`.
// NOTE(review): if `hdfs_native::Client` is itself `Send + Sync`, these impls are
// redundant and could be removed; if it is not, this claim must be verified
// against the hdfs-native crate before relying on it.
unsafe impl Send for HdfsNativeBackend {}
unsafe impl Sync for HdfsNativeBackend {}
142
143impl Access for HdfsNativeBackend {
144    type Reader = HdfsNativeReader;
145    type Writer = HdfsNativeWriter;
146    type Lister = Option<HdfsNativeLister>;
147    type Deleter = oio::OneShotDeleter<HdfsNativeDeleter>;
148    type BlockingReader = ();
149    type BlockingWriter = ();
150    type BlockingLister = ();
151    type BlockingDeleter = ();
152
153    fn info(&self) -> Arc<AccessorInfo> {
154        let am = AccessorInfo::default();
155        am.set_scheme(Scheme::HdfsNative)
156            .set_root(&self.root)
157            .set_native_capability(Capability {
158                stat: true,
159                stat_has_last_modified: true,
160                stat_has_content_length: true,
161
162                read: true,
163
164                write: true,
165                write_can_append: self.enable_append,
166
167                create_dir: true,
168                delete: true,
169
170                list: true,
171                list_has_content_length: true,
172                list_has_last_modified: true,
173
174                rename: true,
175
176                shared: true,
177
178                ..Default::default()
179            });
180
181        am.into()
182    }
183
184    async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
185        let p = build_rooted_abs_path(&self.root, path);
186
187        self.client
188            .mkdirs(&p, 0o777, true)
189            .await
190            .map_err(parse_hdfs_error)?;
191
192        Ok(RpCreateDir::default())
193    }
194
195    async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
196        let p = build_rooted_abs_path(&self.root, path);
197
198        let status: hdfs_native::client::FileStatus = self
199            .client
200            .get_file_info(&p)
201            .await
202            .map_err(parse_hdfs_error)?;
203
204        let mode = if status.isdir {
205            EntryMode::DIR
206        } else {
207            EntryMode::FILE
208        };
209
210        let mut metadata = Metadata::new(mode);
211        metadata
212            .set_last_modified(parse_datetime_from_from_timestamp_millis(
213                status.modification_time as i64,
214            )?)
215            .set_content_length(status.length as u64);
216
217        Ok(RpStat::new(metadata))
218    }
219
220    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
221        let p = build_rooted_abs_path(&self.root, path);
222
223        let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;
224
225        let r = HdfsNativeReader::new(
226            f,
227            args.range().offset() as _,
228            args.range().size().unwrap_or(u64::MAX) as _,
229        );
230
231        Ok((RpRead::new(), r))
232    }
233
234    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
235        let target_path = build_rooted_abs_path(&self.root, path);
236        let mut initial_size = 0;
237
238        let target_exists = match self.client.get_file_info(&target_path).await {
239            Ok(status) => {
240                initial_size = status.length as u64;
241                true
242            }
243            Err(err) => match &err {
244                HdfsError::FileNotFound(_) => false,
245                _ => return Err(parse_hdfs_error(err)),
246            },
247        };
248
249        let f = if target_exists {
250            if args.append() {
251                assert!(self.enable_append, "append is not enabled");
252                self.client
253                    .append(&target_path)
254                    .await
255                    .map_err(parse_hdfs_error)?
256            } else {
257                initial_size = 0;
258                self.client
259                    .create(&target_path, WriteOptions::default().overwrite(true))
260                    .await
261                    .map_err(parse_hdfs_error)?
262            }
263        } else {
264            initial_size = 0;
265            self.client
266                .create(&target_path, WriteOptions::default())
267                .await
268                .map_err(parse_hdfs_error)?
269        };
270
271        Ok((RpWrite::new(), HdfsNativeWriter::new(f, initial_size)))
272    }
273
274    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
275        Ok((
276            RpDelete::default(),
277            oio::OneShotDeleter::new(HdfsNativeDeleter::new(Arc::new(self.clone()))),
278        ))
279    }
280
281    async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
282        let p: String = build_rooted_abs_path(&self.root, path);
283
284        let isdir = match self.client.get_file_info(&p).await {
285            Ok(status) => status.isdir,
286            Err(err) => {
287                return match &err {
288                    HdfsError::FileNotFound(_) => Ok((RpList::default(), None)),
289                    _ => Err(parse_hdfs_error(err)),
290                };
291            }
292        };
293        let current_path = if isdir {
294            if !path.ends_with("/") {
295                Some(path.to_string() + "/")
296            } else {
297                Some(path.to_string())
298            }
299        } else {
300            None
301        };
302
303        Ok((
304            RpList::default(),
305            Some(HdfsNativeLister::new(
306                &self.root,
307                &self.client,
308                &p,
309                current_path,
310            )),
311        ))
312    }
313
314    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
315        let from_path = build_rooted_abs_path(&self.root, from);
316        let to_path = build_rooted_abs_path(&self.root, to);
317        match self.client.get_file_info(&to_path).await {
318            Ok(status) => {
319                if status.isdir {
320                    return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
321                        .with_context("input", &to_path));
322                } else {
323                    self.client
324                        .delete(&to_path, true)
325                        .await
326                        .map_err(parse_hdfs_error)?;
327                }
328            }
329            Err(err) => match &err {
330                HdfsError::FileNotFound(_) => {
331                    self.client
332                        .create(&to_path, WriteOptions::default().create_parent(true))
333                        .await
334                        .map_err(parse_hdfs_error)?;
335                }
336                _ => return Err(parse_hdfs_error(err)),
337            },
338        };
339
340        self.client
341            .rename(&from_path, &to_path, true)
342            .await
343            .map_err(parse_hdfs_error)?;
344
345        Ok(RpRename::default())
346    }
347}