// opendal/services/hdfs/backend.rs

use std::fmt::Debug;
use std::fmt::Formatter;
use std::io;
use std::io::SeekFrom;
use std::path::PathBuf;
use std::sync::Arc;

use log::debug;

use super::delete::HdfsDeleter;
use super::lister::HdfsLister;
use super::reader::HdfsReader;
use super::writer::HdfsWriter;
use super::DEFAULT_SCHEME;
use crate::raw::*;
use crate::services::HdfsConfig;
use crate::*;
35impl Configurator for HdfsConfig {
36 type Builder = HdfsBuilder;
37 fn into_builder(self) -> Self::Builder {
38 HdfsBuilder { config: self }
39 }
40}
41
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct HdfsBuilder {
    // Accumulated settings; consumed by `Builder::build` to connect to HDFS.
    config: HdfsConfig,
}
47
48impl Debug for HdfsBuilder {
49 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
50 f.debug_struct("HdfsBuilder")
51 .field("config", &self.config)
52 .finish()
53 }
54}
55
56impl HdfsBuilder {
57 pub fn root(mut self, root: &str) -> Self {
61 self.config.root = if root.is_empty() {
62 None
63 } else {
64 Some(root.to_string())
65 };
66
67 self
68 }
69
70 pub fn name_node(mut self, name_node: &str) -> Self {
77 if !name_node.is_empty() {
78 self.config.name_node = Some(name_node.trim_end_matches('/').to_string())
80 }
81
82 self
83 }
84
85 pub fn kerberos_ticket_cache_path(mut self, kerberos_ticket_cache_path: &str) -> Self {
89 if !kerberos_ticket_cache_path.is_empty() {
90 self.config.kerberos_ticket_cache_path = Some(kerberos_ticket_cache_path.to_string())
91 }
92 self
93 }
94
95 pub fn user(mut self, user: &str) -> Self {
97 if !user.is_empty() {
98 self.config.user = Some(user.to_string())
99 }
100 self
101 }
102
103 pub fn enable_append(mut self, enable_append: bool) -> Self {
107 self.config.enable_append = enable_append;
108 self
109 }
110
111 pub fn atomic_write_dir(mut self, dir: &str) -> Self {
118 self.config.atomic_write_dir = if dir.is_empty() {
119 None
120 } else {
121 Some(String::from(dir))
122 };
123 self
124 }
125}
126
impl Builder for HdfsBuilder {
    type Config = HdfsConfig;

