opendal/services/hdfs_native/backend.rs

use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use hdfs_native::HdfsError;
use hdfs_native::WriteOptions;
use log::debug;

use super::delete::HdfsNativeDeleter;
use super::error::parse_hdfs_error;
use super::lister::HdfsNativeLister;
use super::reader::HdfsNativeReader;
use super::writer::HdfsNativeWriter;
use crate::raw::*;
use crate::services::HdfsNativeConfig;
use crate::*;

impl Configurator for HdfsNativeConfig {
    type Builder = HdfsNativeBuilder;
    fn into_builder(self) -> Self::Builder {
        HdfsNativeBuilder { config: self }
    }
}

#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct HdfsNativeBuilder {
    config: HdfsNativeConfig,
}

impl Debug for HdfsNativeBuilder {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HdfsNativeBuilder")
            .field("config", &self.config)
            .finish()
    }
}

impl HdfsNativeBuilder {
    /// Set the root of this backend.
    ///
    /// All operations will happen under this root.
    pub fn root(mut self, root: &str) -> Self {
        self.config.root = if root.is_empty() {
            None
        } else {
            Some(root.to_string())
        };

        self
    }

    /// Set the name node address of the HDFS cluster.
    pub fn name_node(mut self, name_node: &str) -> Self {
        if !name_node.is_empty() {
            // Trim any trailing `/` so addresses like `hdfs://127.0.0.1:9000/` are accepted.
            self.config.name_node = Some(name_node.trim_end_matches('/').to_string())
        }

        self
    }

    /// Enable or disable append support for this backend.
    pub fn enable_append(mut self, enable_append: bool) -> Self {
        self.config.enable_append = enable_append;
        self
    }
}
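
// A minimal usage sketch of this builder through the public `Operator` API.
// It assumes the builder is re-exported as `opendal::services::HdfsNative`
// and that a name node listens at `hdfs://127.0.0.1:9000`; both names are
// illustrative, not guaranteed by this module.
//
//     use opendal::services::HdfsNative;
//     use opendal::Operator;
//
//     async fn example() -> opendal::Result<()> {
//         let builder = HdfsNative::default()
//             .name_node("hdfs://127.0.0.1:9000")
//             .root("/tmp/opendal");
//         let op = Operator::new(builder)?.finish();
//
//         op.write("hello.txt", "Hello, HDFS!").await?;
//         let bs = op.read("hello.txt").await?;
//         assert_eq!(bs.to_vec(), b"Hello, HDFS!".to_vec());
//         Ok(())
//     }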

impl Builder for HdfsNativeBuilder {
    const SCHEME: Scheme = Scheme::HdfsNative;
    type Config = HdfsNativeConfig;

    fn build(self) -> Result<impl Access> {
        debug!("backend build started: {:?}", &self);

        let name_node = match &self.config.name_node {
            Some(v) => v,
            None => {
                return Err(Error::new(ErrorKind::ConfigInvalid, "name_node is empty")
                    .with_context("service", Scheme::HdfsNative));
            }
        };

        let root = normalize_root(&self.config.root.unwrap_or_default());
        debug!("backend use root {root}");

        let client = hdfs_native::Client::new(name_node).map_err(parse_hdfs_error)?;

        Ok(HdfsNativeBackend {
            root,
            client: Arc::new(client),
            enable_append: self.config.enable_append,
        })
    }
}

#[derive(Debug, Clone)]
pub struct HdfsNativeBackend {
    pub root: String,
    pub client: Arc<hdfs_native::Client>,
    enable_append: bool,
}

unsafe impl Send for HdfsNativeBackend {}
unsafe impl Sync for HdfsNativeBackend {}

impl Access for HdfsNativeBackend {
    type Reader = HdfsNativeReader;
    type Writer = HdfsNativeWriter;
    type Lister = Option<HdfsNativeLister>;
    type Deleter = oio::OneShotDeleter<HdfsNativeDeleter>;

    fn info(&self) -> Arc<AccessorInfo> {
        let am = AccessorInfo::default();
        am.set_scheme(Scheme::HdfsNative)
            .set_root(&self.root)
            .set_native_capability(Capability {
                stat: true,

                read: true,

                write: true,
                write_can_append: self.enable_append,

                create_dir: true,
                delete: true,

                list: true,

                rename: true,

                shared: true,

                ..Default::default()
            });

        am.into()
    }

    async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
        let p = build_rooted_abs_path(&self.root, path);

        self.client
            .mkdirs(&p, 0o777, true)
            .await
            .map_err(parse_hdfs_error)?;

        Ok(RpCreateDir::default())
    }

    async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
        let p = build_rooted_abs_path(&self.root, path);

        let status: hdfs_native::client::FileStatus = self
            .client
            .get_file_info(&p)
            .await
            .map_err(parse_hdfs_error)?;

        let mode = if status.isdir {
            EntryMode::DIR
        } else {
            EntryMode::FILE
        };

        let mut metadata = Metadata::new(mode);
        metadata
            .set_last_modified(parse_datetime_from_from_timestamp_millis(
                status.modification_time as i64,
            )?)
            .set_content_length(status.length as u64);

        Ok(RpStat::new(metadata))
    }

    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        let p = build_rooted_abs_path(&self.root, path);

        let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;

        let r = HdfsNativeReader::new(
            f,
            args.range().offset() as _,
            args.range().size().unwrap_or(u64::MAX) as _,
        );

        Ok((RpRead::new(), r))
    }

    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        let target_path = build_rooted_abs_path(&self.root, path);
        let mut initial_size = 0;

        // Probe the target so we know whether to append to an existing file or
        // create a fresh one; a missing file is not an error here.
        let target_exists = match self.client.get_file_info(&target_path).await {
            Ok(status) => {
                initial_size = status.length as u64;
                true
            }
            Err(err) => match &err {
                HdfsError::FileNotFound(_) => false,
                _ => return Err(parse_hdfs_error(err)),
            },
        };

        let f = if target_exists {
            if args.append() {
                assert!(self.enable_append, "append is not enabled");
                self.client
                    .append(&target_path)
                    .await
                    .map_err(parse_hdfs_error)?
            } else {
                // Overwriting discards the existing content, so the writer
                // starts from size zero again.
                initial_size = 0;
                self.client
                    .create(&target_path, WriteOptions::default().overwrite(true))
                    .await
                    .map_err(parse_hdfs_error)?
            }
        } else {
            initial_size = 0;
            self.client
                .create(&target_path, WriteOptions::default())
                .await
                .map_err(parse_hdfs_error)?
        };

        Ok((RpWrite::new(), HdfsNativeWriter::new(f, initial_size)))
    }

    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        Ok((
            RpDelete::default(),
            oio::OneShotDeleter::new(HdfsNativeDeleter::new(Arc::new(self.clone()))),
        ))
    }

    async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
        let p: String = build_rooted_abs_path(&self.root, path);

        // Listing a path that does not exist yields an empty lister instead of
        // an error; any other failure is surfaced as-is.
        let isdir = match self.client.get_file_info(&p).await {
            Ok(status) => status.isdir,
            Err(err) => {
                return match &err {
                    HdfsError::FileNotFound(_) => Ok((RpList::default(), None)),
                    _ => Err(parse_hdfs_error(err)),
                };
            }
        };

        // Directories are reported with a trailing `/`; files carry no current path.
        let current_path = if isdir {
            if !path.ends_with('/') {
                Some(path.to_string() + "/")
            } else {
                Some(path.to_string())
            }
        } else {
            None
        };

        Ok((
            RpList::default(),
            Some(HdfsNativeLister::new(
                &self.root,
                &self.client,
                &p,
                current_path,
            )),
        ))
    }

    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
        let from_path = build_rooted_abs_path(&self.root, from);
        let to_path = build_rooted_abs_path(&self.root, to);

        // The destination must end up as a file: an existing file is removed
        // first, and a missing one is created (with its parents) before the
        // overwrite-rename below.
        match self.client.get_file_info(&to_path).await {
            Ok(status) => {
                if status.isdir {
                    return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
                        .with_context("input", &to_path));
                } else {
                    self.client
                        .delete(&to_path, true)
                        .await
                        .map_err(parse_hdfs_error)?;
                }
            }
            Err(err) => match &err {
                HdfsError::FileNotFound(_) => {
                    self.client
                        .create(&to_path, WriteOptions::default().create_parent(true))
                        .await
                        .map_err(parse_hdfs_error)?;
                }
                _ => return Err(parse_hdfs_error(err)),
            },
        };

        self.client
            .rename(&from_path, &to_path, true)
            .await
            .map_err(parse_hdfs_error)?;

        Ok(RpRename::default())
    }
}
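
// A hedged sketch of how the append capability advertised above surfaces at
// the `Operator` level, assuming the same `opendal::services::HdfsNative`
// re-export and an HDFS cluster that allows appends; path and address are
// illustrative only.
//
//     let builder = HdfsNative::default()
//         .name_node("hdfs://127.0.0.1:9000")
//         .enable_append(true);
//     let op = Operator::new(builder)?.finish();
//
//     // The first write creates the file; the second appends because the
//     // backend reports `write_can_append` and the request opts into append.
//     op.write("logs/app.log", "first line\n").await?;
//     op.write_with("logs/app.log", "second line\n")
//         .append(true)
//         .await?;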