opendal/services/webhdfs/
core.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::header::CONTENT_LENGTH;
24use http::header::CONTENT_TYPE;
25use http::Request;
26use http::Response;
27use http::StatusCode;
28use serde::Deserialize;
29use tokio::sync::OnceCell;
30
31use super::error::parse_error;
32use crate::raw::*;
33use crate::*;
34
35pub struct WebhdfsCore {
36    pub info: Arc<AccessorInfo>,
37    pub root: String,
38    pub endpoint: String,
39    pub user_name: Option<String>,
40    pub auth: Option<String>,
41    pub root_checker: OnceCell<()>,
42
43    pub atomic_write_dir: Option<String>,
44    pub disable_list_batch: bool,
45}
46
47impl Debug for WebhdfsCore {
48    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
49        f.debug_struct("WebhdfsCore")
50            .field("root", &self.root)
51            .field("endpoint", &self.endpoint)
52            .finish()
53    }
54}
55
56impl WebhdfsCore {
57    pub async fn webhdfs_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
58        let p = build_abs_path(&self.root, path);
59
60        let mut url = format!(
61            "{}/webhdfs/v1/{}?op=MKDIRS&overwrite=true&noredirect=true",
62            self.endpoint,
63            percent_encode_path(&p),
64        );
65        if let Some(user) = &self.user_name {
66            url += format!("&user.name={user}").as_str();
67        }
68        if let Some(auth) = &self.auth {
69            url += format!("&{auth}").as_str();
70        }
71
72        let req = Request::put(&url);
73
74        let req = req
75            .extension(Operation::CreateDir)
76            .body(Buffer::new())
77            .map_err(new_request_build_error)?;
78
79        self.info.http_client().send(req).await
80    }
81
82    /// create object
83    pub async fn webhdfs_create_object(
84        &self,
85        path: &str,
86        size: Option<u64>,
87        args: &OpWrite,
88        body: Buffer,
89    ) -> Result<Response<Buffer>> {
90        let p = build_abs_path(&self.root, path);
91
92        let mut url = format!(
93            "{}/webhdfs/v1/{}?op=CREATE&overwrite=true&noredirect=true",
94            self.endpoint,
95            percent_encode_path(&p),
96        );
97        if let Some(user) = &self.user_name {
98            url += format!("&user.name={user}").as_str();
99        }
100        if let Some(auth) = &self.auth {
101            url += format!("&{auth}").as_str();
102        }
103
104        let req = Request::put(&url);
105
106        let req = req
107            .extension(Operation::Write)
108            .body(Buffer::new())
109            .map_err(new_request_build_error)?;
110
111        let resp = self.info.http_client().send(req).await?;
112
113        let status = resp.status();
114
115        if status != StatusCode::CREATED && status != StatusCode::OK {
116            return Err(parse_error(resp));
117        }
118
119        let bs = resp.into_body();
120
121        let resp: LocationResponse =
122            serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
123
124        let mut req = Request::put(&resp.location);
125
126        if let Some(size) = size {
127            req = req.header(CONTENT_LENGTH, size);
128        };
129
130        if let Some(content_type) = args.content_type() {
131            req = req.header(CONTENT_TYPE, content_type);
132        };
133        let req = req
134            .extension(Operation::Write)
135            .body(body)
136            .map_err(new_request_build_error)?;
137
138        self.info.http_client().send(req).await
139    }
140
141    pub async fn webhdfs_rename_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
142        let from = build_abs_path(&self.root, from);
143        let to = build_rooted_abs_path(&self.root, to);
144
145        let mut url = format!(
146            "{}/webhdfs/v1/{}?op=RENAME&destination={}",
147            self.endpoint,
148            percent_encode_path(&from),
149            percent_encode_path(&to)
150        );
151        if let Some(user) = &self.user_name {
152            url += format!("&user.name={user}").as_str();
153        }
154        if let Some(auth) = &self.auth {
155            url += &format!("&{auth}");
156        }
157
158        let req = Request::put(&url)
159            .body(Buffer::new())
160            .map_err(new_request_build_error)?;
161
162        self.info.http_client().send(req).await
163    }
164
165    async fn webhdfs_init_append(&self, path: &str) -> Result<String> {
166        let p = build_abs_path(&self.root, path);
167        let mut url = format!(
168            "{}/webhdfs/v1/{}?op=APPEND&noredirect=true",
169            self.endpoint,
170            percent_encode_path(&p),
171        );
172        if let Some(user) = &self.user_name {
173            url += format!("&user.name={user}").as_str();
174        }
175        if let Some(auth) = &self.auth {
176            url += &format!("&{auth}");
177        }
178
179        let req = Request::post(url)
180            .extension(Operation::Write)
181            .body(Buffer::new())
182            .map_err(new_request_build_error)?;
183
184        let resp = self.info.http_client().send(req).await?;
185
186        let status = resp.status();
187
188        match status {
189            StatusCode::OK => {
190                let bs = resp.into_body();
191                let resp: LocationResponse =
192                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
193
194                Ok(resp.location)
195            }
196            _ => Err(parse_error(resp)),
197        }
198    }
199
200    pub async fn webhdfs_append(
201        &self,
202        path: &str,
203        size: u64,
204        body: Buffer,
205    ) -> Result<Response<Buffer>> {
206        let mut url = self.webhdfs_init_append(path).await?;
207        if let Some(user) = &self.user_name {
208            url += format!("&user.name={user}").as_str();
209        }
210        if let Some(auth) = &self.auth {
211            url += &format!("&{auth}");
212        }
213
214        let mut req = Request::post(&url);
215
216        req = req.header(CONTENT_LENGTH, size.to_string());
217
218        let req = req
219            .extension(Operation::Write)
220            .body(body)
221            .map_err(new_request_build_error)?;
222
223        self.info.http_client().send(req).await
224    }
225
226    /// CONCAT will concat sources to the path
227    pub async fn webhdfs_concat(
228        &self,
229        path: &str,
230        sources: Vec<String>,
231    ) -> Result<Response<Buffer>> {
232        let p = build_abs_path(&self.root, path);
233
234        let sources = sources
235            .iter()
236            .map(|p| build_rooted_abs_path(&self.root, p))
237            .collect::<Vec<String>>()
238            .join(",");
239
240        let mut url = format!(
241            "{}/webhdfs/v1/{}?op=CONCAT&sources={}",
242            self.endpoint,
243            percent_encode_path(&p),
244            percent_encode_path(&sources),
245        );
246        if let Some(user) = &self.user_name {
247            url += format!("&user.name={user}").as_str();
248        }
249        if let Some(auth) = &self.auth {
250            url += &format!("&{auth}");
251        }
252
253        let req = Request::post(url);
254
255        let req = req
256            .extension(Operation::Write)
257            .body(Buffer::new())
258            .map_err(new_request_build_error)?;
259
260        self.info.http_client().send(req).await
261    }
262
263    pub async fn webhdfs_list_status(&self, path: &str) -> Result<Response<Buffer>> {
264        let p = build_abs_path(&self.root, path);
265        let mut url = format!(
266            "{}/webhdfs/v1/{}?op=LISTSTATUS",
267            self.endpoint,
268            percent_encode_path(&p),
269        );
270        if let Some(user) = &self.user_name {
271            url += format!("&user.name={user}").as_str();
272        }
273        if let Some(auth) = &self.auth {
274            url += format!("&{auth}").as_str();
275        }
276
277        let req = Request::get(&url)
278            .body(Buffer::new())
279            .map_err(new_request_build_error)?;
280        self.info.http_client().send(req).await
281    }
282
283    pub async fn webhdfs_list_status_batch(
284        &self,
285        path: &str,
286        start_after: &str,
287    ) -> Result<Response<Buffer>> {
288        let p = build_abs_path(&self.root, path);
289
290        let mut url = format!(
291            "{}/webhdfs/v1/{}?op=LISTSTATUS_BATCH",
292            self.endpoint,
293            percent_encode_path(&p),
294        );
295        if !start_after.is_empty() {
296            url += format!("&startAfter={}", start_after).as_str();
297        }
298        if let Some(user) = &self.user_name {
299            url += format!("&user.name={user}").as_str();
300        }
301        if let Some(auth) = &self.auth {
302            url += format!("&{auth}").as_str();
303        }
304
305        let req = Request::get(&url)
306            .body(Buffer::new())
307            .map_err(new_request_build_error)?;
308        self.info.http_client().send(req).await
309    }
310
311    fn webhdfs_open_request(&self, path: &str, range: &BytesRange) -> Result<Request<Buffer>> {
312        let p = build_abs_path(&self.root, path);
313        let mut url = format!(
314            "{}/webhdfs/v1/{}?op=OPEN",
315            self.endpoint,
316            percent_encode_path(&p),
317        );
318        if let Some(user) = &self.user_name {
319            url += format!("&user.name={user}").as_str();
320        }
321        if let Some(auth) = &self.auth {
322            url += &format!("&{auth}");
323        }
324
325        if !range.is_full() {
326            url += &format!("&offset={}", range.offset());
327            if let Some(size) = range.size() {
328                url += &format!("&length={size}")
329            }
330        }
331
332        let req = Request::get(&url)
333            .extension(Operation::Read)
334            .body(Buffer::new())
335            .map_err(new_request_build_error)?;
336
337        Ok(req)
338    }
339
340    pub async fn webhdfs_read_file(
341        &self,
342        path: &str,
343        range: BytesRange,
344    ) -> Result<Response<HttpBody>> {
345        let req = self.webhdfs_open_request(path, &range)?;
346        self.info.http_client().fetch(req).await
347    }
348
349    pub(super) async fn webhdfs_get_file_status(&self, path: &str) -> Result<Response<Buffer>> {
350        let p = build_abs_path(&self.root, path);
351        let mut url = format!(
352            "{}/webhdfs/v1/{}?op=GETFILESTATUS",
353            self.endpoint,
354            percent_encode_path(&p),
355        );
356        if let Some(user) = &self.user_name {
357            url += format!("&user.name={user}").as_str();
358        }
359        if let Some(auth) = &self.auth {
360            url += format!("&{auth}").as_str();
361        }
362
363        let req = Request::get(&url)
364            .extension(Operation::Stat)
365            .body(Buffer::new())
366            .map_err(new_request_build_error)?;
367
368        self.info.http_client().send(req).await
369    }
370
371    pub async fn webhdfs_delete(&self, path: &str) -> Result<Response<Buffer>> {
372        let p = build_abs_path(&self.root, path);
373        let mut url = format!(
374            "{}/webhdfs/v1/{}?op=DELETE&recursive=false",
375            self.endpoint,
376            percent_encode_path(&p),
377        );
378        if let Some(user) = &self.user_name {
379            url += format!("&user.name={user}").as_str();
380        }
381        if let Some(auth) = &self.auth {
382            url += format!("&{auth}").as_str();
383        }
384
385        let req = Request::delete(&url)
386            .extension(Operation::Delete)
387            .body(Buffer::new())
388            .map_err(new_request_build_error)?;
389
390        self.info.http_client().send(req).await
391    }
392}
393
394#[derive(Debug, Deserialize)]
395#[serde(rename_all = "PascalCase")]
396pub(super) struct LocationResponse {
397    pub location: String,
398}