opendal/services/webhdfs/
core.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use http::Request;
23use http::Response;
24use http::StatusCode;
25use http::header::CONTENT_LENGTH;
26use http::header::CONTENT_TYPE;
27use serde::Deserialize;
28use tokio::sync::OnceCell;
29
30use super::error::parse_error;
31use crate::raw::*;
32use crate::*;
33
/// Configuration and shared handles used to issue WebHDFS REST requests.
pub struct WebhdfsCore {
    /// Accessor metadata; its `http_client()` is used to send every request.
    pub info: Arc<AccessorInfo>,
    /// Root directory that operation paths are resolved against.
    pub root: String,
    /// Base URL of the service; `/webhdfs/v1/<path>?...` is appended to it.
    pub endpoint: String,
    /// Optional user, sent as the `user.name` query parameter.
    pub user_name: Option<String>,
    /// Optional auth query string, appended verbatim after `&` to request URLs.
    pub auth: Option<String>,
    /// NOTE(review): not read in this file; presumably caches a one-time check
    /// that `root` exists — confirm against the backend.
    pub root_checker: OnceCell<()>,

    /// NOTE(review): not read in this file; presumably a staging directory used
    /// for atomic writes — confirm against the writer.
    pub atomic_write_dir: Option<String>,
    /// NOTE(review): not read in this file; presumably makes listing use
    /// `LISTSTATUS` instead of `LISTSTATUS_BATCH` — confirm against the lister.
    pub disable_list_batch: bool,
}
45
46impl Debug for WebhdfsCore {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("WebhdfsCore")
49 .field("root", &self.root)
50 .field("endpoint", &self.endpoint)
51 .finish_non_exhaustive()
52 }
53}
54
55impl WebhdfsCore {
56 pub async fn webhdfs_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
57 let p = build_abs_path(&self.root, path);
58
59 let mut url = format!(
60 "{}/webhdfs/v1/{}?op=MKDIRS&overwrite=true&noredirect=true",
61 self.endpoint,
62 percent_encode_path(&p),
63 );
64 if let Some(user) = &self.user_name {
65 url += format!("&user.name={user}").as_str();
66 }
67 if let Some(auth) = &self.auth {
68 url += format!("&{auth}").as_str();
69 }
70
71 let req = Request::put(&url);
72
73 let req = req
74 .extension(Operation::CreateDir)
75 .body(Buffer::new())
76 .map_err(new_request_build_error)?;
77
78 self.info.http_client().send(req).await
79 }
80
81 pub async fn webhdfs_create_object(
83 &self,
84 path: &str,
85 size: Option<u64>,
86 args: &OpWrite,
87 body: Buffer,
88 ) -> Result<Response<Buffer>> {
89 let p = build_abs_path(&self.root, path);
90
91 let mut url = format!(
92 "{}/webhdfs/v1/{}?op=CREATE&overwrite=true&noredirect=true",
93 self.endpoint,
94 percent_encode_path(&p),
95 );
96 if let Some(user) = &self.user_name {
97 url += format!("&user.name={user}").as_str();
98 }
99 if let Some(auth) = &self.auth {
100 url += format!("&{auth}").as_str();
101 }
102
103 let req = Request::put(&url);
104
105 let req = req
106 .extension(Operation::Write)
107 .body(Buffer::new())
108 .map_err(new_request_build_error)?;
109
110 let resp = self.info.http_client().send(req).await?;
111
112 let status = resp.status();
113
114 if status != StatusCode::CREATED && status != StatusCode::OK {
115 return Err(parse_error(resp));
116 }
117
118 let bs = resp.into_body();
119
120 let resp: LocationResponse =
121 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
122
123 let mut req = Request::put(&resp.location);
124
125 if let Some(size) = size {
126 req = req.header(CONTENT_LENGTH, size);
127 };
128
129 if let Some(content_type) = args.content_type() {
130 req = req.header(CONTENT_TYPE, content_type);
131 };
132 let req = req
133 .extension(Operation::Write)
134 .body(body)
135 .map_err(new_request_build_error)?;
136
137 self.info.http_client().send(req).await
138 }
139
140 pub async fn webhdfs_rename_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
141 let from = build_abs_path(&self.root, from);
142 let to = build_rooted_abs_path(&self.root, to);
143
144 let mut url = format!(
145 "{}/webhdfs/v1/{}?op=RENAME&destination={}",
146 self.endpoint,
147 percent_encode_path(&from),
148 percent_encode_path(&to)
149 );
150 if let Some(user) = &self.user_name {
151 url += format!("&user.name={user}").as_str();
152 }
153 if let Some(auth) = &self.auth {
154 url += &format!("&{auth}");
155 }
156
157 let req = Request::put(&url)
158 .body(Buffer::new())
159 .map_err(new_request_build_error)?;
160
161 self.info.http_client().send(req).await
162 }
163
164 async fn webhdfs_init_append(&self, path: &str) -> Result<String> {
165 let p = build_abs_path(&self.root, path);
166 let mut url = format!(
167 "{}/webhdfs/v1/{}?op=APPEND&noredirect=true",
168 self.endpoint,
169 percent_encode_path(&p),
170 );
171 if let Some(user) = &self.user_name {
172 url += format!("&user.name={user}").as_str();
173 }
174 if let Some(auth) = &self.auth {
175 url += &format!("&{auth}");
176 }
177
178 let req = Request::post(url)
179 .extension(Operation::Write)
180 .body(Buffer::new())
181 .map_err(new_request_build_error)?;
182
183 let resp = self.info.http_client().send(req).await?;
184
185 let status = resp.status();
186
187 match status {
188 StatusCode::OK => {
189 let bs = resp.into_body();
190 let resp: LocationResponse =
191 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
192
193 Ok(resp.location)
194 }
195 _ => Err(parse_error(resp)),
196 }
197 }
198
199 pub async fn webhdfs_append(
200 &self,
201 path: &str,
202 size: u64,
203 body: Buffer,
204 ) -> Result<Response<Buffer>> {
205 let mut url = self.webhdfs_init_append(path).await?;
206 if let Some(user) = &self.user_name {
207 url += format!("&user.name={user}").as_str();
208 }
209 if let Some(auth) = &self.auth {
210 url += &format!("&{auth}");
211 }
212
213 let mut req = Request::post(&url);
214
215 req = req.header(CONTENT_LENGTH, size.to_string());
216
217 let req = req
218 .extension(Operation::Write)
219 .body(body)
220 .map_err(new_request_build_error)?;
221
222 self.info.http_client().send(req).await
223 }
224
225 pub async fn webhdfs_concat(
227 &self,
228 path: &str,
229 sources: Vec<String>,
230 ) -> Result<Response<Buffer>> {
231 let p = build_abs_path(&self.root, path);
232
233 let sources = sources
234 .iter()
235 .map(|p| build_rooted_abs_path(&self.root, p))
236 .collect::<Vec<String>>()
237 .join(",");
238
239 let mut url = format!(
240 "{}/webhdfs/v1/{}?op=CONCAT&sources={}",
241 self.endpoint,
242 percent_encode_path(&p),
243 percent_encode_path(&sources),
244 );
245 if let Some(user) = &self.user_name {
246 url += format!("&user.name={user}").as_str();
247 }
248 if let Some(auth) = &self.auth {
249 url += &format!("&{auth}");
250 }
251
252 let req = Request::post(url);
253
254 let req = req
255 .extension(Operation::Write)
256 .body(Buffer::new())
257 .map_err(new_request_build_error)?;
258
259 self.info.http_client().send(req).await
260 }
261
262 pub async fn webhdfs_list_status(&self, path: &str) -> Result<Response<Buffer>> {
263 let p = build_abs_path(&self.root, path);
264 let mut url = format!(
265 "{}/webhdfs/v1/{}?op=LISTSTATUS",
266 self.endpoint,
267 percent_encode_path(&p),
268 );
269 if let Some(user) = &self.user_name {
270 url += format!("&user.name={user}").as_str();
271 }
272 if let Some(auth) = &self.auth {
273 url += format!("&{auth}").as_str();
274 }
275
276 let req = Request::get(&url)
277 .body(Buffer::new())
278 .map_err(new_request_build_error)?;
279 self.info.http_client().send(req).await
280 }
281
282 pub async fn webhdfs_list_status_batch(
283 &self,
284 path: &str,
285 start_after: &str,
286 ) -> Result<Response<Buffer>> {
287 let p = build_abs_path(&self.root, path);
288
289 let mut url = format!(
290 "{}/webhdfs/v1/{}?op=LISTSTATUS_BATCH",
291 self.endpoint,
292 percent_encode_path(&p),
293 );
294 if !start_after.is_empty() {
295 url += format!("&startAfter={start_after}").as_str();
296 }
297 if let Some(user) = &self.user_name {
298 url += format!("&user.name={user}").as_str();
299 }
300 if let Some(auth) = &self.auth {
301 url += format!("&{auth}").as_str();
302 }
303
304 let req = Request::get(&url)
305 .body(Buffer::new())
306 .map_err(new_request_build_error)?;
307 self.info.http_client().send(req).await
308 }
309
310 fn webhdfs_open_request(&self, path: &str, range: &BytesRange) -> Result<Request<Buffer>> {
311 let p = build_abs_path(&self.root, path);
312 let mut url = format!(
313 "{}/webhdfs/v1/{}?op=OPEN",
314 self.endpoint,
315 percent_encode_path(&p),
316 );
317 if let Some(user) = &self.user_name {
318 url += format!("&user.name={user}").as_str();
319 }
320 if let Some(auth) = &self.auth {
321 url += &format!("&{auth}");
322 }
323
324 if !range.is_full() {
325 url += &format!("&offset={}", range.offset());
326 if let Some(size) = range.size() {
327 url += &format!("&length={size}")
328 }
329 }
330
331 let req = Request::get(&url)
332 .extension(Operation::Read)
333 .body(Buffer::new())
334 .map_err(new_request_build_error)?;
335
336 Ok(req)
337 }
338
339 pub async fn webhdfs_read_file(
340 &self,
341 path: &str,
342 range: BytesRange,
343 ) -> Result<Response<HttpBody>> {
344 let req = self.webhdfs_open_request(path, &range)?;
345 self.info.http_client().fetch(req).await
346 }
347
348 pub(super) async fn webhdfs_get_file_status(&self, path: &str) -> Result<Response<Buffer>> {
349 let p = build_abs_path(&self.root, path);
350 let mut url = format!(
351 "{}/webhdfs/v1/{}?op=GETFILESTATUS",
352 self.endpoint,
353 percent_encode_path(&p),
354 );
355 if let Some(user) = &self.user_name {
356 url += format!("&user.name={user}").as_str();
357 }
358 if let Some(auth) = &self.auth {
359 url += format!("&{auth}").as_str();
360 }
361
362 let req = Request::get(&url)
363 .extension(Operation::Stat)
364 .body(Buffer::new())
365 .map_err(new_request_build_error)?;
366
367 self.info.http_client().send(req).await
368 }
369
370 pub async fn webhdfs_delete(&self, path: &str) -> Result<Response<Buffer>> {
371 let p = build_abs_path(&self.root, path);
372 let mut url = format!(
373 "{}/webhdfs/v1/{}?op=DELETE&recursive=false",
374 self.endpoint,
375 percent_encode_path(&p),
376 );
377 if let Some(user) = &self.user_name {
378 url += format!("&user.name={user}").as_str();
379 }
380 if let Some(auth) = &self.auth {
381 url += format!("&{auth}").as_str();
382 }
383
384 let req = Request::delete(&url)
385 .extension(Operation::Delete)
386 .body(Buffer::new())
387 .map_err(new_request_build_error)?;
388
389 self.info.http_client().send(req).await
390 }
391}
392
393#[derive(Debug, Deserialize)]
394#[serde(rename_all = "PascalCase")]
395pub(super) struct LocationResponse {
396 pub location: String,
397}