opendal_core/services/hdfs_native/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use hdfs_native::HdfsError;
22use hdfs_native::WriteOptions;
23
24use super::error::parse_hdfs_error;
25use crate::raw::*;
26use crate::*;
27
/// HdfsNativeCore contains code that directly interacts with HDFS Native client.
///
/// Each `hdfs_*` method resolves the caller-supplied path against `root` with
/// `build_rooted_abs_path` and then issues the request through `client`.
#[derive(Clone)]
pub struct HdfsNativeCore {
    /// Shared accessor info. It is not read by the `hdfs_*` methods visible
    /// here (presumably consumed by the owning backend — confirm at call sites).
    pub info: Arc<AccessorInfo>,
    /// Root that every operation path is joined onto.
    pub root: String,
    /// Shared HDFS native client on which all requests are issued.
    pub client: Arc<hdfs_native::Client>,
    /// Whether appending to an existing file is allowed; checked by
    /// `hdfs_write` before it opens a file for append.
    pub enable_append: bool,
}
36
37impl Debug for HdfsNativeCore {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        f.debug_struct("HdfsNativeCore")
40            .field("root", &self.root)
41            .field("enable_append", &self.enable_append)
42            .finish_non_exhaustive()
43    }
44}
45
46impl HdfsNativeCore {
47    pub async fn hdfs_create_dir(&self, path: &str) -> Result<()> {
48        let p = build_rooted_abs_path(&self.root, path);
49
50        self.client
51            .mkdirs(&p, 0o777, true)
52            .await
53            .map_err(parse_hdfs_error)?;
54
55        Ok(())
56    }
57
58    pub async fn hdfs_stat(&self, path: &str) -> Result<Metadata> {
59        let p = build_rooted_abs_path(&self.root, path);
60
61        let status: hdfs_native::client::FileStatus = self
62            .client
63            .get_file_info(&p)
64            .await
65            .map_err(parse_hdfs_error)?;
66
67        let mode = if status.isdir {
68            EntryMode::DIR
69        } else {
70            EntryMode::FILE
71        };
72
73        let mut metadata = Metadata::new(mode);
74        metadata
75            .set_last_modified(Timestamp::from_millisecond(
76                status.modification_time as i64,
77            )?)
78            .set_content_length(status.length as u64);
79
80        Ok(metadata)
81    }
82
83    pub async fn hdfs_read(
84        &self,
85        path: &str,
86        args: &OpRead,
87    ) -> Result<(hdfs_native::file::FileReader, u64, u64)> {
88        let p = build_rooted_abs_path(&self.root, path);
89
90        let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;
91
92        let offset = args.range().offset();
93        let size = args.range().size().unwrap_or(u64::MAX);
94
95        Ok((f, offset, size))
96    }
97
98    pub async fn hdfs_write(
99        &self,
100        path: &str,
101        args: &OpWrite,
102    ) -> Result<(hdfs_native::file::FileWriter, u64)> {
103        let target_path = build_rooted_abs_path(&self.root, path);
104        let mut initial_size = 0;
105
106        let target_exists = match self.client.get_file_info(&target_path).await {
107            Ok(status) => {
108                initial_size = status.length as u64;
109                true
110            }
111            Err(err) => match &err {
112                HdfsError::FileNotFound(_) => false,
113                _ => return Err(parse_hdfs_error(err)),
114            },
115        };
116
117        let f = if target_exists {
118            if args.append() {
119                assert!(self.enable_append, "append is not enabled");
120                self.client
121                    .append(&target_path)
122                    .await
123                    .map_err(parse_hdfs_error)?
124            } else {
125                initial_size = 0;
126                self.client
127                    .create(&target_path, WriteOptions::default().overwrite(true))
128                    .await
129                    .map_err(parse_hdfs_error)?
130            }
131        } else {
132            initial_size = 0;
133            self.client
134                .create(&target_path, WriteOptions::default())
135                .await
136                .map_err(parse_hdfs_error)?
137        };
138
139        Ok((f, initial_size))
140    }
141
142    pub async fn hdfs_delete(&self, path: &str) -> Result<()> {
143        let p = build_rooted_abs_path(&self.root, path);
144
145        self.client
146            .delete(&p, true)
147            .await
148            .map_err(parse_hdfs_error)?;
149
150        Ok(())
151    }
152
153    pub async fn hdfs_list(&self, path: &str) -> Result<Option<(String, Option<String>)>> {
154        let p: String = build_rooted_abs_path(&self.root, path);
155
156        let isdir = match self.client.get_file_info(&p).await {
157            Ok(status) => status.isdir,
158            Err(err) => {
159                return match &err {
160                    HdfsError::FileNotFound(_) => Ok(None),
161                    _ => Err(parse_hdfs_error(err)),
162                };
163            }
164        };
165
166        let current_path = if isdir {
167            if !path.ends_with("/") {
168                Some(path.to_string() + "/")
169            } else {
170                Some(path.to_string())
171            }
172        } else {
173            None
174        };
175
176        Ok(Some((p, current_path)))
177    }
178
179    pub async fn hdfs_rename(&self, from: &str, to: &str) -> Result<()> {
180        let from_path = build_rooted_abs_path(&self.root, from);
181        let to_path = build_rooted_abs_path(&self.root, to);
182
183        match self.client.get_file_info(&to_path).await {
184            Ok(status) => {
185                if status.isdir {
186                    return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
187                        .with_context("input", &to_path));
188                } else {
189                    self.client
190                        .delete(&to_path, true)
191                        .await
192                        .map_err(parse_hdfs_error)?;
193                }
194            }
195            Err(err) => match &err {
196                HdfsError::FileNotFound(_) => {
197                    self.client
198                        .create(&to_path, WriteOptions::default().create_parent(true))
199                        .await
200                        .map_err(parse_hdfs_error)?;
201                }
202                _ => return Err(parse_hdfs_error(err)),
203            },
204        };
205
206        self.client
207            .rename(&from_path, &to_path, true)
208            .await
209            .map_err(parse_hdfs_error)?;
210
211        Ok(())
212    }
213}