opendal/services/webhdfs/
core.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::header::CONTENT_LENGTH;
24use http::header::CONTENT_TYPE;
25use http::Request;
26use http::Response;
27use http::StatusCode;
28use serde::Deserialize;
29use tokio::sync::OnceCell;
30
31use super::error::parse_error;
32use crate::raw::*;
33use crate::*;
34
35pub struct WebhdfsCore {
36 pub info: Arc<AccessorInfo>,
37 pub root: String,
38 pub endpoint: String,
39 pub user_name: Option<String>,
40 pub auth: Option<String>,
41 pub root_checker: OnceCell<()>,
42
43 pub atomic_write_dir: Option<String>,
44 pub disable_list_batch: bool,
45}
46
47impl Debug for WebhdfsCore {
48 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("WebhdfsCore")
50 .field("root", &self.root)
51 .field("endpoint", &self.endpoint)
52 .finish()
53 }
54}
55
56impl WebhdfsCore {
57 pub async fn webhdfs_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
58 let p = build_abs_path(&self.root, path);
59
60 let mut url = format!(
61 "{}/webhdfs/v1/{}?op=MKDIRS&overwrite=true&noredirect=true",
62 self.endpoint,
63 percent_encode_path(&p),
64 );
65 if let Some(user) = &self.user_name {
66 url += format!("&user.name={user}").as_str();
67 }
68 if let Some(auth) = &self.auth {
69 url += format!("&{auth}").as_str();
70 }
71
72 let req = Request::put(&url);
73
74 let req = req
75 .extension(Operation::CreateDir)
76 .body(Buffer::new())
77 .map_err(new_request_build_error)?;
78
79 self.info.http_client().send(req).await
80 }
81
82 pub async fn webhdfs_create_object(
84 &self,
85 path: &str,
86 size: Option<u64>,
87 args: &OpWrite,
88 body: Buffer,
89 ) -> Result<Response<Buffer>> {
90 let p = build_abs_path(&self.root, path);
91
92 let mut url = format!(
93 "{}/webhdfs/v1/{}?op=CREATE&overwrite=true&noredirect=true",
94 self.endpoint,
95 percent_encode_path(&p),
96 );
97 if let Some(user) = &self.user_name {
98 url += format!("&user.name={user}").as_str();
99 }
100 if let Some(auth) = &self.auth {
101 url += format!("&{auth}").as_str();
102 }
103
104 let req = Request::put(&url);
105
106 let req = req
107 .extension(Operation::Write)
108 .body(Buffer::new())
109 .map_err(new_request_build_error)?;
110
111 let resp = self.info.http_client().send(req).await?;
112
113 let status = resp.status();
114
115 if status != StatusCode::CREATED && status != StatusCode::OK {
116 return Err(parse_error(resp));
117 }
118
119 let bs = resp.into_body();
120
121 let resp: LocationResponse =
122 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
123
124 let mut req = Request::put(&resp.location);
125
126 if let Some(size) = size {
127 req = req.header(CONTENT_LENGTH, size);
128 };
129
130 if let Some(content_type) = args.content_type() {
131 req = req.header(CONTENT_TYPE, content_type);
132 };
133 let req = req
134 .extension(Operation::Write)
135 .body(body)
136 .map_err(new_request_build_error)?;
137
138 self.info.http_client().send(req).await
139 }
140
141 pub async fn webhdfs_rename_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
142 let from = build_abs_path(&self.root, from);
143 let to = build_rooted_abs_path(&self.root, to);
144
145 let mut url = format!(
146 "{}/webhdfs/v1/{}?op=RENAME&destination={}",
147 self.endpoint,
148 percent_encode_path(&from),
149 percent_encode_path(&to)
150 );
151 if let Some(user) = &self.user_name {
152 url += format!("&user.name={user}").as_str();
153 }
154 if let Some(auth) = &self.auth {
155 url += &format!("&{auth}");
156 }
157
158 let req = Request::put(&url)
159 .body(Buffer::new())
160 .map_err(new_request_build_error)?;
161
162 self.info.http_client().send(req).await
163 }
164
165 async fn webhdfs_init_append(&self, path: &str) -> Result<String> {
166 let p = build_abs_path(&self.root, path);
167 let mut url = format!(
168 "{}/webhdfs/v1/{}?op=APPEND&noredirect=true",
169 self.endpoint,
170 percent_encode_path(&p),
171 );
172 if let Some(user) = &self.user_name {
173 url += format!("&user.name={user}").as_str();
174 }
175 if let Some(auth) = &self.auth {
176 url += &format!("&{auth}");
177 }
178
179 let req = Request::post(url)
180 .extension(Operation::Write)
181 .body(Buffer::new())
182 .map_err(new_request_build_error)?;
183
184 let resp = self.info.http_client().send(req).await?;
185
186 let status = resp.status();
187
188 match status {
189 StatusCode::OK => {
190 let bs = resp.into_body();
191 let resp: LocationResponse =
192 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
193
194 Ok(resp.location)
195 }
196 _ => Err(parse_error(resp)),
197 }
198 }
199
200 pub async fn webhdfs_append(
201 &self,
202 path: &str,
203 size: u64,
204 body: Buffer,
205 ) -> Result<Response<Buffer>> {
206 let mut url = self.webhdfs_init_append(path).await?;
207 if let Some(user) = &self.user_name {
208 url += format!("&user.name={user}").as_str();
209 }
210 if let Some(auth) = &self.auth {
211 url += &format!("&{auth}");
212 }
213
214 let mut req = Request::post(&url);
215
216 req = req.header(CONTENT_LENGTH, size.to_string());
217
218 let req = req
219 .extension(Operation::Write)
220 .body(body)
221 .map_err(new_request_build_error)?;
222
223 self.info.http_client().send(req).await
224 }
225
226 pub async fn webhdfs_concat(
228 &self,
229 path: &str,
230 sources: Vec<String>,
231 ) -> Result<Response<Buffer>> {
232 let p = build_abs_path(&self.root, path);
233
234 let sources = sources
235 .iter()
236 .map(|p| build_rooted_abs_path(&self.root, p))
237 .collect::<Vec<String>>()
238 .join(",");
239
240 let mut url = format!(
241 "{}/webhdfs/v1/{}?op=CONCAT&sources={}",
242 self.endpoint,
243 percent_encode_path(&p),
244 percent_encode_path(&sources),
245 );
246 if let Some(user) = &self.user_name {
247 url += format!("&user.name={user}").as_str();
248 }
249 if let Some(auth) = &self.auth {
250 url += &format!("&{auth}");
251 }
252
253 let req = Request::post(url);
254
255 let req = req
256 .extension(Operation::Write)
257 .body(Buffer::new())
258 .map_err(new_request_build_error)?;
259
260 self.info.http_client().send(req).await
261 }
262
263 pub async fn webhdfs_list_status(&self, path: &str) -> Result<Response<Buffer>> {
264 let p = build_abs_path(&self.root, path);
265 let mut url = format!(
266 "{}/webhdfs/v1/{}?op=LISTSTATUS",
267 self.endpoint,
268 percent_encode_path(&p),
269 );
270 if let Some(user) = &self.user_name {
271 url += format!("&user.name={user}").as_str();
272 }
273 if let Some(auth) = &self.auth {
274 url += format!("&{auth}").as_str();
275 }
276
277 let req = Request::get(&url)
278 .body(Buffer::new())
279 .map_err(new_request_build_error)?;
280 self.info.http_client().send(req).await
281 }
282
283 pub async fn webhdfs_list_status_batch(
284 &self,
285 path: &str,
286 start_after: &str,
287 ) -> Result<Response<Buffer>> {
288 let p = build_abs_path(&self.root, path);
289
290 let mut url = format!(
291 "{}/webhdfs/v1/{}?op=LISTSTATUS_BATCH",
292 self.endpoint,
293 percent_encode_path(&p),
294 );
295 if !start_after.is_empty() {
296 url += format!("&startAfter={}", start_after).as_str();
297 }
298 if let Some(user) = &self.user_name {
299 url += format!("&user.name={user}").as_str();
300 }
301 if let Some(auth) = &self.auth {
302 url += format!("&{auth}").as_str();
303 }
304
305 let req = Request::get(&url)
306 .body(Buffer::new())
307 .map_err(new_request_build_error)?;
308 self.info.http_client().send(req).await
309 }
310
311 fn webhdfs_open_request(&self, path: &str, range: &BytesRange) -> Result<Request<Buffer>> {
312 let p = build_abs_path(&self.root, path);
313 let mut url = format!(
314 "{}/webhdfs/v1/{}?op=OPEN",
315 self.endpoint,
316 percent_encode_path(&p),
317 );
318 if let Some(user) = &self.user_name {
319 url += format!("&user.name={user}").as_str();
320 }
321 if let Some(auth) = &self.auth {
322 url += &format!("&{auth}");
323 }
324
325 if !range.is_full() {
326 url += &format!("&offset={}", range.offset());
327 if let Some(size) = range.size() {
328 url += &format!("&length={size}")
329 }
330 }
331
332 let req = Request::get(&url)
333 .extension(Operation::Read)
334 .body(Buffer::new())
335 .map_err(new_request_build_error)?;
336
337 Ok(req)
338 }
339
340 pub async fn webhdfs_read_file(
341 &self,
342 path: &str,
343 range: BytesRange,
344 ) -> Result<Response<HttpBody>> {
345 let req = self.webhdfs_open_request(path, &range)?;
346 self.info.http_client().fetch(req).await
347 }
348
349 pub(super) async fn webhdfs_get_file_status(&self, path: &str) -> Result<Response<Buffer>> {
350 let p = build_abs_path(&self.root, path);
351 let mut url = format!(
352 "{}/webhdfs/v1/{}?op=GETFILESTATUS",
353 self.endpoint,
354 percent_encode_path(&p),
355 );
356 if let Some(user) = &self.user_name {
357 url += format!("&user.name={user}").as_str();
358 }
359 if let Some(auth) = &self.auth {
360 url += format!("&{auth}").as_str();
361 }
362
363 let req = Request::get(&url)
364 .extension(Operation::Stat)
365 .body(Buffer::new())
366 .map_err(new_request_build_error)?;
367
368 self.info.http_client().send(req).await
369 }
370
371 pub async fn webhdfs_delete(&self, path: &str) -> Result<Response<Buffer>> {
372 let p = build_abs_path(&self.root, path);
373 let mut url = format!(
374 "{}/webhdfs/v1/{}?op=DELETE&recursive=false",
375 self.endpoint,
376 percent_encode_path(&p),
377 );
378 if let Some(user) = &self.user_name {
379 url += format!("&user.name={user}").as_str();
380 }
381 if let Some(auth) = &self.auth {
382 url += format!("&{auth}").as_str();
383 }
384
385 let req = Request::delete(&url)
386 .extension(Operation::Delete)
387 .body(Buffer::new())
388 .map_err(new_request_build_error)?;
389
390 self.info.http_client().send(req).await
391 }
392}
393
394#[derive(Debug, Deserialize)]
395#[serde(rename_all = "PascalCase")]
396pub(super) struct LocationResponse {
397 pub location: String,
398}