opendal/services/hdfs_native/
backend.rs1use super::delete::HdfsNativeDeleter;
19use super::error::parse_hdfs_error;
20use super::lister::HdfsNativeLister;
21use super::reader::HdfsNativeReader;
22use super::writer::HdfsNativeWriter;
23use crate::raw::*;
24use crate::services::HdfsNativeConfig;
25use crate::*;
26use hdfs_native::HdfsError;
27use hdfs_native::WriteOptions;
28use log::debug;
29use std::fmt::Debug;
30use std::fmt::Formatter;
31use std::sync::Arc;
32
33impl Configurator for HdfsNativeConfig {
36 type Builder = HdfsNativeBuilder;
37 fn into_builder(self) -> Self::Builder {
38 HdfsNativeBuilder { config: self }
39 }
40}
41
42#[doc = include_str!("docs.md")]
43#[derive(Default)]
44pub struct HdfsNativeBuilder {
45 config: HdfsNativeConfig,
46}
47
48impl Debug for HdfsNativeBuilder {
49 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
50 f.debug_struct("HdfsNativeBuilder")
51 .field("config", &self.config)
52 .finish()
53 }
54}
55
56impl HdfsNativeBuilder {
57 pub fn root(mut self, root: &str) -> Self {
61 self.config.root = if root.is_empty() {
62 None
63 } else {
64 Some(root.to_string())
65 };
66
67 self
68 }
69
70 pub fn name_node(mut self, name_node: &str) -> Self {
77 if !name_node.is_empty() {
78 self.config.name_node = Some(name_node.trim_end_matches('/').to_string())
80 }
81
82 self
83 }
84
85 pub fn enable_append(mut self, enable_append: bool) -> Self {
89 self.config.enable_append = enable_append;
90 self
91 }
92}
93
94impl Builder for HdfsNativeBuilder {
95 const SCHEME: Scheme = Scheme::HdfsNative;
96 type Config = HdfsNativeConfig;
97
98 fn build(self) -> Result<impl Access> {
99 debug!("backend build started: {:?}", &self);
100
101 let name_node = match &self.config.name_node {
102 Some(v) => v,
103 None => {
104 return Err(Error::new(ErrorKind::ConfigInvalid, "name_node is empty")
105 .with_context("service", Scheme::HdfsNative));
106 }
107 };
108
109 let root = normalize_root(&self.config.root.unwrap_or_default());
110 debug!("backend use root {}", root);
111
112 let client = hdfs_native::Client::new(name_node).map_err(parse_hdfs_error)?;
113
114 Ok(HdfsNativeBackend {
116 root,
117 client: Arc::new(client),
118 enable_append: self.config.enable_append,
119 })
120 }
121}
122
123#[derive(Debug, Clone)]
133pub struct HdfsNativeBackend {
134 pub root: String,
135 pub client: Arc<hdfs_native::Client>,
136 enable_append: bool,
137}
138
139unsafe impl Send for HdfsNativeBackend {}
141unsafe impl Sync for HdfsNativeBackend {}
142
143impl Access for HdfsNativeBackend {
144 type Reader = HdfsNativeReader;
145 type Writer = HdfsNativeWriter;
146 type Lister = Option<HdfsNativeLister>;
147 type Deleter = oio::OneShotDeleter<HdfsNativeDeleter>;
148 type BlockingReader = ();
149 type BlockingWriter = ();
150 type BlockingLister = ();
151 type BlockingDeleter = ();
152
153 fn info(&self) -> Arc<AccessorInfo> {
154 let am = AccessorInfo::default();
155 am.set_scheme(Scheme::HdfsNative)
156 .set_root(&self.root)
157 .set_native_capability(Capability {
158 stat: true,
159 stat_has_last_modified: true,
160 stat_has_content_length: true,
161
162 read: true,
163
164 write: true,
165 write_can_append: self.enable_append,
166
167 create_dir: true,
168 delete: true,
169
170 list: true,
171 list_has_content_length: true,
172 list_has_last_modified: true,
173
174 rename: true,
175
176 shared: true,
177
178 ..Default::default()
179 });
180
181 am.into()
182 }
183
184 async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
185 let p = build_rooted_abs_path(&self.root, path);
186
187 self.client
188 .mkdirs(&p, 0o777, true)
189 .await
190 .map_err(parse_hdfs_error)?;
191
192 Ok(RpCreateDir::default())
193 }
194
195 async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
196 let p = build_rooted_abs_path(&self.root, path);
197
198 let status: hdfs_native::client::FileStatus = self
199 .client
200 .get_file_info(&p)
201 .await
202 .map_err(parse_hdfs_error)?;
203
204 let mode = if status.isdir {
205 EntryMode::DIR
206 } else {
207 EntryMode::FILE
208 };
209
210 let mut metadata = Metadata::new(mode);
211 metadata
212 .set_last_modified(parse_datetime_from_from_timestamp_millis(
213 status.modification_time as i64,
214 )?)
215 .set_content_length(status.length as u64);
216
217 Ok(RpStat::new(metadata))
218 }
219
220 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
221 let p = build_rooted_abs_path(&self.root, path);
222
223 let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;
224
225 let r = HdfsNativeReader::new(
226 f,
227 args.range().offset() as _,
228 args.range().size().unwrap_or(u64::MAX) as _,
229 );
230
231 Ok((RpRead::new(), r))
232 }
233
234 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
235 let target_path = build_rooted_abs_path(&self.root, path);
236 let mut initial_size = 0;
237
238 let target_exists = match self.client.get_file_info(&target_path).await {
239 Ok(status) => {
240 initial_size = status.length as u64;
241 true
242 }
243 Err(err) => match &err {
244 HdfsError::FileNotFound(_) => false,
245 _ => return Err(parse_hdfs_error(err)),
246 },
247 };
248
249 let f = if target_exists {
250 if args.append() {
251 assert!(self.enable_append, "append is not enabled");
252 self.client
253 .append(&target_path)
254 .await
255 .map_err(parse_hdfs_error)?
256 } else {
257 initial_size = 0;
258 self.client
259 .create(&target_path, WriteOptions::default().overwrite(true))
260 .await
261 .map_err(parse_hdfs_error)?
262 }
263 } else {
264 initial_size = 0;
265 self.client
266 .create(&target_path, WriteOptions::default())
267 .await
268 .map_err(parse_hdfs_error)?
269 };
270
271 Ok((RpWrite::new(), HdfsNativeWriter::new(f, initial_size)))
272 }
273
274 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
275 Ok((
276 RpDelete::default(),
277 oio::OneShotDeleter::new(HdfsNativeDeleter::new(Arc::new(self.clone()))),
278 ))
279 }
280
281 async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
282 let p: String = build_rooted_abs_path(&self.root, path);
283
284 let isdir = match self.client.get_file_info(&p).await {
285 Ok(status) => status.isdir,
286 Err(err) => {
287 return match &err {
288 HdfsError::FileNotFound(_) => Ok((RpList::default(), None)),
289 _ => Err(parse_hdfs_error(err)),
290 };
291 }
292 };
293 let current_path = if isdir {
294 if !path.ends_with("/") {
295 Some(path.to_string() + "/")
296 } else {
297 Some(path.to_string())
298 }
299 } else {
300 None
301 };
302
303 Ok((
304 RpList::default(),
305 Some(HdfsNativeLister::new(
306 &self.root,
307 &self.client,
308 &p,
309 current_path,
310 )),
311 ))
312 }
313
314 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
315 let from_path = build_rooted_abs_path(&self.root, from);
316 let to_path = build_rooted_abs_path(&self.root, to);
317 match self.client.get_file_info(&to_path).await {
318 Ok(status) => {
319 if status.isdir {
320 return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
321 .with_context("input", &to_path));
322 } else {
323 self.client
324 .delete(&to_path, true)
325 .await
326 .map_err(parse_hdfs_error)?;
327 }
328 }
329 Err(err) => match &err {
330 HdfsError::FileNotFound(_) => {
331 self.client
332 .create(&to_path, WriteOptions::default().create_parent(true))
333 .await
334 .map_err(parse_hdfs_error)?;
335 }
336 _ => return Err(parse_hdfs_error(err)),
337 },
338 };
339
340 self.client
341 .rename(&from_path, &to_path, true)
342 .await
343 .map_err(parse_hdfs_error)?;
344
345 Ok(RpRename::default())
346 }
347}