opendal/services/webhdfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use tokio::sync::OnceCell;
26
27use super::WEBHDFS_SCHEME;
28use super::config::WebhdfsConfig;
29use super::core::WebhdfsCore;
30use super::deleter::WebhdfsDeleter;
31use super::error::parse_error;
32use super::lister::WebhdfsLister;
33use super::message::BooleanResp;
34use super::message::FileStatusType;
35use super::message::FileStatusWrapper;
36use super::writer::WebhdfsWriter;
37use super::writer::WebhdfsWriters;
38use crate::raw::*;
39use crate::*;
40
/// Endpoint used when the user does not configure one (see `Builder::build`).
const WEBHDFS_DEFAULT_ENDPOINT: &str = "http://127.0.0.1:9870";
42
/// [WebHDFS](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html)'s REST API support.
#[doc = include_str!("docs.md")]
#[derive(Debug, Default)]
pub struct WebhdfsBuilder {
    // Accumulated configuration; populated by the fluent setters and
    // consumed by `Builder::build`.
    pub(super) config: WebhdfsConfig,
}
49
50impl WebhdfsBuilder {
51    /// Set the working directory of this backend
52    ///
53    /// All operations will happen under this root
54    ///
55    /// # Note
56    ///
57    /// The root will be automatically created if not exists.
58    pub fn root(mut self, root: &str) -> Self {
59        self.config.root = if root.is_empty() {
60            None
61        } else {
62            Some(root.to_string())
63        };
64
65        self
66    }
67
68    /// Set the remote address of this backend
69    /// default to `http://127.0.0.1:9870`
70    ///
71    /// Endpoints should be full uri, e.g.
72    ///
73    /// - `https://webhdfs.example.com:9870`
74    /// - `http://192.168.66.88:9870`
75    ///
76    /// If user inputs endpoint without scheme, we will
77    /// prepend `http://` to it.
78    pub fn endpoint(mut self, endpoint: &str) -> Self {
79        if !endpoint.is_empty() {
80            // trim tailing slash so we can accept `http://127.0.0.1:9870/`
81            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
82        }
83        self
84    }
85
86    /// Set the username of this backend,
87    /// used for authentication
88    pub fn user_name(mut self, user_name: &str) -> Self {
89        if !user_name.is_empty() {
90            self.config.user_name = Some(user_name.to_string());
91        }
92        self
93    }
94
95    /// Set the delegation token of this backend,
96    /// used for authentication
97    ///
98    /// # Note
99    /// The builder prefers using delegation token over username.
100    /// If both are set, delegation token will be used.
101    pub fn delegation(mut self, delegation: &str) -> Self {
102        if !delegation.is_empty() {
103            self.config.delegation = Some(delegation.to_string());
104        }
105        self
106    }
107
108    /// Disable batch listing
109    ///
110    /// # Note
111    ///
112    /// When listing a directory, the backend will default to use batch listing.
113    /// If disabled, the backend will list all files/directories in one request.
114    pub fn disable_list_batch(mut self) -> Self {
115        self.config.disable_list_batch = true;
116        self
117    }
118
119    /// Set temp dir for atomic write.
120    ///
121    /// # Notes
122    ///
123    /// If not set, write multi not support, eg: `.opendal_tmp/`.
124    pub fn atomic_write_dir(mut self, dir: &str) -> Self {
125        self.config.atomic_write_dir = if dir.is_empty() {
126            None
127        } else {
128            Some(String::from(dir))
129        };
130        self
131    }
132}
133
134impl Builder for WebhdfsBuilder {
135    type Config = WebhdfsConfig;
136
137    /// build the backend
138    ///
139    /// # Note
140    ///
141    /// when building backend, the built backend will check if the root directory
142    /// exits.
143    /// if the directory does not exit, the directory will be automatically created
144    fn build(self) -> Result<impl Access> {
145        debug!("start building backend: {self:?}");
146
147        let root = normalize_root(&self.config.root.unwrap_or_default());
148        debug!("backend use root {root}");
149
150        // check scheme
151        let endpoint = match self.config.endpoint {
152            Some(endpoint) => {
153                if endpoint.starts_with("http") {
154                    endpoint
155                } else {
156                    format!("http://{endpoint}")
157                }
158            }
159            None => WEBHDFS_DEFAULT_ENDPOINT.to_string(),
160        };
161        debug!("backend use endpoint {endpoint}");
162
163        let atomic_write_dir = self.config.atomic_write_dir;
164
165        let auth = self.config.delegation.map(|dt| format!("delegation={dt}"));
166
167        let info = AccessorInfo::default();
168        info.set_scheme(WEBHDFS_SCHEME)
169            .set_root(&root)
170            .set_native_capability(Capability {
171                stat: true,
172
173                read: true,
174
175                write: true,
176                write_can_append: true,
177                write_can_multi: atomic_write_dir.is_some(),
178
179                create_dir: true,
180                delete: true,
181
182                list: true,
183
184                shared: true,
185
186                ..Default::default()
187            });
188
189        let accessor_info = Arc::new(info);
190        let core = Arc::new(WebhdfsCore {
191            info: accessor_info,
192            root,
193            endpoint,
194            user_name: self.config.user_name,
195            auth,
196            root_checker: OnceCell::new(),
197            atomic_write_dir,
198            disable_list_batch: self.config.disable_list_batch,
199        });
200
201        Ok(WebhdfsBackend { core })
202    }
203}
204
/// Backend for WebHDFS service
#[derive(Debug, Clone)]
pub struct WebhdfsBackend {
    // Shared state (root, endpoint, auth, accessor info, root checker)
    // used by every operation; cheap to clone via `Arc`.
    core: Arc<WebhdfsCore>,
}
210
211impl WebhdfsBackend {
212    async fn check_root(&self) -> Result<()> {
213        let resp = self.core.webhdfs_get_file_status("/").await?;
214        match resp.status() {
215            StatusCode::OK => {
216                let bs = resp.into_body();
217
218                let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
219                    .map_err(new_json_deserialize_error)?
220                    .file_status;
221
222                if file_status.ty == FileStatusType::File {
223                    return Err(Error::new(
224                        ErrorKind::ConfigInvalid,
225                        "root path must be dir",
226                    ));
227                }
228            }
229            StatusCode::NOT_FOUND => {
230                self.create_dir("/", OpCreateDir::new()).await?;
231            }
232            _ => return Err(parse_error(resp)),
233        }
234        Ok(())
235    }
236}
237
238impl Access for WebhdfsBackend {
239    type Reader = HttpBody;
240    type Writer = WebhdfsWriters;
241    type Lister = oio::PageLister<WebhdfsLister>;
242    type Deleter = oio::OneShotDeleter<WebhdfsDeleter>;
243
244    fn info(&self) -> Arc<AccessorInfo> {
245        self.core.info.clone()
246    }
247
248    /// Create a file or directory
249    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
250        let resp = self.core.webhdfs_create_dir(path).await?;
251
252        let status = resp.status();
253        // WebHDFS's has a two-step create/append to prevent clients to send out
254        // data before creating it.
255        // According to the redirect policy of `reqwest` HTTP Client we are using,
256        // the redirection should be done automatically.
257        match status {
258            StatusCode::CREATED | StatusCode::OK => {
259                let bs = resp.into_body();
260
261                let resp = serde_json::from_reader::<_, BooleanResp>(bs.reader())
262                    .map_err(new_json_deserialize_error)?;
263
264                if resp.boolean {
265                    Ok(RpCreateDir::default())
266                } else {
267                    Err(Error::new(
268                        ErrorKind::Unexpected,
269                        "webhdfs create dir failed",
270                    ))
271                }
272            }
273            _ => Err(parse_error(resp)),
274        }
275    }
276
277    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
278        // if root exists and is a directory, stat will be ok
279        self.core
280            .root_checker
281            .get_or_try_init(|| async { self.check_root().await })
282            .await?;
283
284        let resp = self.core.webhdfs_get_file_status(path).await?;
285        let status = resp.status();
286        match status {
287            StatusCode::OK => {
288                let bs = resp.into_body();
289
290                let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
291                    .map_err(new_json_deserialize_error)?
292                    .file_status;
293
294                let meta = match file_status.ty {
295                    FileStatusType::Directory => Metadata::new(EntryMode::DIR),
296                    FileStatusType::File => Metadata::new(EntryMode::FILE)
297                        .with_content_length(file_status.length)
298                        .with_last_modified(Timestamp::from_millisecond(
299                            file_status.modification_time,
300                        )?),
301                };
302
303                Ok(RpStat::new(meta))
304            }
305
306            _ => Err(parse_error(resp)),
307        }
308    }
309
310    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
311        let resp = self.core.webhdfs_read_file(path, args.range()).await?;
312
313        let status = resp.status();
314        match status {
315            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
316                Ok((RpRead::default(), resp.into_body()))
317            }
318            _ => {
319                let (part, mut body) = resp.into_parts();
320                let buf = body.to_buffer().await?;
321                Err(parse_error(Response::from_parts(part, buf)))
322            }
323        }
324    }
325
326    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
327        let w = WebhdfsWriter::new(self.core.clone(), args.clone(), path.to_string());
328
329        let w = if args.append() {
330            WebhdfsWriters::Two(oio::AppendWriter::new(w))
331        } else {
332            WebhdfsWriters::One(oio::BlockWriter::new(
333                self.info().clone(),
334                w,
335                args.concurrent(),
336            ))
337        };
338
339        Ok((RpWrite::default(), w))
340    }
341
342    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
343        Ok((
344            RpDelete::default(),
345            oio::OneShotDeleter::new(WebhdfsDeleter::new(self.core.clone())),
346        ))
347    }
348
349    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
350        if args.recursive() {
351            return Err(Error::new(
352                ErrorKind::Unsupported,
353                "WebHDFS doesn't support list with recursive",
354            ));
355        }
356
357        let path = path.trim_end_matches('/');
358        let l = WebhdfsLister::new(self.core.clone(), path);
359        Ok((RpList::default(), oio::PageLister::new(l)))
360    }
361}