// opendal/services/webhdfs/backend.rs
use std::fmt::Debug;
use std::sync::Arc;

use bytes::Buf;
use http::Response;
use http::StatusCode;
use log::debug;
use tokio::sync::OnceCell;

use super::WEBHDFS_SCHEME;
use super::config::WebhdfsConfig;
use super::core::WebhdfsCore;
use super::deleter::WebhdfsDeleter;
use super::error::parse_error;
use super::lister::WebhdfsLister;
use super::message::BooleanResp;
use super::message::FileStatusType;
use super::message::FileStatusWrapper;
use super::writer::WebhdfsWriter;
use super::writer::WebhdfsWriters;
use crate::raw::*;
use crate::*;

const WEBHDFS_DEFAULT_ENDPOINT: &str = "http://127.0.0.1:9870";

#[doc = include_str!("docs.md")]
#[derive(Debug, Default)]
pub struct WebhdfsBuilder {
    // Accumulated configuration; consumed when `Builder::build` runs.
    pub(super) config: WebhdfsConfig,
}
49
50impl WebhdfsBuilder {
51 pub fn root(mut self, root: &str) -> Self {
59 self.config.root = if root.is_empty() {
60 None
61 } else {
62 Some(root.to_string())
63 };
64
65 self
66 }
67
68 pub fn endpoint(mut self, endpoint: &str) -> Self {
79 if !endpoint.is_empty() {
80 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
82 }
83 self
84 }
85
86 pub fn user_name(mut self, user_name: &str) -> Self {
89 if !user_name.is_empty() {
90 self.config.user_name = Some(user_name.to_string());
91 }
92 self
93 }
94
95 pub fn delegation(mut self, delegation: &str) -> Self {
102 if !delegation.is_empty() {
103 self.config.delegation = Some(delegation.to_string());
104 }
105 self
106 }
107
108 pub fn disable_list_batch(mut self) -> Self {
115 self.config.disable_list_batch = true;
116 self
117 }
118
119 pub fn atomic_write_dir(mut self, dir: &str) -> Self {
125 self.config.atomic_write_dir = if dir.is_empty() {
126 None
127 } else {
128 Some(String::from(dir))
129 };
130 self
131 }
132}
133
134impl Builder for WebhdfsBuilder {
135 type Config = WebhdfsConfig;
136
137 fn build(self) -> Result<impl Access> {
145 debug!("start building backend: {self:?}");
146
147 let root = normalize_root(&self.config.root.unwrap_or_default());
148 debug!("backend use root {root}");
149
150 let endpoint = match self.config.endpoint {
152 Some(endpoint) => {
153 if endpoint.starts_with("http") {
154 endpoint
155 } else {
156 format!("http://{endpoint}")
157 }
158 }
159 None => WEBHDFS_DEFAULT_ENDPOINT.to_string(),
160 };
161 debug!("backend use endpoint {endpoint}");
162
163 let atomic_write_dir = self.config.atomic_write_dir;
164
165 let auth = self.config.delegation.map(|dt| format!("delegation={dt}"));
166
167 let info = AccessorInfo::default();
168 info.set_scheme(WEBHDFS_SCHEME)
169 .set_root(&root)
170 .set_native_capability(Capability {
171 stat: true,
172
173 read: true,
174
175 write: true,
176 write_can_append: true,
177 write_can_multi: atomic_write_dir.is_some(),
178
179 create_dir: true,
180 delete: true,
181
182 list: true,
183
184 shared: true,
185
186 ..Default::default()
187 });
188
189 let accessor_info = Arc::new(info);
190 let core = Arc::new(WebhdfsCore {
191 info: accessor_info,
192 root,
193 endpoint,
194 user_name: self.config.user_name,
195 auth,
196 root_checker: OnceCell::new(),
197 atomic_write_dir,
198 disable_list_batch: self.config.disable_list_batch,
199 });
200
201 Ok(WebhdfsBackend { core })
202 }
203}
204
/// Accessor implementation backed by a WebHDFS endpoint.
#[derive(Debug, Clone)]
pub struct WebhdfsBackend {
    // Shared core state (info, root, endpoint, auth, list/write options).
    core: Arc<WebhdfsCore>,
}
210
211impl WebhdfsBackend {
212 async fn check_root(&self) -> Result<()> {
213 let resp = self.core.webhdfs_get_file_status("/").await?;
214 match resp.status() {
215 StatusCode::OK => {
216 let bs = resp.into_body();
217
218 let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
219 .map_err(new_json_deserialize_error)?
220 .file_status;
221
222 if file_status.ty == FileStatusType::File {
223 return Err(Error::new(
224 ErrorKind::ConfigInvalid,
225 "root path must be dir",
226 ));
227 }
228 }
229 StatusCode::NOT_FOUND => {
230 self.create_dir("/", OpCreateDir::new()).await?;
231 }
232 _ => return Err(parse_error(resp)),
233 }
234 Ok(())
235 }
236}
237
238impl Access for WebhdfsBackend {
239 type Reader = HttpBody;
240 type Writer = WebhdfsWriters;
241 type Lister = oio::PageLister<WebhdfsLister>;
242 type Deleter = oio::OneShotDeleter<WebhdfsDeleter>;
243
244 fn info(&self) -> Arc<AccessorInfo> {
245 self.core.info.clone()
246 }
247
248 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
250 let resp = self.core.webhdfs_create_dir(path).await?;
251
252 let status = resp.status();
253 match status {
258 StatusCode::CREATED | StatusCode::OK => {
259 let bs = resp.into_body();
260
261 let resp = serde_json::from_reader::<_, BooleanResp>(bs.reader())
262 .map_err(new_json_deserialize_error)?;
263
264 if resp.boolean {
265 Ok(RpCreateDir::default())
266 } else {
267 Err(Error::new(
268 ErrorKind::Unexpected,
269 "webhdfs create dir failed",
270 ))
271 }
272 }
273 _ => Err(parse_error(resp)),
274 }
275 }
276
277 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
278 self.core
280 .root_checker
281 .get_or_try_init(|| async { self.check_root().await })
282 .await?;
283
284 let resp = self.core.webhdfs_get_file_status(path).await?;
285 let status = resp.status();
286 match status {
287 StatusCode::OK => {
288 let bs = resp.into_body();
289
290 let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
291 .map_err(new_json_deserialize_error)?
292 .file_status;
293
294 let meta = match file_status.ty {
295 FileStatusType::Directory => Metadata::new(EntryMode::DIR),
296 FileStatusType::File => Metadata::new(EntryMode::FILE)
297 .with_content_length(file_status.length)
298 .with_last_modified(Timestamp::from_millisecond(
299 file_status.modification_time,
300 )?),
301 };
302
303 Ok(RpStat::new(meta))
304 }
305
306 _ => Err(parse_error(resp)),
307 }
308 }
309
310 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
311 let resp = self.core.webhdfs_read_file(path, args.range()).await?;
312
313 let status = resp.status();
314 match status {
315 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
316 Ok((RpRead::default(), resp.into_body()))
317 }
318 _ => {
319 let (part, mut body) = resp.into_parts();
320 let buf = body.to_buffer().await?;
321 Err(parse_error(Response::from_parts(part, buf)))
322 }
323 }
324 }
325
326 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
327 let w = WebhdfsWriter::new(self.core.clone(), args.clone(), path.to_string());
328
329 let w = if args.append() {
330 WebhdfsWriters::Two(oio::AppendWriter::new(w))
331 } else {
332 WebhdfsWriters::One(oio::BlockWriter::new(
333 self.info().clone(),
334 w,
335 args.concurrent(),
336 ))
337 };
338
339 Ok((RpWrite::default(), w))
340 }
341
342 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
343 Ok((
344 RpDelete::default(),
345 oio::OneShotDeleter::new(WebhdfsDeleter::new(self.core.clone())),
346 ))
347 }
348
349 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
350 if args.recursive() {
351 return Err(Error::new(
352 ErrorKind::Unsupported,
353 "WebHDFS doesn't support list with recursive",
354 ));
355 }
356
357 let path = path.trim_end_matches('/');
358 let l = WebhdfsLister::new(self.core.clone(), path);
359 Ok((RpList::default(), oio::PageLister::new(l)))
360 }
361}