opendal/services/hdfs_native/backend.rs

use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use hdfs_native::HdfsError;
use hdfs_native::WriteOptions;
use log::debug;

use super::delete::HdfsNativeDeleter;
use super::error::parse_hdfs_error;
use super::lister::HdfsNativeLister;
use super::reader::HdfsNativeReader;
use super::writer::HdfsNativeWriter;
use crate::raw::*;
use crate::services::HdfsNativeConfig;
use crate::*;
impl Configurator for HdfsNativeConfig {
    type Builder = HdfsNativeBuilder;

    fn into_builder(self) -> Self::Builder {
        HdfsNativeBuilder { config: self }
    }
}

#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct HdfsNativeBuilder {
    config: HdfsNativeConfig,
}

impl Debug for HdfsNativeBuilder {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HdfsNativeBuilder")
            .field("config", &self.config)
            .finish()
    }
}

impl HdfsNativeBuilder {
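    /// Set the working root of this backend.
    ///
    /// All operations will happen under this root. An empty string clears
    /// the configured root.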
    pub fn root(mut self, root: &str) -> Self {
        self.config.root = if root.is_empty() {
            None
        } else {
            Some(root.to_string())
        };

        self
    }

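    /// Set the NameNode address of this backend, e.g. `hdfs://127.0.0.1:9000`.
    ///
    /// A trailing `/` is stripped; an empty string leaves the config unchanged.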
    pub fn name_node(mut self, name_node: &str) -> Self {
        if !name_node.is_empty() {
            self.config.name_node = Some(name_node.trim_end_matches('/').to_string());
        }

        self
    }

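    /// Enable or disable append support for this backend.
    ///
    /// Append is only advertised as a capability when this is set to `true`.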
    pub fn enable_append(mut self, enable_append: bool) -> Self {
        self.config.enable_append = enable_append;
        self
    }
}

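// A minimal usage sketch (assuming the crate's usual `Operator` entry point;
// the address and root below are placeholders):
//
//     let builder = HdfsNativeBuilder::default()
//         .name_node("hdfs://127.0.0.1:9000")
//         .root("/tmp/opendal");
//     let op = Operator::new(builder)?.finish();
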
impl Builder for HdfsNativeBuilder {
    const SCHEME: Scheme = Scheme::HdfsNative;
    type Config = HdfsNativeConfig;

    fn build(self) -> Result<impl Access> {
        debug!("backend build started: {:?}", &self);

        let name_node = match &self.config.name_node {
            Some(v) => v,
            None => {
                return Err(Error::new(ErrorKind::ConfigInvalid, "name_node is empty")
                    .with_context("service", Scheme::HdfsNative));
            }
        };

        let root = normalize_root(&self.config.root.unwrap_or_default());
        debug!("backend use root {}", root);

        let client = hdfs_native::Client::new(name_node).map_err(parse_hdfs_error)?;

        Ok(HdfsNativeBackend {
            root,
            client: Arc::new(client),
            enable_append: self.config.enable_append,
        })
    }
}

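/// Backend for the native HDFS service, holding the shared `hdfs_native::Client`
/// and the normalized root path.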
#[derive(Debug, Clone)]
pub struct HdfsNativeBackend {
    pub root: String,
    pub client: Arc<hdfs_native::Client>,
    enable_append: bool,
}

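// Safety: the backend only holds an `Arc`-wrapped client plus plain data, so it
// is presumed sound to share across threads.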
unsafe impl Send for HdfsNativeBackend {}
unsafe impl Sync for HdfsNativeBackend {}

impl Access for HdfsNativeBackend {
    type Reader = HdfsNativeReader;
    type Writer = HdfsNativeWriter;
    type Lister = Option<HdfsNativeLister>;
    type Deleter = oio::OneShotDeleter<HdfsNativeDeleter>;

    fn info(&self) -> Arc<AccessorInfo> {
        let am = AccessorInfo::default();
        am.set_scheme(Scheme::HdfsNative)
            .set_root(&self.root)
            .set_native_capability(Capability {
                stat: true,
                stat_has_last_modified: true,
                stat_has_content_length: true,

                read: true,

                write: true,
                write_can_append: self.enable_append,

                create_dir: true,
                delete: true,

                list: true,
                list_has_content_length: true,
                list_has_last_modified: true,

                rename: true,

                shared: true,

                ..Default::default()
            });

        am.into()
    }

    async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
        let p = build_rooted_abs_path(&self.root, path);

        self.client
            .mkdirs(&p, 0o777, true)
            .await
            .map_err(parse_hdfs_error)?;

        Ok(RpCreateDir::default())
    }

    async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
        let p = build_rooted_abs_path(&self.root, path);

        let status: hdfs_native::client::FileStatus = self
            .client
            .get_file_info(&p)
            .await
            .map_err(parse_hdfs_error)?;

        let mode = if status.isdir {
            EntryMode::DIR
        } else {
            EntryMode::FILE
        };

        let mut metadata = Metadata::new(mode);
        metadata
            .set_last_modified(parse_datetime_from_from_timestamp_millis(
                status.modification_time as i64,
            )?)
            .set_content_length(status.length as u64);

        Ok(RpStat::new(metadata))
    }

    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        let p = build_rooted_abs_path(&self.root, path);

        let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;

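        // Constrain the reader to the requested range; an unbounded range reads
        // up to u64::MAX bytes from the given offset.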
        let r = HdfsNativeReader::new(
            f,
            args.range().offset() as _,
            args.range().size().unwrap_or(u64::MAX) as _,
        );

        Ok((RpRead::new(), r))
    }

    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        let target_path = build_rooted_abs_path(&self.root, path);
        let mut initial_size = 0;

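        // Probe the target first: an existing file records its current length so
        // appends can resume from it; a missing file is not an error here.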
        let target_exists = match self.client.get_file_info(&target_path).await {
            Ok(status) => {
                initial_size = status.length as u64;
                true
            }
            Err(err) => match &err {
                HdfsError::FileNotFound(_) => false,
                _ => return Err(parse_hdfs_error(err)),
            },
        };

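        // Append to the existing file only when append is enabled; otherwise
        // create (or overwrite) the file and start writing from offset zero.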
        let f = if target_exists {
            if args.append() {
                assert!(self.enable_append, "append is not enabled");
                self.client
                    .append(&target_path)
                    .await
                    .map_err(parse_hdfs_error)?
            } else {
                initial_size = 0;
                self.client
                    .create(&target_path, WriteOptions::default().overwrite(true))
                    .await
                    .map_err(parse_hdfs_error)?
            }
        } else {
            initial_size = 0;
            self.client
                .create(&target_path, WriteOptions::default())
                .await
                .map_err(parse_hdfs_error)?
        };

        Ok((RpWrite::new(), HdfsNativeWriter::new(f, initial_size)))
    }

    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        Ok((
            RpDelete::default(),
            oio::OneShotDeleter::new(HdfsNativeDeleter::new(Arc::new(self.clone()))),
        ))
    }

    async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
        let p: String = build_rooted_abs_path(&self.root, path);

        let isdir = match self.client.get_file_info(&p).await {
            Ok(status) => status.isdir,
            Err(err) => {
                return match &err {
                    HdfsError::FileNotFound(_) => Ok((RpList::default(), None)),
                    _ => Err(parse_hdfs_error(err)),
                };
            }
        };
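        // A directory path must carry a trailing `/`; a plain file path yields
        // `None` for the current path handed to the lister.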
        let current_path = if isdir {
            if !path.ends_with("/") {
                Some(path.to_string() + "/")
            } else {
                Some(path.to_string())
            }
        } else {
            None
        };

        Ok((
            RpList::default(),
            Some(HdfsNativeLister::new(
                &self.root,
                &self.client,
                &p,
                current_path,
            )),
        ))
    }

    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
        let from_path = build_rooted_abs_path(&self.root, from);
        let to_path = build_rooted_abs_path(&self.root, to);
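        // Prepare the destination before renaming: refuse directories, remove an
        // existing file, and create the file (with parents) when it is missing so
        // the final rename can overwrite it in place.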
        match self.client.get_file_info(&to_path).await {
            Ok(status) => {
                if status.isdir {
                    return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
                        .with_context("input", &to_path));
                } else {
                    self.client
                        .delete(&to_path, true)
                        .await
                        .map_err(parse_hdfs_error)?;
                }
            }
            Err(err) => match &err {
                HdfsError::FileNotFound(_) => {
                    self.client
                        .create(&to_path, WriteOptions::default().create_parent(true))
                        .await
                        .map_err(parse_hdfs_error)?;
                }
                _ => return Err(parse_hdfs_error(err)),
            },
        };

        self.client
            .rename(&from_path, &to_path, true)
            .await
            .map_err(parse_hdfs_error)?;

        Ok(RpRename::default())
    }
}