opendal_core/services/hdfs/
core.rs1use std::fmt::Debug;
19use std::io;
20use std::io::SeekFrom;
21use std::sync::Arc;
22
23use crate::raw::*;
24use crate::*;
25
26#[derive(Clone)]
28pub struct HdfsCore {
29 pub info: Arc<AccessorInfo>,
30 pub root: String,
31 pub atomic_write_dir: Option<String>,
32 pub client: Arc<hdrs::Client>,
33}
34
35impl Debug for HdfsCore {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.debug_struct("HdfsCore")
38 .field("root", &self.root)
39 .field("atomic_write_dir", &self.atomic_write_dir)
40 .finish_non_exhaustive()
41 }
42}
43
44impl HdfsCore {
45 pub fn hdfs_create_dir(&self, path: &str) -> Result<()> {
46 let p = build_rooted_abs_path(&self.root, path);
47 self.client.create_dir(&p).map_err(new_std_io_error)?;
48 Ok(())
49 }
50
51 pub fn hdfs_stat(&self, path: &str) -> Result<Metadata> {
52 let p = build_rooted_abs_path(&self.root, path);
53 let meta = self.client.metadata(&p).map_err(new_std_io_error)?;
54
55 let mode = if meta.is_dir() {
56 EntryMode::DIR
57 } else if meta.is_file() {
58 EntryMode::FILE
59 } else {
60 EntryMode::Unknown
61 };
62 let mut m = Metadata::new(mode);
63 m.set_content_length(meta.len());
64 m.set_last_modified(Timestamp::try_from(meta.modified())?);
65
66 Ok(m)
67 }
68
69 pub async fn hdfs_read(&self, path: &str, args: &OpRead) -> Result<hdrs::AsyncFile> {
70 let p = build_rooted_abs_path(&self.root, path);
71
72 let client = self.client.clone();
73 let mut f = client
74 .open_file()
75 .read(true)
76 .async_open(&p)
77 .await
78 .map_err(new_std_io_error)?;
79
80 if args.range().offset() != 0 {
81 use futures::AsyncSeekExt;
82
83 f.seek(SeekFrom::Start(args.range().offset()))
84 .await
85 .map_err(new_std_io_error)?;
86 }
87
88 Ok(f)
89 }
90
91 pub async fn hdfs_write(
92 &self,
93 path: &str,
94 op: &OpWrite,
95 ) -> Result<(String, Option<String>, hdrs::AsyncFile, bool, u64)> {
96 let target_path = build_rooted_abs_path(&self.root, path);
97 let mut initial_size = 0;
98 let target_exists = match self.client.metadata(&target_path) {
99 Ok(meta) => {
100 initial_size = meta.len();
101 true
102 }
103 Err(err) => {
104 if err.kind() != io::ErrorKind::NotFound {
105 return Err(new_std_io_error(err));
106 }
107 false
108 }
109 };
110
111 let should_append = op.append() && target_exists;
112 let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
113 (!should_append).then_some(build_rooted_abs_path(
115 atomic_write_dir,
116 &build_tmp_path_of(path),
117 ))
118 });
119
120 if !target_exists {
121 let parent = get_parent(&target_path);
122 self.client.create_dir(parent).map_err(new_std_io_error)?;
123 }
124 if !should_append {
125 initial_size = 0;
126 }
127
128 let mut open_options = self.client.open_file();
129 open_options.create(true);
130 if should_append {
131 open_options.append(true);
132 } else {
133 open_options.write(true);
134 }
135
136 let f = open_options
137 .async_open(tmp_path.as_ref().unwrap_or(&target_path))
138 .await
139 .map_err(new_std_io_error)?;
140
141 Ok((target_path, tmp_path, f, target_exists, initial_size))
142 }
143
144 pub fn hdfs_list(&self, path: &str) -> Result<Option<hdrs::Readdir>> {
145 let p = build_rooted_abs_path(&self.root, path);
146
147 match self.client.read_dir(&p) {
148 Ok(f) => Ok(Some(f)),
149 Err(e) => {
150 if e.kind() == io::ErrorKind::NotFound {
151 Ok(None)
152 } else {
153 Err(new_std_io_error(e))
154 }
155 }
156 }
157 }
158
159 pub fn hdfs_rename(&self, from: &str, to: &str) -> Result<()> {
160 let from_path = build_rooted_abs_path(&self.root, from);
161 self.client.metadata(&from_path).map_err(new_std_io_error)?;
162
163 let to_path = build_rooted_abs_path(&self.root, to);
164 let result = self.client.metadata(&to_path);
165 match result {
166 Err(err) => {
167 if err.kind() != io::ErrorKind::NotFound {
169 return Err(new_std_io_error(err));
170 }
171
172 let parent = std::path::PathBuf::from(&to_path)
173 .parent()
174 .ok_or_else(|| {
175 Error::new(
176 ErrorKind::Unexpected,
177 "path should have parent but not, it must be malformed",
178 )
179 .with_context("input", &to_path)
180 })?
181 .to_path_buf();
182
183 self.client
184 .create_dir(&parent.to_string_lossy())
185 .map_err(new_std_io_error)?;
186 }
187 Ok(metadata) => {
188 if metadata.is_file() {
189 self.client
190 .remove_file(&to_path)
191 .map_err(new_std_io_error)?;
192 } else {
193 return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
194 .with_context("input", &to_path));
195 }
196 }
197 }
198
199 self.client
200 .rename_file(&from_path, &to_path)
201 .map_err(new_std_io_error)?;
202
203 Ok(())
204 }
205}