opendal/services/hdfs/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::io;
21use std::io::SeekFrom;
22use std::path::PathBuf;
23use std::sync::Arc;
24
25use log::debug;
26
27use super::delete::HdfsDeleter;
28use super::lister::HdfsLister;
29use super::reader::HdfsReader;
30use super::writer::HdfsWriter;
31use super::DEFAULT_SCHEME;
32use crate::raw::*;
33use crate::services::HdfsConfig;
34use crate::*;
35impl Configurator for HdfsConfig {
36 type Builder = HdfsBuilder;
37 fn into_builder(self) -> Self::Builder {
38 HdfsBuilder { config: self }
39 }
40}
41
42#[doc = include_str!("docs.md")]
43#[derive(Default)]
44pub struct HdfsBuilder {
45 config: HdfsConfig,
46}
47
48impl Debug for HdfsBuilder {
49 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
50 f.debug_struct("HdfsBuilder")
51 .field("config", &self.config)
52 .finish()
53 }
54}
55
56impl HdfsBuilder {
57 pub fn root(mut self, root: &str) -> Self {
61 self.config.root = if root.is_empty() {
62 None
63 } else {
64 Some(root.to_string())
65 };
66
67 self
68 }
69
70 pub fn name_node(mut self, name_node: &str) -> Self {
77 if !name_node.is_empty() {
78 self.config.name_node = Some(name_node.to_string())
79 }
80
81 self
82 }
83
84 pub fn kerberos_ticket_cache_path(mut self, kerberos_ticket_cache_path: &str) -> Self {
88 if !kerberos_ticket_cache_path.is_empty() {
89 self.config.kerberos_ticket_cache_path = Some(kerberos_ticket_cache_path.to_string())
90 }
91 self
92 }
93
94 pub fn user(mut self, user: &str) -> Self {
96 if !user.is_empty() {
97 self.config.user = Some(user.to_string())
98 }
99 self
100 }
101
102 pub fn enable_append(mut self, enable_append: bool) -> Self {
106 self.config.enable_append = enable_append;
107 self
108 }
109
110 pub fn atomic_write_dir(mut self, dir: &str) -> Self {
117 self.config.atomic_write_dir = if dir.is_empty() {
118 None
119 } else {
120 Some(String::from(dir))
121 };
122 self
123 }
124}
125
126impl Builder for HdfsBuilder {
127 type Config = HdfsConfig;
128
129 fn build(self) -> Result<impl Access> {
130 debug!("backend build started: {:?}", &self);
131
132 let name_node = match &self.config.name_node {
133 Some(v) => v,
134 None => {
135 return Err(Error::new(ErrorKind::ConfigInvalid, "name node is empty")
136 .with_context("service", Scheme::Hdfs))
137 }
138 };
139
140 let root = normalize_root(&self.config.root.unwrap_or_default());
141 debug!("backend use root {root}");
142
143 let mut builder = hdrs::ClientBuilder::new(name_node);
144 if let Some(ticket_cache_path) = &self.config.kerberos_ticket_cache_path {
145 builder = builder.with_kerberos_ticket_cache_path(ticket_cache_path.as_str());
146 }
147 if let Some(user) = &self.config.user {
148 builder = builder.with_user(user.as_str());
149 }
150
151 let client = builder.connect().map_err(new_std_io_error)?;
152
153 if let Err(e) = client.metadata(&root) {
155 if e.kind() == io::ErrorKind::NotFound {
156 debug!("root {root} is not exist, creating now");
157
158 client.create_dir(&root).map_err(new_std_io_error)?
159 }
160 }
161
162 let atomic_write_dir = self.config.atomic_write_dir;
163
164 if let Some(d) = &atomic_write_dir {
166 if let Err(e) = client.metadata(d) {
167 if e.kind() == io::ErrorKind::NotFound {
168 client.create_dir(d).map_err(new_std_io_error)?
169 }
170 }
171 }
172
173 Ok(HdfsBackend {
174 info: {
175 let am = AccessorInfo::default();
176 am.set_scheme(DEFAULT_SCHEME)
177 .set_root(&root)
178 .set_native_capability(Capability {
179 stat: true,
180
181 read: true,
182
183 write: true,
184 write_can_append: self.config.enable_append,
185
186 create_dir: true,
187 delete: true,
188
189 list: true,
190
191 rename: true,
192
193 shared: true,
194
195 ..Default::default()
196 });
197
198 am.into()
199 },
200 root,
201 atomic_write_dir,
202 client: Arc::new(client),
203 })
204 }
205}
206
207#[derive(Debug, Clone)]
209pub struct HdfsBackend {
210 pub info: Arc<AccessorInfo>,
211 pub root: String,
212 atomic_write_dir: Option<String>,
213 pub client: Arc<hdrs::Client>,
214}
215
216unsafe impl Send for HdfsBackend {}
218unsafe impl Sync for HdfsBackend {}
219
220impl Access for HdfsBackend {
221 type Reader = HdfsReader<hdrs::AsyncFile>;
222 type Writer = HdfsWriter<hdrs::AsyncFile>;
223 type Lister = Option<HdfsLister>;
224 type Deleter = oio::OneShotDeleter<HdfsDeleter>;
225
226 fn info(&self) -> Arc<AccessorInfo> {
227 self.info.clone()
228 }
229
230 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
231 let p = build_rooted_abs_path(&self.root, path);
232
233 self.client.create_dir(&p).map_err(new_std_io_error)?;
234
235 Ok(RpCreateDir::default())
236 }
237
238 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
239 let p = build_rooted_abs_path(&self.root, path);
240
241 let meta = self.client.metadata(&p).map_err(new_std_io_error)?;
242
243 let mode = if meta.is_dir() {
244 EntryMode::DIR
245 } else if meta.is_file() {
246 EntryMode::FILE
247 } else {
248 EntryMode::Unknown
249 };
250 let mut m = Metadata::new(mode);
251 m.set_content_length(meta.len());
252 m.set_last_modified(meta.modified().into());
253
254 Ok(RpStat::new(m))
255 }
256
257 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
258 let p = build_rooted_abs_path(&self.root, path);
259
260 let client = self.client.clone();
261 let mut f = client
262 .open_file()
263 .read(true)
264 .async_open(&p)
265 .await
266 .map_err(new_std_io_error)?;
267
268 if args.range().offset() != 0 {
269 use futures::AsyncSeekExt;
270
271 f.seek(SeekFrom::Start(args.range().offset()))
272 .await
273 .map_err(new_std_io_error)?;
274 }
275
276 Ok((
277 RpRead::new(),
278 HdfsReader::new(f, args.range().size().unwrap_or(u64::MAX) as _),
279 ))
280 }
281
282 async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
283 let target_path = build_rooted_abs_path(&self.root, path);
284 let mut initial_size = 0;
285 let target_exists = match self.client.metadata(&target_path) {
286 Ok(meta) => {
287 initial_size = meta.len();
288 true
289 }
290 Err(err) => {
291 if err.kind() != io::ErrorKind::NotFound {
292 return Err(new_std_io_error(err));
293 }
294 false
295 }
296 };
297
298 let should_append = op.append() && target_exists;
299 let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
300 (!should_append).then_some(build_rooted_abs_path(
302 atomic_write_dir,
303 &build_tmp_path_of(path),
304 ))
305 });
306
307 if !target_exists {
308 let parent = get_parent(&target_path);
309 self.client.create_dir(parent).map_err(new_std_io_error)?;
310 }
311 if !should_append {
312 initial_size = 0;
313 }
314
315 let mut open_options = self.client.open_file();
316 open_options.create(true);
317 if should_append {
318 open_options.append(true);
319 } else {
320 open_options.write(true);
321 }
322
323 let f = open_options
324 .async_open(tmp_path.as_ref().unwrap_or(&target_path))
325 .await
326 .map_err(new_std_io_error)?;
327
328 Ok((
329 RpWrite::new(),
330 HdfsWriter::new(
331 target_path,
332 tmp_path,
333 f,
334 Arc::clone(&self.client),
335 target_exists,
336 initial_size,
337 ),
338 ))
339 }
340
341 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
342 Ok((
343 RpDelete::default(),
344 oio::OneShotDeleter::new(HdfsDeleter::new(Arc::new(self.clone()))),
345 ))
346 }
347
348 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
349 let p = build_rooted_abs_path(&self.root, path);
350
351 let f = match self.client.read_dir(&p) {
352 Ok(f) => f,
353 Err(e) => {
354 return if e.kind() == io::ErrorKind::NotFound {
355 Ok((RpList::default(), None))
356 } else {
357 Err(new_std_io_error(e))
358 }
359 }
360 };
361
362 let rd = HdfsLister::new(&self.root, f, path);
363
364 Ok((RpList::default(), Some(rd)))
365 }
366
367 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
368 let from_path = build_rooted_abs_path(&self.root, from);
369 self.client.metadata(&from_path).map_err(new_std_io_error)?;
370
371 let to_path = build_rooted_abs_path(&self.root, to);
372 let result = self.client.metadata(&to_path);
373 match result {
374 Err(err) => {
375 if err.kind() != io::ErrorKind::NotFound {
377 return Err(new_std_io_error(err));
378 }
379
380 let parent = PathBuf::from(&to_path)
381 .parent()
382 .ok_or_else(|| {
383 Error::new(
384 ErrorKind::Unexpected,
385 "path should have parent but not, it must be malformed",
386 )
387 .with_context("input", &to_path)
388 })?
389 .to_path_buf();
390
391 self.client
392 .create_dir(&parent.to_string_lossy())
393 .map_err(new_std_io_error)?;
394 }
395 Ok(metadata) => {
396 if metadata.is_file() {
397 self.client
398 .remove_file(&to_path)
399 .map_err(new_std_io_error)?;
400 } else {
401 return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
402 .with_context("input", &to_path));
403 }
404 }
405 }
406
407 self.client
408 .rename_file(&from_path, &to_path)
409 .map_err(new_std_io_error)?;
410
411 Ok(RpRename::new())
412 }
413}