opendal/services/hdfs_native/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use hdfs_native::HdfsError;
23use hdfs_native::WriteOptions;
24use log::debug;
25
26use super::delete::HdfsNativeDeleter;
27use super::error::parse_hdfs_error;
28use super::lister::HdfsNativeLister;
29use super::reader::HdfsNativeReader;
30use super::writer::HdfsNativeWriter;
31use crate::raw::*;
32use crate::services::HdfsNativeConfig;
33use crate::*;
34
35/// [Hadoop Distributed File System (HDFS™)](https://hadoop.apache.org/) support.
36/// Using [Native Rust HDFS client](https://github.com/Kimahriman/hdfs-native).
37impl Configurator for HdfsNativeConfig {
38    type Builder = HdfsNativeBuilder;
39    fn into_builder(self) -> Self::Builder {
40        HdfsNativeBuilder { config: self }
41    }
42}
43
44#[doc = include_str!("docs.md")]
45#[derive(Default)]
46pub struct HdfsNativeBuilder {
47    config: HdfsNativeConfig,
48}
49
50impl Debug for HdfsNativeBuilder {
51    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52        f.debug_struct("HdfsNativeBuilder")
53            .field("config", &self.config)
54            .finish()
55    }
56}
57
58impl HdfsNativeBuilder {
59    /// Set root of this backend.
60    ///
61    /// All operations will happen under this root.
62    pub fn root(mut self, root: &str) -> Self {
63        self.config.root = if root.is_empty() {
64            None
65        } else {
66            Some(root.to_string())
67        };
68
69        self
70    }
71
72    /// Set name_node of this backend.
73    ///
74    /// Valid format including:
75    ///
76    /// - `default`: using the default setting based on hadoop config.
77    /// - `hdfs://127.0.0.1:9000`: connect to hdfs cluster.
78    pub fn name_node(mut self, name_node: &str) -> Self {
79        if !name_node.is_empty() {
80            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
81            self.config.name_node = Some(name_node.trim_end_matches('/').to_string())
82        }
83
84        self
85    }
86
87    /// Enable append capacity of this backend.
88    ///
89    /// This should be disabled when HDFS runs in non-distributed mode.
90    pub fn enable_append(mut self, enable_append: bool) -> Self {
91        self.config.enable_append = enable_append;
92        self
93    }
94}
95
96impl Builder for HdfsNativeBuilder {
97    const SCHEME: Scheme = Scheme::HdfsNative;
98    type Config = HdfsNativeConfig;
99
100    fn build(self) -> Result<impl Access> {
101        debug!("backend build started: {:?}", &self);
102
103        let name_node = match &self.config.name_node {
104            Some(v) => v,
105            None => {
106                return Err(Error::new(ErrorKind::ConfigInvalid, "name_node is empty")
107                    .with_context("service", Scheme::HdfsNative));
108            }
109        };
110
111        let root = normalize_root(&self.config.root.unwrap_or_default());
112        debug!("backend use root {}", root);
113
114        let client = hdfs_native::Client::new(name_node).map_err(parse_hdfs_error)?;
115
116        // need to check if root dir exists, create if not
117        Ok(HdfsNativeBackend {
118            root,
119            client: Arc::new(client),
120            enable_append: self.config.enable_append,
121        })
122    }
123}
124
125// #[inline]
126// fn tmp_file_of(path: &str) -> String {
127//     let name = get_basename(path);
128//     let uuid = Uuid::new_v4().to_string();
129
130//     format!("{name}.{uuid}")
131// }
132
133/// Backend for hdfs-native services.
134#[derive(Debug, Clone)]
135pub struct HdfsNativeBackend {
136    pub root: String,
137    pub client: Arc<hdfs_native::Client>,
138    enable_append: bool,
139}
140
141/// hdfs_native::Client is thread-safe.
142unsafe impl Send for HdfsNativeBackend {}
143unsafe impl Sync for HdfsNativeBackend {}
144
145impl Access for HdfsNativeBackend {
146    type Reader = HdfsNativeReader;
147    type Writer = HdfsNativeWriter;
148    type Lister = Option<HdfsNativeLister>;
149    type Deleter = oio::OneShotDeleter<HdfsNativeDeleter>;
150
151    fn info(&self) -> Arc<AccessorInfo> {
152        let am = AccessorInfo::default();
153        am.set_scheme(Scheme::HdfsNative)
154            .set_root(&self.root)
155            .set_native_capability(Capability {
156                stat: true,
157                stat_has_last_modified: true,
158                stat_has_content_length: true,
159
160                read: true,
161
162                write: true,
163                write_can_append: self.enable_append,
164
165                create_dir: true,
166                delete: true,
167
168                list: true,
169                list_has_content_length: true,
170                list_has_last_modified: true,
171
172                rename: true,
173
174                shared: true,
175
176                ..Default::default()
177            });
178
179        am.into()
180    }
181
182    async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
183        let p = build_rooted_abs_path(&self.root, path);
184
185        self.client
186            .mkdirs(&p, 0o777, true)
187            .await
188            .map_err(parse_hdfs_error)?;
189
190        Ok(RpCreateDir::default())
191    }
192
193    async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
194        let p = build_rooted_abs_path(&self.root, path);
195
196        let status: hdfs_native::client::FileStatus = self
197            .client
198            .get_file_info(&p)
199            .await
200            .map_err(parse_hdfs_error)?;
201
202        let mode = if status.isdir {
203            EntryMode::DIR
204        } else {
205            EntryMode::FILE
206        };
207
208        let mut metadata = Metadata::new(mode);
209        metadata
210            .set_last_modified(parse_datetime_from_from_timestamp_millis(
211                status.modification_time as i64,
212            )?)
213            .set_content_length(status.length as u64);
214
215        Ok(RpStat::new(metadata))
216    }
217
218    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
219        let p = build_rooted_abs_path(&self.root, path);
220
221        let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;
222
223        let r = HdfsNativeReader::new(
224            f,
225            args.range().offset() as _,
226            args.range().size().unwrap_or(u64::MAX) as _,
227        );
228
229        Ok((RpRead::new(), r))
230    }
231
232    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
233        let target_path = build_rooted_abs_path(&self.root, path);
234        let mut initial_size = 0;
235
236        let target_exists = match self.client.get_file_info(&target_path).await {
237            Ok(status) => {
238                initial_size = status.length as u64;
239                true
240            }
241            Err(err) => match &err {
242                HdfsError::FileNotFound(_) => false,
243                _ => return Err(parse_hdfs_error(err)),
244            },
245        };
246
247        let f = if target_exists {
248            if args.append() {
249                assert!(self.enable_append, "append is not enabled");
250                self.client
251                    .append(&target_path)
252                    .await
253                    .map_err(parse_hdfs_error)?
254            } else {
255                initial_size = 0;
256                self.client
257                    .create(&target_path, WriteOptions::default().overwrite(true))
258                    .await
259                    .map_err(parse_hdfs_error)?
260            }
261        } else {
262            initial_size = 0;
263            self.client
264                .create(&target_path, WriteOptions::default())
265                .await
266                .map_err(parse_hdfs_error)?
267        };
268
269        Ok((RpWrite::new(), HdfsNativeWriter::new(f, initial_size)))
270    }
271
272    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
273        Ok((
274            RpDelete::default(),
275            oio::OneShotDeleter::new(HdfsNativeDeleter::new(Arc::new(self.clone()))),
276        ))
277    }
278
279    async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
280        let p: String = build_rooted_abs_path(&self.root, path);
281
282        let isdir = match self.client.get_file_info(&p).await {
283            Ok(status) => status.isdir,
284            Err(err) => {
285                return match &err {
286                    HdfsError::FileNotFound(_) => Ok((RpList::default(), None)),
287                    _ => Err(parse_hdfs_error(err)),
288                };
289            }
290        };
291        let current_path = if isdir {
292            if !path.ends_with("/") {
293                Some(path.to_string() + "/")
294            } else {
295                Some(path.to_string())
296            }
297        } else {
298            None
299        };
300
301        Ok((
302            RpList::default(),
303            Some(HdfsNativeLister::new(
304                &self.root,
305                &self.client,
306                &p,
307                current_path,
308            )),
309        ))
310    }
311
312    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
313        let from_path = build_rooted_abs_path(&self.root, from);
314        let to_path = build_rooted_abs_path(&self.root, to);
315        match self.client.get_file_info(&to_path).await {
316            Ok(status) => {
317                if status.isdir {
318                    return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
319                        .with_context("input", &to_path));
320                } else {
321                    self.client
322                        .delete(&to_path, true)
323                        .await
324                        .map_err(parse_hdfs_error)?;
325                }
326            }
327            Err(err) => match &err {
328                HdfsError::FileNotFound(_) => {
329                    self.client
330                        .create(&to_path, WriteOptions::default().create_parent(true))
331                        .await
332                        .map_err(parse_hdfs_error)?;
333                }
334                _ => return Err(parse_hdfs_error(err)),
335            },
336        };
337
338        self.client
339            .rename(&from_path, &to_path, true)
340            .await
341            .map_err(parse_hdfs_error)?;
342
343        Ok(RpRename::default())
344    }
345}