opendal/services/webhdfs/
backend.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use core::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use bytes::Buf;
use http::Response;
use http::StatusCode;
use log::debug;
use tokio::sync::OnceCell;

use super::core::WebhdfsCore;
use super::delete::WebhdfsDeleter;
use super::error::parse_error;
use super::lister::WebhdfsLister;
use super::message::BooleanResp;
use super::message::FileStatusType;
use super::message::FileStatusWrapper;
use super::writer::WebhdfsWriter;
use super::writer::WebhdfsWriters;
use crate::raw::*;
use crate::services::WebhdfsConfig;
use crate::*;

const WEBHDFS_DEFAULT_ENDPOINT: &str = "http://127.0.0.1:9870";

impl Configurator for WebhdfsConfig {
    type Builder = WebhdfsBuilder;
    fn into_builder(self) -> Self::Builder {
        WebhdfsBuilder { config: self }
    }
}

/// [WebHDFS](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html)'s REST API support.
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
pub struct WebhdfsBuilder {
    config: WebhdfsConfig,
}

impl Debug for WebhdfsBuilder {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut d = f.debug_struct("WebhdfsBuilder");
        d.field("config", &self.config);
        d.finish_non_exhaustive()
    }
}

impl WebhdfsBuilder {
    /// Set the working directory of this backend.
    ///
    /// All operations will happen under this root.
    ///
    /// # Note
    ///
    /// The root will be created automatically if it does not exist.
    pub fn root(mut self, root: &str) -> Self {
        self.config.root = if root.is_empty() {
            None
        } else {
            Some(root.to_string())
        };

        self
    }

    /// Set the remote address of this backend.
    /// Defaults to `http://127.0.0.1:9870`.
    ///
    /// The endpoint should be a full URI, e.g.
    ///
    /// - `https://webhdfs.example.com:9870`
    /// - `http://192.168.66.88:9870`
    ///
    /// If the endpoint is given without a scheme, `http://` will be
    /// prepended automatically.
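    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the crate's usual `services::Webhdfs`
    /// re-export of this builder (the endpoint value is illustrative):
    ///
    /// ```no_run
    /// use opendal::services::Webhdfs;
    ///
    /// let builder = Webhdfs::default().endpoint("https://webhdfs.example.com:9870");
    /// ```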
    pub fn endpoint(mut self, endpoint: &str) -> Self {
        if !endpoint.is_empty() {
            // Trim the trailing slash so we can accept `http://127.0.0.1:9870/`.
            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
        }
        self
    }

    /// Set the username of this backend,
    /// used for authentication.
    pub fn user_name(mut self, user_name: &str) -> Self {
        if !user_name.is_empty() {
            self.config.user_name = Some(user_name.to_string());
        }
        self
    }

    /// Set the delegation token of this backend,
    /// used for authentication.
    ///
    /// # Note
    ///
    /// The builder prefers the delegation token over the username.
    /// If both are set, the delegation token will be used.
    pub fn delegation(mut self, delegation: &str) -> Self {
        if !delegation.is_empty() {
            self.config.delegation = Some(delegation.to_string());
        }
        self
    }

    /// Disable batch listing.
    ///
    /// # Note
    ///
    /// When listing a directory, the backend defaults to batch listing.
    /// If disabled, the backend will list all files/directories in one request.
    pub fn disable_list_batch(mut self) -> Self {
        self.config.disable_list_batch = true;
        self
    }

    /// Set the temp dir for atomic writes, e.g. `.opendal_tmp/`.
    ///
    /// # Notes
    ///
    /// If not set, multi-part writes (`write_can_multi`) are not supported.
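    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the crate's usual `services::Webhdfs`
    /// re-export (the directory name is illustrative):
    ///
    /// ```no_run
    /// use opendal::services::Webhdfs;
    ///
    /// let builder = Webhdfs::default().atomic_write_dir(".opendal_tmp/");
    /// ```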
    pub fn atomic_write_dir(mut self, dir: &str) -> Self {
        self.config.atomic_write_dir = if dir.is_empty() {
            None
        } else {
            Some(String::from(dir))
        };
        self
    }
}

impl Builder for WebhdfsBuilder {
    const SCHEME: Scheme = Scheme::Webhdfs;
    type Config = WebhdfsConfig;

    /// Build the backend.
    ///
    /// # Note
    ///
    /// When building the backend, it will check whether the root directory
    /// exists. If the directory does not exist, it will be created
    /// automatically.
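    ///
    /// A minimal sketch of turning the builder into an `Operator`, assuming
    /// the crate's usual `Operator::new(...).finish()` flow:
    ///
    /// ```no_run
    /// use opendal::services::Webhdfs;
    /// use opendal::Operator;
    ///
    /// # fn main() -> opendal::Result<()> {
    /// let builder = Webhdfs::default().endpoint("http://127.0.0.1:9870");
    /// let op = Operator::new(builder)?.finish();
    /// # Ok(())
    /// # }
    /// ```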
    fn build(self) -> Result<impl Access> {
        debug!("start building backend: {:?}", self);

        let root = normalize_root(&self.config.root.unwrap_or_default());
        debug!("backend use root {root}");

        // check scheme
        let endpoint = match self.config.endpoint {
            Some(endpoint) => {
                if endpoint.starts_with("http") {
                    endpoint
                } else {
                    format!("http://{endpoint}")
                }
            }
            None => WEBHDFS_DEFAULT_ENDPOINT.to_string(),
        };
        debug!("backend use endpoint {}", endpoint);

        let atomic_write_dir = self.config.atomic_write_dir;

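        // Per the WebHDFS REST docs, authentication rides in the query
        // string: `user.name=<name>` for simple auth, or
        // `delegation=<token>` when a delegation token is set. The token
        // takes precedence, as documented on the builder.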
        let auth = self.config.delegation.map(|dt| format!("delegation={dt}"));

        let info = AccessorInfo::default();
        info.set_scheme(Scheme::Webhdfs)
            .set_root(&root)
            .set_native_capability(Capability {
                stat: true,
                stat_has_content_length: true,
                stat_has_last_modified: true,

                read: true,

                write: true,
                write_can_append: true,
                write_can_multi: atomic_write_dir.is_some(),

                create_dir: true,
                delete: true,

                list: true,
                list_has_content_length: true,
                list_has_last_modified: true,

                shared: true,

                ..Default::default()
            });

        let accessor_info = Arc::new(info);
        let core = Arc::new(WebhdfsCore {
            info: accessor_info,
            root,
            endpoint,
            user_name: self.config.user_name,
            auth,
            root_checker: OnceCell::new(),
            atomic_write_dir,
            disable_list_batch: self.config.disable_list_batch,
        });

        Ok(WebhdfsBackend { core })
    }
}

/// Backend for WebHDFS service
#[derive(Debug, Clone)]
pub struct WebhdfsBackend {
    core: Arc<WebhdfsCore>,
}

impl WebhdfsBackend {
    async fn check_root(&self) -> Result<()> {
        let resp = self.core.webhdfs_get_file_status("/").await?;
        match resp.status() {
            StatusCode::OK => {
                let bs = resp.into_body();

                let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
                    .map_err(new_json_deserialize_error)?
                    .file_status;

                if file_status.ty == FileStatusType::File {
                    return Err(Error::new(
                        ErrorKind::ConfigInvalid,
                        "root path must be dir",
                    ));
                }
            }
            StatusCode::NOT_FOUND => {
                self.create_dir("/", OpCreateDir::new()).await?;
            }
            _ => return Err(parse_error(resp)),
        }
        Ok(())
    }
}

impl Access for WebhdfsBackend {
    type Reader = HttpBody;
    type Writer = WebhdfsWriters;
    type Lister = oio::PageLister<WebhdfsLister>;
    type Deleter = oio::OneShotDeleter<WebhdfsDeleter>;

    fn info(&self) -> Arc<AccessorInfo> {
        self.core.info.clone()
    }

    /// Create a directory.
    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
        let resp = self.core.webhdfs_create_dir(path).await?;

        let status = resp.status();
        // WebHDFS uses a two-step create/append to prevent clients from
        // sending data before the file is created.
        // According to the redirect policy of the `reqwest` HTTP client we
        // are using, the redirection is followed automatically.
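        //
        // An illustrative exchange, per the WebHDFS REST docs:
        //   1. PUT <namenode>/webhdfs/v1/<path>?op=CREATE
        //      -> 307 TEMPORARY_REDIRECT, `Location` pointing at a datanode
        //   2. PUT <datanode location> with the data -> 201 CREATED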
        match status {
            StatusCode::CREATED | StatusCode::OK => {
                let bs = resp.into_body();

                let resp = serde_json::from_reader::<_, BooleanResp>(bs.reader())
                    .map_err(new_json_deserialize_error)?;

                if resp.boolean {
                    Ok(RpCreateDir::default())
                } else {
                    Err(Error::new(
                        ErrorKind::Unexpected,
                        "webhdfs create dir failed",
                    ))
                }
            }
            _ => Err(parse_error(resp)),
        }
    }

    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
        // Lazily verify the root on the first stat: `get_or_try_init` runs
        // `check_root` at most once per backend. If the root exists and is
        // a directory, stat can proceed.
        self.core
            .root_checker
            .get_or_try_init(|| async { self.check_root().await })
            .await?;

        let resp = self.core.webhdfs_get_file_status(path).await?;
        let status = resp.status();
        match status {
            StatusCode::OK => {
                let bs = resp.into_body();

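                // Illustrative GETFILESTATUS payload, abridged from the
                // WebHDFS REST docs; `type` is either "FILE" or "DIRECTORY":
                //
                //   {"FileStatus": {"length": 0, "modificationTime": 1320173277227, "type": "DIRECTORY", ...}}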
                let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
                    .map_err(new_json_deserialize_error)?
                    .file_status;

                let meta = match file_status.ty {
                    FileStatusType::Directory => Metadata::new(EntryMode::DIR),
                    FileStatusType::File => Metadata::new(EntryMode::FILE)
                        .with_content_length(file_status.length)
                        .with_last_modified(parse_datetime_from_from_timestamp_millis(
                            file_status.modification_time,
                        )?),
                };

                Ok(RpStat::new(meta))
            }

            _ => Err(parse_error(resp)),
        }
    }

    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        let resp = self.core.webhdfs_read_file(path, args.range()).await?;

        let status = resp.status();
        match status {
            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                Ok((RpRead::default(), resp.into_body()))
            }
            _ => {
                let (part, mut body) = resp.into_parts();
                let buf = body.to_buffer().await?;
                Err(parse_error(Response::from_parts(part, buf)))
            }
        }
    }

    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        let w = WebhdfsWriter::new(self.core.clone(), args.clone(), path.to_string());

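        // Appends go through `oio::AppendWriter`; all other writes use
        // `oio::BlockWriter`, whose multi-part path relies on
        // `atomic_write_dir` (see the `write_can_multi` capability above).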
        let w = if args.append() {
            WebhdfsWriters::Two(oio::AppendWriter::new(w))
        } else {
            WebhdfsWriters::One(oio::BlockWriter::new(
                self.info().clone(),
                w,
                args.concurrent(),
            ))
        };

        Ok((RpWrite::default(), w))
    }

    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        Ok((
            RpDelete::default(),
            oio::OneShotDeleter::new(WebhdfsDeleter::new(self.core.clone())),
        ))
    }

    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
        if args.recursive() {
            return Err(Error::new(
                ErrorKind::Unsupported,
                "WebHDFS doesn't support list with recursive",
            ));
        }

        let path = path.trim_end_matches('/');
        let l = WebhdfsLister::new(self.core.clone(), path);
        Ok((RpList::default(), oio::PageLister::new(l)))
    }
}