opendal/services/webhdfs/
backend.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt::Formatter;
use std::sync::Arc;

use bytes::Buf;
use core::fmt::Debug;
use http::Response;
use http::StatusCode;
use log::debug;
use tokio::sync::OnceCell;

use super::core::WebhdfsCore;
use super::delete::WebhdfsDeleter;
use super::error::parse_error;
use super::lister::WebhdfsLister;
use super::message::BooleanResp;
use super::message::FileStatusType;
use super::message::FileStatusWrapper;
use super::writer::WebhdfsWriter;
use super::writer::WebhdfsWriters;
use crate::raw::*;
use crate::services::WebhdfsConfig;
use crate::*;

const WEBHDFS_DEFAULT_ENDPOINT: &str = "http://127.0.0.1:9870";

impl Configurator for WebhdfsConfig {
    type Builder = WebhdfsBuilder;
    fn into_builder(self) -> Self::Builder {
        WebhdfsBuilder { config: self }
    }
}

/// [WebHDFS](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html)'s REST API support.
#[doc = include_str!("docs.md")]
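/// # Example
///
/// A minimal usage sketch; the endpoint and paths below are placeholders,
/// and `Webhdfs` is the builder alias re-exported from `crate::services`:
///
/// ```no_run
/// use opendal::services::Webhdfs;
/// use opendal::Operator;
///
/// # async fn example() -> opendal::Result<()> {
/// let builder = Webhdfs::default()
///     .endpoint("http://127.0.0.1:9870")
///     .root("/tmp/opendal");
/// let op = Operator::new(builder)?.finish();
/// op.write("hello.txt", "Hello, WebHDFS!").await?;
/// # Ok(())
/// # }
/// ```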
#[derive(Default, Clone)]
pub struct WebhdfsBuilder {
    config: WebhdfsConfig,
}

impl Debug for WebhdfsBuilder {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut d = f.debug_struct("WebhdfsBuilder");
        d.field("config", &self.config);
        d.finish_non_exhaustive()
    }
}

impl WebhdfsBuilder {
    /// Set the working directory of this backend.
    ///
    /// All operations will happen under this root.
    ///
    /// # Note
    ///
    /// The root will be created automatically if it does not exist.
    pub fn root(mut self, root: &str) -> Self {
        self.config.root = if root.is_empty() {
            None
        } else {
            Some(root.to_string())
        };

        self
    }

    /// Set the remote address of this backend.
    /// Defaults to `http://127.0.0.1:9870`.
    ///
    /// Endpoints should be full URIs, e.g.
    ///
    /// - `https://webhdfs.example.com:9870`
    /// - `http://192.168.66.88:9870`
    ///
    /// If the endpoint is given without a scheme, `http://`
    /// will be prepended to it.
    pub fn endpoint(mut self, endpoint: &str) -> Self {
        if !endpoint.is_empty() {
            // Trim the trailing slash so we can accept `http://127.0.0.1:9870/`.
            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
        }
        self
    }

    /// Set the username of this backend,
    /// used for authentication.
    pub fn user_name(mut self, user_name: &str) -> Self {
        if !user_name.is_empty() {
            self.config.user_name = Some(user_name.to_string());
        }
        self
    }

    /// Set the delegation token of this backend,
    /// used for authentication.
    ///
    /// # Note
    ///
    /// The builder prefers the delegation token over the username.
    /// If both are set, the delegation token will be used.
    pub fn delegation(mut self, delegation: &str) -> Self {
        if !delegation.is_empty() {
            self.config.delegation = Some(delegation.to_string());
        }
        self
    }

    /// Disable batch listing.
    ///
    /// # Note
    ///
    /// When listing a directory, the backend defaults to batch listing.
    /// If disabled, the backend will list all files/directories in one request.
    pub fn disable_list_batch(mut self) -> Self {
        self.config.disable_list_batch = true;
        self
    }

    /// Set the temp dir for atomic writes, e.g. `.opendal_tmp/`.
    ///
    /// # Notes
    ///
    /// If not set, multi writes are not supported
    /// (see `write_can_multi` in the capability below).
    pub fn atomic_write_dir(mut self, dir: &str) -> Self {
        self.config.atomic_write_dir = if dir.is_empty() {
            None
        } else {
            Some(String::from(dir))
        };
        self
    }
}

impl Builder for WebhdfsBuilder {
    const SCHEME: Scheme = Scheme::Webhdfs;
    type Config = WebhdfsConfig;

    /// Build the backend.
    ///
    /// # Note
    ///
    /// While building, the backend checks whether the root directory
    /// exists; if it does not, the directory is created automatically.
    fn build(self) -> Result<impl Access> {
        debug!("start building backend: {:?}", self);

        let root = normalize_root(&self.config.root.unwrap_or_default());
        debug!("backend use root {root}");

        // Prepend `http://` if the endpoint has no scheme.
        let endpoint = match self.config.endpoint {
            Some(endpoint) => {
                if endpoint.starts_with("http") {
                    endpoint
                } else {
                    format!("http://{endpoint}")
                }
            }
            None => WEBHDFS_DEFAULT_ENDPOINT.to_string(),
        };
        debug!("backend use endpoint {endpoint}");

        let atomic_write_dir = self.config.atomic_write_dir;

        let auth = self.config.delegation.map(|dt| format!("delegation={dt}"));

        let info = AccessorInfo::default();
        info.set_scheme(Scheme::Webhdfs)
            .set_root(&root)
            .set_native_capability(Capability {
                stat: true,
                stat_has_content_length: true,
                stat_has_last_modified: true,

                read: true,

                write: true,
                write_can_append: true,
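                // Multi writes are staged in the atomic write dir,
                // so they are only enabled when it is configured.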
                write_can_multi: atomic_write_dir.is_some(),

                create_dir: true,
                delete: true,

                list: true,
                list_has_content_length: true,
                list_has_last_modified: true,

                shared: true,

                ..Default::default()
            });

        let accessor_info = Arc::new(info);
        let core = Arc::new(WebhdfsCore {
            info: accessor_info,
            root,
            endpoint,
            user_name: self.config.user_name,
            auth,
            root_checker: OnceCell::new(),
            atomic_write_dir,
            disable_list_batch: self.config.disable_list_batch,
        });

        Ok(WebhdfsBackend { core })
    }
}
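
// A small sketch (hypothetical, not part of the upstream backend) pinning
// down the endpoint normalization performed by `endpoint()` and `build()`
// above: trailing slashes are trimmed, and a missing scheme gets
// `http://` prepended.
#[cfg(test)]
mod endpoint_normalization_tests {
    #[test]
    fn trims_slash_and_prepends_scheme() {
        // Mirrors `endpoint()`: trim the trailing slash.
        let endpoint = "192.168.66.88:9870/".trim_end_matches('/');
        // Mirrors `build()`: prepend `http://` when no scheme is present.
        let normalized = if endpoint.starts_with("http") {
            endpoint.to_string()
        } else {
            format!("http://{endpoint}")
        };
        assert_eq!(normalized, "http://192.168.66.88:9870");
    }
}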

/// Backend for WebHDFS service
#[derive(Debug, Clone)]
pub struct WebhdfsBackend {
    core: Arc<WebhdfsCore>,
}

impl WebhdfsBackend {
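    /// Verify that the root path exists and is a directory, creating it
    /// when it is missing. Invoked lazily (and at most once) through the
    /// `root_checker` `OnceCell` in `stat`.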
    async fn check_root(&self) -> Result<()> {
        let resp = self.core.webhdfs_get_file_status("/").await?;
        match resp.status() {
            StatusCode::OK => {
                let bs = resp.into_body();

                let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
                    .map_err(new_json_deserialize_error)?
                    .file_status;

                if file_status.ty == FileStatusType::File {
                    return Err(Error::new(
                        ErrorKind::ConfigInvalid,
                        "root path must be dir",
                    ));
                }
            }
            StatusCode::NOT_FOUND => {
                self.create_dir("/", OpCreateDir::new()).await?;
            }
            _ => return Err(parse_error(resp)),
        }
        Ok(())
    }
}

impl Access for WebhdfsBackend {
    type Reader = HttpBody;
    type Writer = WebhdfsWriters;
    type Lister = oio::PageLister<WebhdfsLister>;
    type Deleter = oio::OneShotDeleter<WebhdfsDeleter>;
    type BlockingReader = ();
    type BlockingWriter = ();
    type BlockingLister = ();
    type BlockingDeleter = ();

    fn info(&self) -> Arc<AccessorInfo> {
        self.core.info.clone()
    }

    /// Create a file or directory
    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
        let req = self.core.webhdfs_create_dir_request(path)?;

        let resp = self.info().http_client().send(req).await?;

        let status = resp.status();

        // WebHDFS uses a two-step create/append to prevent clients from
        // sending out data before the file is created.
        // According to the redirect policy of the `reqwest` HTTP client
        // we are using, the redirection is handled automatically.
        match status {
            StatusCode::CREATED | StatusCode::OK => {
                let bs = resp.into_body();

                let resp = serde_json::from_reader::<_, BooleanResp>(bs.reader())
                    .map_err(new_json_deserialize_error)?;

                if resp.boolean {
                    Ok(RpCreateDir::default())
                } else {
                    Err(Error::new(
                        ErrorKind::Unexpected,
                        "webhdfs create dir failed",
                    ))
                }
            }
            _ => Err(parse_error(resp)),
        }
    }

    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
        // If the root exists and is a directory, stat will be ok.
        self.core
            .root_checker
            .get_or_try_init(|| async { self.check_root().await })
            .await?;

        let resp = self.core.webhdfs_get_file_status(path).await?;
        let status = resp.status();
        match status {
            StatusCode::OK => {
                let bs = resp.into_body();

                let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
                    .map_err(new_json_deserialize_error)?
                    .file_status;

                let meta = match file_status.ty {
                    FileStatusType::Directory => Metadata::new(EntryMode::DIR),
                    FileStatusType::File => Metadata::new(EntryMode::FILE)
                        .with_content_length(file_status.length)
                        .with_last_modified(parse_datetime_from_from_timestamp_millis(
                            file_status.modification_time,
                        )?),
                };

                Ok(RpStat::new(meta))
            }

            _ => Err(parse_error(resp)),
        }
    }

    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        let resp = self.core.webhdfs_read_file(path, args.range()).await?;

        let status = resp.status();

        match status {
            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                Ok((RpRead::default(), resp.into_body()))
            }
            _ => {
                let (part, mut body) = resp.into_parts();
                let buf = body.to_buffer().await?;
                Err(parse_error(Response::from_parts(part, buf)))
            }
        }
    }

    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        let w = WebhdfsWriter::new(self.core.clone(), args.clone(), path.to_string());

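        // Append writes go through `AppendWriter`; everything else uses
        // `BlockWriter`, which can upload multiple blocks concurrently.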
        let w = if args.append() {
            WebhdfsWriters::Two(oio::AppendWriter::new(w))
        } else {
            WebhdfsWriters::One(oio::BlockWriter::new(
                self.info().clone(),
                w,
                args.concurrent(),
            ))
        };

        Ok((RpWrite::default(), w))
    }

    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        Ok((
            RpDelete::default(),
            oio::OneShotDeleter::new(WebhdfsDeleter::new(self.core.clone())),
        ))
    }

    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
        if args.recursive() {
            return Err(Error::new(
                ErrorKind::Unsupported,
                "WebHDFS doesn't support list with recursive",
            ));
        }

        let path = path.trim_end_matches('/');
        let l = WebhdfsLister::new(self.core.clone(), path);
        Ok((RpList::default(), oio::PageLister::new(l)))
    }
}