opendal/services/hdfs_native/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use hdfs_native::HdfsError;
23use hdfs_native::WriteOptions;
24use log::debug;
25
26use super::delete::HdfsNativeDeleter;
27use super::error::parse_hdfs_error;
28use super::lister::HdfsNativeLister;
29use super::reader::HdfsNativeReader;
30use super::writer::HdfsNativeWriter;
31use super::DEFAULT_SCHEME;
32use crate::raw::*;
33use crate::services::HdfsNativeConfig;
34use crate::*;
35impl Configurator for HdfsNativeConfig {
38 type Builder = HdfsNativeBuilder;
39 fn into_builder(self) -> Self::Builder {
40 HdfsNativeBuilder { config: self }
41 }
42}
43
44#[doc = include_str!("docs.md")]
45#[derive(Default)]
46pub struct HdfsNativeBuilder {
47 config: HdfsNativeConfig,
48}
49
50impl Debug for HdfsNativeBuilder {
51 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52 f.debug_struct("HdfsNativeBuilder")
53 .field("config", &self.config)
54 .finish()
55 }
56}
57
58impl HdfsNativeBuilder {
59 pub fn root(mut self, root: &str) -> Self {
63 self.config.root = if root.is_empty() {
64 None
65 } else {
66 Some(root.to_string())
67 };
68
69 self
70 }
71
72 pub fn name_node(mut self, name_node: &str) -> Self {
79 if !name_node.is_empty() {
80 self.config.name_node = Some(name_node.trim_end_matches('/').to_string())
82 }
83
84 self
85 }
86
87 pub fn enable_append(mut self, enable_append: bool) -> Self {
91 self.config.enable_append = enable_append;
92 self
93 }
94}
95
96impl Builder for HdfsNativeBuilder {
97 type Config = HdfsNativeConfig;
98
99 fn build(self) -> Result<impl Access> {
100 debug!("backend build started: {:?}", &self);
101
102 let name_node = match &self.config.name_node {
103 Some(v) => v,
104 None => {
105 return Err(Error::new(ErrorKind::ConfigInvalid, "name_node is empty")
106 .with_context("service", Scheme::HdfsNative));
107 }
108 };
109
110 let root = normalize_root(&self.config.root.unwrap_or_default());
111 debug!("backend use root {root}");
112
113 let client = hdfs_native::Client::new(name_node).map_err(parse_hdfs_error)?;
114
115 Ok(HdfsNativeBackend {
117 root,
118 client: Arc::new(client),
119 enable_append: self.config.enable_append,
120 })
121 }
122}
123
124#[derive(Debug, Clone)]
134pub struct HdfsNativeBackend {
135 pub root: String,
136 pub client: Arc<hdfs_native::Client>,
137 enable_append: bool,
138}
139
140unsafe impl Send for HdfsNativeBackend {}
142unsafe impl Sync for HdfsNativeBackend {}
143
144impl Access for HdfsNativeBackend {
145 type Reader = HdfsNativeReader;
146 type Writer = HdfsNativeWriter;
147 type Lister = Option<HdfsNativeLister>;
148 type Deleter = oio::OneShotDeleter<HdfsNativeDeleter>;
149
150 fn info(&self) -> Arc<AccessorInfo> {
151 let am = AccessorInfo::default();
152 am.set_scheme(DEFAULT_SCHEME)
153 .set_root(&self.root)
154 .set_native_capability(Capability {
155 stat: true,
156
157 read: true,
158
159 write: true,
160 write_can_append: self.enable_append,
161
162 create_dir: true,
163 delete: true,
164
165 list: true,
166
167 rename: true,
168
169 shared: true,
170
171 ..Default::default()
172 });
173
174 am.into()
175 }
176
177 async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
178 let p = build_rooted_abs_path(&self.root, path);
179
180 self.client
181 .mkdirs(&p, 0o777, true)
182 .await
183 .map_err(parse_hdfs_error)?;
184
185 Ok(RpCreateDir::default())
186 }
187
188 async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
189 let p = build_rooted_abs_path(&self.root, path);
190
191 let status: hdfs_native::client::FileStatus = self
192 .client
193 .get_file_info(&p)
194 .await
195 .map_err(parse_hdfs_error)?;
196
197 let mode = if status.isdir {
198 EntryMode::DIR
199 } else {
200 EntryMode::FILE
201 };
202
203 let mut metadata = Metadata::new(mode);
204 metadata
205 .set_last_modified(parse_datetime_from_from_timestamp_millis(
206 status.modification_time as i64,
207 )?)
208 .set_content_length(status.length as u64);
209
210 Ok(RpStat::new(metadata))
211 }
212
213 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
214 let p = build_rooted_abs_path(&self.root, path);
215
216 let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;
217
218 let r = HdfsNativeReader::new(
219 f,
220 args.range().offset() as _,
221 args.range().size().unwrap_or(u64::MAX) as _,
222 );
223
224 Ok((RpRead::new(), r))
225 }
226
227 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
228 let target_path = build_rooted_abs_path(&self.root, path);
229 let mut initial_size = 0;
230
231 let target_exists = match self.client.get_file_info(&target_path).await {
232 Ok(status) => {
233 initial_size = status.length as u64;
234 true
235 }
236 Err(err) => match &err {
237 HdfsError::FileNotFound(_) => false,
238 _ => return Err(parse_hdfs_error(err)),
239 },
240 };
241
242 let f = if target_exists {
243 if args.append() {
244 assert!(self.enable_append, "append is not enabled");
245 self.client
246 .append(&target_path)
247 .await
248 .map_err(parse_hdfs_error)?
249 } else {
250 initial_size = 0;
251 self.client
252 .create(&target_path, WriteOptions::default().overwrite(true))
253 .await
254 .map_err(parse_hdfs_error)?
255 }
256 } else {
257 initial_size = 0;
258 self.client
259 .create(&target_path, WriteOptions::default())
260 .await
261 .map_err(parse_hdfs_error)?
262 };
263
264 Ok((RpWrite::new(), HdfsNativeWriter::new(f, initial_size)))
265 }
266
267 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
268 Ok((
269 RpDelete::default(),
270 oio::OneShotDeleter::new(HdfsNativeDeleter::new(Arc::new(self.clone()))),
271 ))
272 }
273
274 async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
275 let p: String = build_rooted_abs_path(&self.root, path);
276
277 let isdir = match self.client.get_file_info(&p).await {
278 Ok(status) => status.isdir,
279 Err(err) => {
280 return match &err {
281 HdfsError::FileNotFound(_) => Ok((RpList::default(), None)),
282 _ => Err(parse_hdfs_error(err)),
283 };
284 }
285 };
286 let current_path = if isdir {
287 if !path.ends_with("/") {
288 Some(path.to_string() + "/")
289 } else {
290 Some(path.to_string())
291 }
292 } else {
293 None
294 };
295
296 Ok((
297 RpList::default(),
298 Some(HdfsNativeLister::new(
299 &self.root,
300 &self.client,
301 &p,
302 current_path,
303 )),
304 ))
305 }
306
307 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
308 let from_path = build_rooted_abs_path(&self.root, from);
309 let to_path = build_rooted_abs_path(&self.root, to);
310 match self.client.get_file_info(&to_path).await {
311 Ok(status) => {
312 if status.isdir {
313 return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
314 .with_context("input", &to_path));
315 } else {
316 self.client
317 .delete(&to_path, true)
318 .await
319 .map_err(parse_hdfs_error)?;
320 }
321 }
322 Err(err) => match &err {
323 HdfsError::FileNotFound(_) => {
324 self.client
325 .create(&to_path, WriteOptions::default().create_parent(true))
326 .await
327 .map_err(parse_hdfs_error)?;
328 }
329 _ => return Err(parse_hdfs_error(err)),
330 },
331 };
332
333 self.client
334 .rename(&from_path, &to_path, true)
335 .await
336 .map_err(parse_hdfs_error)?;
337
338 Ok(RpRename::default())
339 }
340}