opendal/services/webhdfs/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use http::Request;
23use http::Response;
24use http::StatusCode;
25use http::header::CONTENT_LENGTH;
26use http::header::CONTENT_TYPE;
27use serde::Deserialize;
28use tokio::sync::OnceCell;
29
30use super::error::parse_error;
31use crate::raw::*;
32use crate::*;
33
34pub struct WebhdfsCore {
35    pub info: Arc<AccessorInfo>,
36    pub root: String,
37    pub endpoint: String,
38    pub user_name: Option<String>,
39    pub auth: Option<String>,
40    pub root_checker: OnceCell<()>,
41
42    pub atomic_write_dir: Option<String>,
43    pub disable_list_batch: bool,
44}
45
46impl Debug for WebhdfsCore {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("WebhdfsCore")
49            .field("root", &self.root)
50            .field("endpoint", &self.endpoint)
51            .finish_non_exhaustive()
52    }
53}
54
55impl WebhdfsCore {
56    pub async fn webhdfs_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
57        let p = build_abs_path(&self.root, path);
58
59        let mut url = format!(
60            "{}/webhdfs/v1/{}?op=MKDIRS&overwrite=true&noredirect=true",
61            self.endpoint,
62            percent_encode_path(&p),
63        );
64        if let Some(user) = &self.user_name {
65            url += format!("&user.name={user}").as_str();
66        }
67        if let Some(auth) = &self.auth {
68            url += format!("&{auth}").as_str();
69        }
70
71        let req = Request::put(&url);
72
73        let req = req
74            .extension(Operation::CreateDir)
75            .body(Buffer::new())
76            .map_err(new_request_build_error)?;
77
78        self.info.http_client().send(req).await
79    }
80
81    /// create object
82    pub async fn webhdfs_create_object(
83        &self,
84        path: &str,
85        size: Option<u64>,
86        args: &OpWrite,
87        body: Buffer,
88    ) -> Result<Response<Buffer>> {
89        let p = build_abs_path(&self.root, path);
90
91        let mut url = format!(
92            "{}/webhdfs/v1/{}?op=CREATE&overwrite=true&noredirect=true",
93            self.endpoint,
94            percent_encode_path(&p),
95        );
96        if let Some(user) = &self.user_name {
97            url += format!("&user.name={user}").as_str();
98        }
99        if let Some(auth) = &self.auth {
100            url += format!("&{auth}").as_str();
101        }
102
103        let req = Request::put(&url);
104
105        let req = req
106            .extension(Operation::Write)
107            .body(Buffer::new())
108            .map_err(new_request_build_error)?;
109
110        let resp = self.info.http_client().send(req).await?;
111
112        let status = resp.status();
113
114        if status != StatusCode::CREATED && status != StatusCode::OK {
115            return Err(parse_error(resp));
116        }
117
118        let bs = resp.into_body();
119
120        let resp: LocationResponse =
121            serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
122
123        let mut req = Request::put(&resp.location);
124
125        if let Some(size) = size {
126            req = req.header(CONTENT_LENGTH, size);
127        };
128
129        if let Some(content_type) = args.content_type() {
130            req = req.header(CONTENT_TYPE, content_type);
131        };
132        let req = req
133            .extension(Operation::Write)
134            .body(body)
135            .map_err(new_request_build_error)?;
136
137        self.info.http_client().send(req).await
138    }
139
140    pub async fn webhdfs_rename_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
141        let from = build_abs_path(&self.root, from);
142        let to = build_rooted_abs_path(&self.root, to);
143
144        let mut url = format!(
145            "{}/webhdfs/v1/{}?op=RENAME&destination={}",
146            self.endpoint,
147            percent_encode_path(&from),
148            percent_encode_path(&to)
149        );
150        if let Some(user) = &self.user_name {
151            url += format!("&user.name={user}").as_str();
152        }
153        if let Some(auth) = &self.auth {
154            url += &format!("&{auth}");
155        }
156
157        let req = Request::put(&url)
158            .body(Buffer::new())
159            .map_err(new_request_build_error)?;
160
161        self.info.http_client().send(req).await
162    }
163
164    async fn webhdfs_init_append(&self, path: &str) -> Result<String> {
165        let p = build_abs_path(&self.root, path);
166        let mut url = format!(
167            "{}/webhdfs/v1/{}?op=APPEND&noredirect=true",
168            self.endpoint,
169            percent_encode_path(&p),
170        );
171        if let Some(user) = &self.user_name {
172            url += format!("&user.name={user}").as_str();
173        }
174        if let Some(auth) = &self.auth {
175            url += &format!("&{auth}");
176        }
177
178        let req = Request::post(url)
179            .extension(Operation::Write)
180            .body(Buffer::new())
181            .map_err(new_request_build_error)?;
182
183        let resp = self.info.http_client().send(req).await?;
184
185        let status = resp.status();
186
187        match status {
188            StatusCode::OK => {
189                let bs = resp.into_body();
190                let resp: LocationResponse =
191                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
192
193                Ok(resp.location)
194            }
195            _ => Err(parse_error(resp)),
196        }
197    }
198
199    pub async fn webhdfs_append(
200        &self,
201        path: &str,
202        size: u64,
203        body: Buffer,
204    ) -> Result<Response<Buffer>> {
205        let mut url = self.webhdfs_init_append(path).await?;
206        if let Some(user) = &self.user_name {
207            url += format!("&user.name={user}").as_str();
208        }
209        if let Some(auth) = &self.auth {
210            url += &format!("&{auth}");
211        }
212
213        let mut req = Request::post(&url);
214
215        req = req.header(CONTENT_LENGTH, size.to_string());
216
217        let req = req
218            .extension(Operation::Write)
219            .body(body)
220            .map_err(new_request_build_error)?;
221
222        self.info.http_client().send(req).await
223    }
224
225    /// CONCAT will concat sources to the path
226    pub async fn webhdfs_concat(
227        &self,
228        path: &str,
229        sources: Vec<String>,
230    ) -> Result<Response<Buffer>> {
231        let p = build_abs_path(&self.root, path);
232
233        let sources = sources
234            .iter()
235            .map(|p| build_rooted_abs_path(&self.root, p))
236            .collect::<Vec<String>>()
237            .join(",");
238
239        let mut url = format!(
240            "{}/webhdfs/v1/{}?op=CONCAT&sources={}",
241            self.endpoint,
242            percent_encode_path(&p),
243            percent_encode_path(&sources),
244        );
245        if let Some(user) = &self.user_name {
246            url += format!("&user.name={user}").as_str();
247        }
248        if let Some(auth) = &self.auth {
249            url += &format!("&{auth}");
250        }
251
252        let req = Request::post(url);
253
254        let req = req
255            .extension(Operation::Write)
256            .body(Buffer::new())
257            .map_err(new_request_build_error)?;
258
259        self.info.http_client().send(req).await
260    }
261
262    pub async fn webhdfs_list_status(&self, path: &str) -> Result<Response<Buffer>> {
263        let p = build_abs_path(&self.root, path);
264        let mut url = format!(
265            "{}/webhdfs/v1/{}?op=LISTSTATUS",
266            self.endpoint,
267            percent_encode_path(&p),
268        );
269        if let Some(user) = &self.user_name {
270            url += format!("&user.name={user}").as_str();
271        }
272        if let Some(auth) = &self.auth {
273            url += format!("&{auth}").as_str();
274        }
275
276        let req = Request::get(&url)
277            .body(Buffer::new())
278            .map_err(new_request_build_error)?;
279        self.info.http_client().send(req).await
280    }
281
282    pub async fn webhdfs_list_status_batch(
283        &self,
284        path: &str,
285        start_after: &str,
286    ) -> Result<Response<Buffer>> {
287        let p = build_abs_path(&self.root, path);
288
289        let mut url = format!(
290            "{}/webhdfs/v1/{}?op=LISTSTATUS_BATCH",
291            self.endpoint,
292            percent_encode_path(&p),
293        );
294        if !start_after.is_empty() {
295            url += format!("&startAfter={start_after}").as_str();
296        }
297        if let Some(user) = &self.user_name {
298            url += format!("&user.name={user}").as_str();
299        }
300        if let Some(auth) = &self.auth {
301            url += format!("&{auth}").as_str();
302        }
303
304        let req = Request::get(&url)
305            .body(Buffer::new())
306            .map_err(new_request_build_error)?;
307        self.info.http_client().send(req).await
308    }
309
310    fn webhdfs_open_request(&self, path: &str, range: &BytesRange) -> Result<Request<Buffer>> {
311        let p = build_abs_path(&self.root, path);
312        let mut url = format!(
313            "{}/webhdfs/v1/{}?op=OPEN",
314            self.endpoint,
315            percent_encode_path(&p),
316        );
317        if let Some(user) = &self.user_name {
318            url += format!("&user.name={user}").as_str();
319        }
320        if let Some(auth) = &self.auth {
321            url += &format!("&{auth}");
322        }
323
324        if !range.is_full() {
325            url += &format!("&offset={}", range.offset());
326            if let Some(size) = range.size() {
327                url += &format!("&length={size}")
328            }
329        }
330
331        let req = Request::get(&url)
332            .extension(Operation::Read)
333            .body(Buffer::new())
334            .map_err(new_request_build_error)?;
335
336        Ok(req)
337    }
338
339    pub async fn webhdfs_read_file(
340        &self,
341        path: &str,
342        range: BytesRange,
343    ) -> Result<Response<HttpBody>> {
344        let req = self.webhdfs_open_request(path, &range)?;
345        self.info.http_client().fetch(req).await
346    }
347
348    pub(super) async fn webhdfs_get_file_status(&self, path: &str) -> Result<Response<Buffer>> {
349        let p = build_abs_path(&self.root, path);
350        let mut url = format!(
351            "{}/webhdfs/v1/{}?op=GETFILESTATUS",
352            self.endpoint,
353            percent_encode_path(&p),
354        );
355        if let Some(user) = &self.user_name {
356            url += format!("&user.name={user}").as_str();
357        }
358        if let Some(auth) = &self.auth {
359            url += format!("&{auth}").as_str();
360        }
361
362        let req = Request::get(&url)
363            .extension(Operation::Stat)
364            .body(Buffer::new())
365            .map_err(new_request_build_error)?;
366
367        self.info.http_client().send(req).await
368    }
369
370    pub async fn webhdfs_delete(&self, path: &str) -> Result<Response<Buffer>> {
371        let p = build_abs_path(&self.root, path);
372        let mut url = format!(
373            "{}/webhdfs/v1/{}?op=DELETE&recursive=false",
374            self.endpoint,
375            percent_encode_path(&p),
376        );
377        if let Some(user) = &self.user_name {
378            url += format!("&user.name={user}").as_str();
379        }
380        if let Some(auth) = &self.auth {
381            url += format!("&{auth}").as_str();
382        }
383
384        let req = Request::delete(&url)
385            .extension(Operation::Delete)
386            .body(Buffer::new())
387            .map_err(new_request_build_error)?;
388
389        self.info.http_client().send(req).await
390    }
391}
392
393#[derive(Debug, Deserialize)]
394#[serde(rename_all = "PascalCase")]
395pub(super) struct LocationResponse {
396    pub location: String,
397}