opendal/services/webhdfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use core::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::Response;
24use http::StatusCode;
25use log::debug;
26use tokio::sync::OnceCell;
27
28use super::core::WebhdfsCore;
29use super::delete::WebhdfsDeleter;
30use super::error::parse_error;
31use super::lister::WebhdfsLister;
32use super::message::BooleanResp;
33use super::message::FileStatusType;
34use super::message::FileStatusWrapper;
35use super::writer::WebhdfsWriter;
36use super::writer::WebhdfsWriters;
37use super::DEFAULT_SCHEME;
38use crate::raw::*;
39use crate::services::WebhdfsConfig;
40use crate::*;
/// Endpoint used when no (or an empty) endpoint is configured.
const WEBHDFS_DEFAULT_ENDPOINT: &str = "http://127.0.0.1:9870";
42
43impl Configurator for WebhdfsConfig {
44    type Builder = WebhdfsBuilder;
45    fn into_builder(self) -> Self::Builder {
46        WebhdfsBuilder { config: self }
47    }
48}
49
50/// [WebHDFS](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html)'s REST API support.
51#[doc = include_str!("docs.md")]
52#[derive(Default, Clone)]
53pub struct WebhdfsBuilder {
54    config: WebhdfsConfig,
55}
56
57impl Debug for WebhdfsBuilder {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        let mut d = f.debug_struct("WebhdfsBuilder");
60        d.field("config", &self.config);
61        d.finish_non_exhaustive()
62    }
63}
64
65impl WebhdfsBuilder {
66    /// Set the working directory of this backend
67    ///
68    /// All operations will happen under this root
69    ///
70    /// # Note
71    ///
72    /// The root will be automatically created if not exists.
73    pub fn root(mut self, root: &str) -> Self {
74        self.config.root = if root.is_empty() {
75            None
76        } else {
77            Some(root.to_string())
78        };
79
80        self
81    }
82
83    /// Set the remote address of this backend
84    /// default to `http://127.0.0.1:9870`
85    ///
86    /// Endpoints should be full uri, e.g.
87    ///
88    /// - `https://webhdfs.example.com:9870`
89    /// - `http://192.168.66.88:9870`
90    ///
91    /// If user inputs endpoint without scheme, we will
92    /// prepend `http://` to it.
93    pub fn endpoint(mut self, endpoint: &str) -> Self {
94        if !endpoint.is_empty() {
95            // trim tailing slash so we can accept `http://127.0.0.1:9870/`
96            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
97        }
98        self
99    }
100
101    /// Set the username of this backend,
102    /// used for authentication
103    pub fn user_name(mut self, user_name: &str) -> Self {
104        if !user_name.is_empty() {
105            self.config.user_name = Some(user_name.to_string());
106        }
107        self
108    }
109
110    /// Set the delegation token of this backend,
111    /// used for authentication
112    ///
113    /// # Note
114    /// The builder prefers using delegation token over username.
115    /// If both are set, delegation token will be used.
116    pub fn delegation(mut self, delegation: &str) -> Self {
117        if !delegation.is_empty() {
118            self.config.delegation = Some(delegation.to_string());
119        }
120        self
121    }
122
123    /// Disable batch listing
124    ///
125    /// # Note
126    ///
127    /// When listing a directory, the backend will default to use batch listing.
128    /// If disabled, the backend will list all files/directories in one request.
129    pub fn disable_list_batch(mut self) -> Self {
130        self.config.disable_list_batch = true;
131        self
132    }
133
134    /// Set temp dir for atomic write.
135    ///
136    /// # Notes
137    ///
138    /// If not set, write multi not support, eg: `.opendal_tmp/`.
139    pub fn atomic_write_dir(mut self, dir: &str) -> Self {
140        self.config.atomic_write_dir = if dir.is_empty() {
141            None
142        } else {
143            Some(String::from(dir))
144        };
145        self
146    }
147}
148
149impl Builder for WebhdfsBuilder {
150    type Config = WebhdfsConfig;
151
152    /// build the backend
153    ///
154    /// # Note
155    ///
156    /// when building backend, the built backend will check if the root directory
157    /// exits.
158    /// if the directory does not exit, the directory will be automatically created
159    fn build(self) -> Result<impl Access> {
160        debug!("start building backend: {self:?}");
161
162        let root = normalize_root(&self.config.root.unwrap_or_default());
163        debug!("backend use root {root}");
164
165        // check scheme
166        let endpoint = match self.config.endpoint {
167            Some(endpoint) => {
168                if endpoint.starts_with("http") {
169                    endpoint
170                } else {
171                    format!("http://{endpoint}")
172                }
173            }
174            None => WEBHDFS_DEFAULT_ENDPOINT.to_string(),
175        };
176        debug!("backend use endpoint {endpoint}");
177
178        let atomic_write_dir = self.config.atomic_write_dir;
179
180        let auth = self.config.delegation.map(|dt| format!("delegation={dt}"));
181
182        let info = AccessorInfo::default();
183        info.set_scheme(DEFAULT_SCHEME)
184            .set_root(&root)
185            .set_native_capability(Capability {
186                stat: true,
187
188                read: true,
189
190                write: true,
191                write_can_append: true,
192                write_can_multi: atomic_write_dir.is_some(),
193
194                create_dir: true,
195                delete: true,
196
197                list: true,
198
199                shared: true,
200
201                ..Default::default()
202            });
203
204        let accessor_info = Arc::new(info);
205        let core = Arc::new(WebhdfsCore {
206            info: accessor_info,
207            root,
208            endpoint,
209            user_name: self.config.user_name,
210            auth,
211            root_checker: OnceCell::new(),
212            atomic_write_dir,
213            disable_list_batch: self.config.disable_list_batch,
214        });
215
216        Ok(WebhdfsBackend { core })
217    }
218}
219
220/// Backend for WebHDFS service
221#[derive(Debug, Clone)]
222pub struct WebhdfsBackend {
223    core: Arc<WebhdfsCore>,
224}
225
226impl WebhdfsBackend {
227    async fn check_root(&self) -> Result<()> {
228        let resp = self.core.webhdfs_get_file_status("/").await?;
229        match resp.status() {
230            StatusCode::OK => {
231                let bs = resp.into_body();
232
233                let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
234                    .map_err(new_json_deserialize_error)?
235                    .file_status;
236
237                if file_status.ty == FileStatusType::File {
238                    return Err(Error::new(
239                        ErrorKind::ConfigInvalid,
240                        "root path must be dir",
241                    ));
242                }
243            }
244            StatusCode::NOT_FOUND => {
245                self.create_dir("/", OpCreateDir::new()).await?;
246            }
247            _ => return Err(parse_error(resp)),
248        }
249        Ok(())
250    }
251}
252
253impl Access for WebhdfsBackend {
254    type Reader = HttpBody;
255    type Writer = WebhdfsWriters;
256    type Lister = oio::PageLister<WebhdfsLister>;
257    type Deleter = oio::OneShotDeleter<WebhdfsDeleter>;
258
259    fn info(&self) -> Arc<AccessorInfo> {
260        self.core.info.clone()
261    }
262
263    /// Create a file or directory
264    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
265        let resp = self.core.webhdfs_create_dir(path).await?;
266
267        let status = resp.status();
268        // WebHDFS's has a two-step create/append to prevent clients to send out
269        // data before creating it.
270        // According to the redirect policy of `reqwest` HTTP Client we are using,
271        // the redirection should be done automatically.
272        match status {
273            StatusCode::CREATED | StatusCode::OK => {
274                let bs = resp.into_body();
275
276                let resp = serde_json::from_reader::<_, BooleanResp>(bs.reader())
277                    .map_err(new_json_deserialize_error)?;
278
279                if resp.boolean {
280                    Ok(RpCreateDir::default())
281                } else {
282                    Err(Error::new(
283                        ErrorKind::Unexpected,
284                        "webhdfs create dir failed",
285                    ))
286                }
287            }
288            _ => Err(parse_error(resp)),
289        }
290    }
291
292    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
293        // if root exists and is a directory, stat will be ok
294        self.core
295            .root_checker
296            .get_or_try_init(|| async { self.check_root().await })
297            .await?;
298
299        let resp = self.core.webhdfs_get_file_status(path).await?;
300        let status = resp.status();
301        match status {
302            StatusCode::OK => {
303                let bs = resp.into_body();
304
305                let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
306                    .map_err(new_json_deserialize_error)?
307                    .file_status;
308
309                let meta = match file_status.ty {
310                    FileStatusType::Directory => Metadata::new(EntryMode::DIR),
311                    FileStatusType::File => Metadata::new(EntryMode::FILE)
312                        .with_content_length(file_status.length)
313                        .with_last_modified(parse_datetime_from_from_timestamp_millis(
314                            file_status.modification_time,
315                        )?),
316                };
317
318                Ok(RpStat::new(meta))
319            }
320
321            _ => Err(parse_error(resp)),
322        }
323    }
324
325    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
326        let resp = self.core.webhdfs_read_file(path, args.range()).await?;
327
328        let status = resp.status();
329        match status {
330            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
331                Ok((RpRead::default(), resp.into_body()))
332            }
333            _ => {
334                let (part, mut body) = resp.into_parts();
335                let buf = body.to_buffer().await?;
336                Err(parse_error(Response::from_parts(part, buf)))
337            }
338        }
339    }
340
341    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
342        let w = WebhdfsWriter::new(self.core.clone(), args.clone(), path.to_string());
343
344        let w = if args.append() {
345            WebhdfsWriters::Two(oio::AppendWriter::new(w))
346        } else {
347            WebhdfsWriters::One(oio::BlockWriter::new(
348                self.info().clone(),
349                w,
350                args.concurrent(),
351            ))
352        };
353
354        Ok((RpWrite::default(), w))
355    }
356
357    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
358        Ok((
359            RpDelete::default(),
360            oio::OneShotDeleter::new(WebhdfsDeleter::new(self.core.clone())),
361        ))
362    }
363
364    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
365        if args.recursive() {
366            return Err(Error::new(
367                ErrorKind::Unsupported,
368                "WebHDFS doesn't support list with recursive",
369            ));
370        }
371
372        let path = path.trim_end_matches('/');
373        let l = WebhdfsLister::new(self.core.clone(), path);
374        Ok((RpList::default(), oio::PageLister::new(l)))
375    }
376}