// opendal/services/webhdfs/backend.rs

use std::fmt::Formatter;
use std::sync::Arc;

use bytes::Buf;
use core::fmt::Debug;
use http::Response;
use http::StatusCode;
use log::debug;
use tokio::sync::OnceCell;

use super::core::WebhdfsCore;
use super::delete::WebhdfsDeleter;
use super::error::parse_error;
use super::lister::WebhdfsLister;
use super::message::BooleanResp;
use super::message::FileStatusType;
use super::message::FileStatusWrapper;
use super::writer::WebhdfsWriter;
use super::writer::WebhdfsWriters;
use crate::raw::*;
use crate::services::WebhdfsConfig;
use crate::*;

/// Endpoint used when none is configured: the NameNode's default local
/// HTTP address. `build` falls back to this value.
const WEBHDFS_DEFAULT_ENDPOINT: &str = "http://127.0.0.1:9870";

43impl Configurator for WebhdfsConfig {
44 type Builder = WebhdfsBuilder;
45 fn into_builder(self) -> Self::Builder {
46 WebhdfsBuilder { config: self }
47 }
48}
49
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
pub struct WebhdfsBuilder {
    /// Accumulated configuration, consumed by `Builder::build`.
    config: WebhdfsConfig,
}

57impl Debug for WebhdfsBuilder {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 let mut d = f.debug_struct("WebhdfsBuilder");
60 d.field("config", &self.config);
61 d.finish_non_exhaustive()
62 }
63}
64
65impl WebhdfsBuilder {
66 pub fn root(mut self, root: &str) -> Self {
74 self.config.root = if root.is_empty() {
75 None
76 } else {
77 Some(root.to_string())
78 };
79
80 self
81 }
82
83 pub fn endpoint(mut self, endpoint: &str) -> Self {
94 if !endpoint.is_empty() {
95 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
97 }
98 self
99 }
100
101 pub fn user_name(mut self, user_name: &str) -> Self {
105 if !user_name.is_empty() {
106 self.config.user_name = Some(user_name.to_string());
107 }
108 self
109 }
110
111 pub fn delegation(mut self, delegation: &str) -> Self {
118 if !delegation.is_empty() {
119 self.config.delegation = Some(delegation.to_string());
120 }
121 self
122 }
123
124 pub fn disable_list_batch(mut self) -> Self {
131 self.config.disable_list_batch = true;
132 self
133 }
134
135 pub fn atomic_write_dir(mut self, dir: &str) -> Self {
141 self.config.atomic_write_dir = if dir.is_empty() {
142 None
143 } else {
144 Some(String::from(dir))
145 };
146 self
147 }
148}
149
150impl Builder for WebhdfsBuilder {
151 const SCHEME: Scheme = Scheme::Webhdfs;
152 type Config = WebhdfsConfig;
153
154 fn build(self) -> Result<impl Access> {
162 debug!("start building backend: {:?}", self);
163
164 let root = normalize_root(&self.config.root.unwrap_or_default());
165 debug!("backend use root {root}");
166
167 let endpoint = match self.config.endpoint {
169 Some(endpoint) => {
170 if endpoint.starts_with("http") {
171 endpoint
172 } else {
173 format!("http://{endpoint}")
174 }
175 }
176 None => WEBHDFS_DEFAULT_ENDPOINT.to_string(),
177 };
178 debug!("backend use endpoint {}", endpoint);
179
180 let atomic_write_dir = self.config.atomic_write_dir;
181
182 let auth = self.config.delegation.map(|dt| format!("delegation={dt}"));
183
184 let info = AccessorInfo::default();
185 info.set_scheme(Scheme::Webhdfs)
186 .set_root(&root)
187 .set_native_capability(Capability {
188 stat: true,
189 stat_has_content_length: true,
190 stat_has_last_modified: true,
191
192 read: true,
193
194 write: true,
195 write_can_append: true,
196 write_can_multi: atomic_write_dir.is_some(),
197
198 create_dir: true,
199 delete: true,
200
201 list: true,
202 list_has_content_length: true,
203 list_has_last_modified: true,
204
205 shared: true,
206
207 ..Default::default()
208 });
209
210 let accessor_info = Arc::new(info);
211 let core = Arc::new(WebhdfsCore {
212 info: accessor_info,
213 root,
214 endpoint,
215 user_name: self.config.user_name,
216 auth,
217 root_checker: OnceCell::new(),
218 atomic_write_dir,
219 disable_list_batch: self.config.disable_list_batch,
220 });
221
222 Ok(WebhdfsBackend { core })
223 }
224}
225
/// Backend for the WebHDFS service.
#[derive(Debug, Clone)]
pub struct WebhdfsBackend {
    /// Shared core: config, accessor info, and the lazy root-check cell.
    core: Arc<WebhdfsCore>,
}