    /// Build the HDFS backend from the collected configuration.
    ///
    /// Fails with `ConfigInvalid` when no name node was set. Connects to
    /// the cluster eagerly, then ensures the root directory (and the atomic
    /// write directory, when configured) exists, creating it on `NotFound`.
    fn build(self) -> Result<impl Access> {
        debug!("backend build started: {:?}", &self);

        // The name node is the only mandatory setting.
        let name_node = match &self.config.name_node {
            Some(v) => v,
            None => {
                return Err(Error::new(ErrorKind::ConfigInvalid, "name node is empty")
                    .with_context("service", Scheme::Hdfs))
            }
        };

        let root = normalize_root(&self.config.root.unwrap_or_default());
        debug!("backend use root {root}");

        let mut builder = hdrs::ClientBuilder::new(name_node);
        if let Some(ticket_cache_path) = &self.config.kerberos_ticket_cache_path {
            builder = builder.with_kerberos_ticket_cache_path(ticket_cache_path.as_str());
        }
        if let Some(user) = &self.config.user {
            builder = builder.with_user(user.as_str());
        }

        let client = builder.connect().map_err(new_std_io_error)?;

        // Create the root lazily when it doesn't exist yet.
        // NOTE(review): metadata errors other than NotFound are silently
        // swallowed here; they will resurface on the first real operation.
        // Confirm this best-effort behavior is intended.
        if let Err(e) = client.metadata(&root) {
            if e.kind() == io::ErrorKind::NotFound {
                debug!("root {root} is not exist, creating now");

                client.create_dir(&root).map_err(new_std_io_error)?
            }
        }

        let atomic_write_dir = self.config.atomic_write_dir;

        // Same lazy creation for the atomic-write staging directory.
        if let Some(d) = &atomic_write_dir {
            if let Err(e) = client.metadata(d) {
                if e.kind() == io::ErrorKind::NotFound {
                    client.create_dir(d).map_err(new_std_io_error)?
                }
            }
        }

        Ok(HdfsBackend {
            info: {
                let am = AccessorInfo::default();
                am.set_scheme(DEFAULT_SCHEME)
                    .set_root(&root)
                    .set_native_capability(Capability {
                        stat: true,

                        read: true,

                        write: true,
                        // Append is opt-in via the builder flag.
                        write_can_append: self.config.enable_append,

                        create_dir: true,
                        delete: true,

                        list: true,

                        rename: true,

                        shared: true,

                        ..Default::default()
                    });

                am.into()
            },
            root,
            atomic_write_dir,
            client: Arc::new(client),
        })
    }
}
207
/// Backend for the HDFS service; cloned cheaply since all state is `Arc`ed
/// or immutable after build.
#[derive(Debug, Clone)]
pub struct HdfsBackend {
    pub info: Arc<AccessorInfo>,
    // Normalized absolute root; relative paths are joined onto it.
    pub root: String,
    // Staging dir for write-then-rename; `None` disables atomic writes.
    atomic_write_dir: Option<String>,
    pub client: Arc<hdrs::Client>,
}
216
// SAFETY: presumably the underlying libhdfs handle wrapped by
// `hdrs::Client` is safe to use from multiple threads — TODO(review):
// confirm against the hdrs crate's thread-safety guarantees.
unsafe impl Send for HdfsBackend {}
unsafe impl Sync for HdfsBackend {}
220
impl Access for HdfsBackend {
    type Reader = HdfsReader<hdrs::AsyncFile>;
    type Writer = HdfsWriter<hdrs::AsyncFile>;
    // `None` lister signals "path not found" as an empty listing.
    type Lister = Option<HdfsLister>;
    type Deleter = oio::OneShotDeleter<HdfsDeleter>;

    fn info(&self) -> Arc<AccessorInfo> {
        self.info.clone()
    }

    /// Create a directory at `path` (relative to the backend root).
    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
        let p = build_rooted_abs_path(&self.root, path);

        self.client.create_dir(&p).map_err(new_std_io_error)?;