232impl WebhdfsBackend {
233 async fn check_root(&self) -> Result<()> {
234 let resp = self.core.webhdfs_get_file_status("/").await?;
235 match resp.status() {
236 StatusCode::OK => {
237 let bs = resp.into_body();
238
239 let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
240 .map_err(new_json_deserialize_error)?
241 .file_status;
242
243 if file_status.ty == FileStatusType::File {
244 return Err(Error::new(
245 ErrorKind::ConfigInvalid,
246 "root path must be dir",
247 ));
248 }
249 }
250 StatusCode::NOT_FOUND => {
251 self.create_dir("/", OpCreateDir::new()).await?;
252 }
253 _ => return Err(parse_error(resp)),
254 }
255 Ok(())
256 }
257}
258
impl Access for WebhdfsBackend {
    type Reader = HttpBody;
    type Writer = WebhdfsWriters;
    type Lister = oio::PageLister<WebhdfsLister>;
    type Deleter = oio::OneShotDeleter<WebhdfsDeleter>;
    // Blocking variants are not provided by this backend.
    type BlockingReader = ();
    type BlockingWriter = ();
    type BlockingLister = ();
    type BlockingDeleter = ();

    fn info(&self) -> Arc<AccessorInfo> {
        self.core.info.clone()
    }

    /// Create a directory at `path`.
    ///
    /// The response body is a JSON `{"boolean": ...}`; `false` is mapped
    /// to an `Unexpected` error even when the HTTP status is 2xx.
    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
        let req = self.core.webhdfs_create_dir_request(path)?;

        let resp = self.info().http_client().send(req).await?;

        let status = resp.status();

        match status {
            StatusCode::CREATED | StatusCode::OK => {
                let bs = resp.into_body();

                let resp = serde_json::from_reader::<_, BooleanResp>(bs.reader())
                    .map_err(new_json_deserialize_error)?;

                if resp.boolean {
                    Ok(RpCreateDir::default())
                } else {
                    Err(Error::new(
                        ErrorKind::Unexpected,
                        "webhdfs create dir failed",
                    ))
                }
            }
            _ => Err(parse_error(resp)),
        }
    }

    /// Stat `path` and return its metadata.
    ///
    /// The first call triggers the one-time root check (`check_root`);
    /// later calls reuse the cached result through `root_checker`.
    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
        self.core
            .root_checker
            .get_or_try_init(|| async { self.check_root().await })
            .await?;

        let resp = self.core.webhdfs_get_file_status(path).await?;
        let status = resp.status();
        match status {
            StatusCode::OK => {
                let bs = resp.into_body();

                let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
                    .map_err(new_json_deserialize_error)?
                    .file_status;

                let meta = match file_status.ty {
                    FileStatusType::Directory => Metadata::new(EntryMode::DIR),
                    // Files additionally carry length and a millisecond
                    // modification timestamp.
                    FileStatusType::File => Metadata::new(EntryMode::FILE)
                        .with_content_length(file_status.length)
                        .with_last_modified(parse_datetime_from_from_timestamp_millis(
                            file_status.modification_time,
                        )?),
                };

                Ok(RpStat::new(meta))
            }

            _ => Err(parse_error(resp)),
        }
    }

    /// Read `path` within the requested byte range, streaming the body.
    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        let resp = self.core.webhdfs_read_file(path, args.range()).await?;

        let status = resp.status();

        match status {
            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                Ok((RpRead::default(), resp.into_body()))
            }
            _ => {
                // Buffer the error body so `parse_error` can inspect it.
                let (part, mut body) = resp.into_parts();
                let buf = body.to_buffer().await?;
                Err(parse_error(Response::from_parts(part, buf)))
            }
        }
    }

    /// Open a writer for `path`: append mode gets an `AppendWriter`,
    /// everything else a `BlockWriter` with the requested concurrency.
    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        let w = WebhdfsWriter::new(self.core.clone(), args.clone(), path.to_string());

        let w = if args.append() {
            WebhdfsWriters::Two(oio::AppendWriter::new(w))
        } else {
            WebhdfsWriters::One(oio::BlockWriter::new(
                self.info().clone(),
                w,
                args.concurrent(),
            ))
        };

        Ok((RpWrite::default(), w))
    }

    /// Build a one-shot deleter over the shared core.
    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        Ok((
            RpDelete::default(),
            oio::OneShotDeleter::new(WebhdfsDeleter::new(self.core.clone())),
        ))
    }

    /// List entries under `path`; recursive listing is rejected as
    /// unsupported.
    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
        if args.recursive() {
            return Err(Error::new(
                ErrorKind::Unsupported,
                "WebHDFS doesn't support list with recursive",
            ));
        }

        // Normalize away any trailing slash before building the lister.
        let path = path.trim_end_matches('/');
        let l = WebhdfsLister::new(self.core.clone(), path);
        Ok((RpList::default(), oio::PageLister::new(l)))
    }
}