        Ok(RpCreateDir::default())
    }

    /// Stat `path`: entry mode, content length, and last-modified time.
    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
        let p = build_rooted_abs_path(&self.root, path);

        let meta = self.client.metadata(&p).map_err(new_std_io_error)?;

        let mode = if meta.is_dir() {
            EntryMode::DIR
        } else if meta.is_file() {
            EntryMode::FILE
        } else {
            EntryMode::Unknown
        };
        let mut m = Metadata::new(mode);
        m.set_content_length(meta.len());
        m.set_last_modified(meta.modified().into());

        Ok(RpStat::new(m))
    }

    /// Open `path` for reading, honoring the requested byte range.
    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        let p = build_rooted_abs_path(&self.root, path);

        let client = self.client.clone();
        let mut f = client
            .open_file()
            .read(true)
            .async_open(&p)
            .await
            .map_err(new_std_io_error)?;

        // Seek once up front so the reader starts at the range offset.
        if args.range().offset() != 0 {
            use futures::AsyncSeekExt;

            f.seek(SeekFrom::Start(args.range().offset()))
                .await
                .map_err(new_std_io_error)?;
        }

        Ok((
            RpRead::new(),
            // An unbounded range reads to EOF.
            HdfsReader::new(f, args.range().size().unwrap_or(u64::MAX) as _),
        ))
    }

    /// Open `path` for writing.
    ///
    /// Plain writes go through the atomic-write staging dir when configured;
    /// appends (to an existing target) always write in place, since a
    /// rename would drop the existing bytes.
    async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        let target_path = build_rooted_abs_path(&self.root, path);
        let mut initial_size = 0;
        // Probe the target: capture its size for append, and distinguish
        // "missing" from real errors.
        let target_exists = match self.client.metadata(&target_path) {
            Ok(meta) => {
                initial_size = meta.len();
                true
            }
            Err(err) => {
                if err.kind() != io::ErrorKind::NotFound {
                    return Err(new_std_io_error(err));
                }
                false
            }
        };

        // Append is only meaningful when the target already exists.
        let should_append = op.append() && target_exists;
        // Stage into a tmp path unless appending in place.
        let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
            (!should_append).then_some(build_rooted_abs_path(
                atomic_write_dir,
                &build_tmp_path_of(path),
            ))
        });

        if !target_exists {
            // Make sure the parent hierarchy exists for a brand-new file.
            let parent = get_parent(&target_path);
            self.client.create_dir(parent).map_err(new_std_io_error)?;
        }
        if !should_append {
            // Overwrites start from offset 0 regardless of the old size.
            initial_size = 0;
        }

        let mut open_options = self.client.open_file();
        open_options.create(true);
        if should_append {
            open_options.append(true);
        } else {
            open_options.write(true);
        }

        // Write to the tmp path when staging, otherwise directly to target.
        let f = open_options
            .async_open(tmp_path.as_ref().unwrap_or(&target_path))
            .await
            .map_err(new_std_io_error)?;

        Ok((
            RpWrite::new(),
            HdfsWriter::new(
                target_path,
                tmp_path,
                f,
                Arc::clone(&self.client),
                target_exists,
                initial_size,
            ),
        ))
    }

    /// Return a one-shot deleter over a clone of this backend.
    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        Ok((
            RpDelete::default(),
            oio::OneShotDeleter::new(HdfsDeleter::new(Arc::new(self.clone()))),
        ))
    }

    /// List entries under `path`. A missing directory yields an empty
    /// listing (`None` lister) rather than an error.
    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
        let p = build_rooted_abs_path(&self.root, path);

        let f = match self.client.read_dir(&p) {
            Ok(f) => f,
            Err(e) => {
                return if e.kind() == io::ErrorKind::NotFound {
                    Ok((RpList::default(), None))
                } else {
                    Err(new_std_io_error(e))
                }
            }
        };

        let rd = HdfsLister::new(&self.root, f, path);

        Ok((RpList::default(), Some(rd)))
    }

    /// Rename `from` to `to`.
    ///
    /// Requires `from` to exist. If `to` is an existing file it is removed
    /// first (HDFS rename does not overwrite); if `to` is a directory the
    /// call fails with `IsADirectory`; if `to` is missing, its parent
    /// directory is created before the rename.
    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
        let from_path = build_rooted_abs_path(&self.root, from);
        // Surface a stat error early if the source is missing.
        self.client.metadata(&from_path).map_err(new_std_io_error)?;

        let to_path = build_rooted_abs_path(&self.root, to);
        let result = self.client.metadata(&to_path);
        match result {
            Err(err) => {
                if err.kind() != io::ErrorKind::NotFound {
                    return Err(new_std_io_error(err));
                }

                // Destination missing: ensure its parent directory exists.
                let parent = PathBuf::from(&to_path)
                    .parent()
                    .ok_or_else(|| {
                        Error::new(
                            ErrorKind::Unexpected,
                            "path should have parent but not, it must be malformed",
                        )
                        .with_context("input", &to_path)
                    })?
                    .to_path_buf();

                self.client
                    .create_dir(&parent.to_string_lossy())
                    .map_err(new_std_io_error)?;
            }
            Ok(metadata) => {
                if metadata.is_file() {
                    // Remove the existing file so the rename can land.
                    self.client
                        .remove_file(&to_path)
                        .map_err(new_std_io_error)?;
                } else {
                    return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
                        .with_context("input", &to_path));
                }
            }
        }

        self.client
            .rename_file(&from_path, &to_path)
            .map_err(new_std_io_error)?;

        Ok(RpRename::new())
    }
}