opendal/services/hdfs/config.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt::Debug;

use serde::Deserialize;
use serde::Serialize;

use super::HDFS_SCHEME;
use super::backend::HdfsBuilder;

/// [Hadoop Distributed File System (HDFS™)](https://hadoop.apache.org/) support.
///
/// Configuration for the HDFS service.
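///
/// # Example
///
/// A minimal sketch of building this config by hand and turning it into its
/// service builder. It assumes `HdfsConfig` and the `Configurator` trait are
/// re-exported as `opendal::services::HdfsConfig` and `opendal::Configurator`;
/// adjust the paths if your crate layout differs.
///
/// ```no_run
/// use opendal::Configurator;
/// use opendal::services::HdfsConfig;
///
/// let mut cfg = HdfsConfig::default();
/// cfg.name_node = Some("hdfs://127.0.0.1:8020".to_string());
/// cfg.root = Some("/tmp/opendal".to_string());
///
/// // Convert the config into the service builder it configures.
/// let _builder = cfg.into_builder();
/// ```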
#[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
#[non_exhaustive]
pub struct HdfsConfig {
    /// Working directory of this backend.
    pub root: Option<String>,
    /// Address of the name node for this backend.
    pub name_node: Option<String>,
    /// Kerberos ticket cache path for this backend.
    pub kerberos_ticket_cache_path: Option<String>,
    /// User to access HDFS as.
    pub user: Option<String>,
    /// Enable the append capability.
    pub enable_append: bool,
    /// Directory used to stage files for atomic writes.
    pub atomic_write_dir: Option<String>,
}

impl Debug for HdfsConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HdfsConfig")
            .field("root", &self.root)
            .field("name_node", &self.name_node)
            .field(
                "kerberos_ticket_cache_path",
                &self.kerberos_ticket_cache_path,
            )
            .field("user", &self.user)
            .field("enable_append", &self.enable_append)
            .field("atomic_write_dir", &self.atomic_write_dir)
            .finish_non_exhaustive()
    }
}

impl crate::Configurator for HdfsConfig {
    type Builder = HdfsBuilder;

    fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
        // The URI authority (host:port) is required; it identifies the name node.
        let authority = uri.authority().ok_or_else(|| {
            crate::Error::new(crate::ErrorKind::ConfigInvalid, "uri authority is required")
                .with_context("service", HDFS_SCHEME)
        })?;

        // Start from any options carried on the URI, then derive name_node
        // from the authority.
        let mut map = uri.options().clone();
        map.insert("name_node".to_string(), format!("hdfs://{authority}"));

        // A non-empty URI path becomes the backend's working directory.
        if let Some(root) = uri.root() {
            if !root.is_empty() {
                map.insert("root".to_string(), root.to_string());
            }
        }

        Self::from_iter(map)
    }

    fn into_builder(self) -> Self::Builder {
        HdfsBuilder { config: self }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::Configurator;
    use crate::types::OperatorUri;

    #[test]
    fn from_uri_sets_name_node_and_root() {
        let uri = OperatorUri::new(
            "hdfs://cluster.local:8020/user/data",
            Vec::<(String, String)>::new(),
        )
        .unwrap();

        let cfg = HdfsConfig::from_uri(&uri).unwrap();
        assert_eq!(cfg.name_node.as_deref(), Some("hdfs://cluster.local:8020"));
        assert_eq!(cfg.root.as_deref(), Some("user/data"));
    }
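
    #[test]
    fn from_uri_merges_query_options() {
        // A hedged sketch of option merging: like the test above, it assumes
        // OperatorUri::new accepts extra key/value options; from_uri feeds
        // uri.options() through from_iter, so "user" should land in cfg.user.
        let uri = OperatorUri::new(
            "hdfs://cluster.local:8020/user/data",
            vec![("user".to_string(), "alice".to_string())],
        )
        .unwrap();

        let cfg = HdfsConfig::from_uri(&uri).unwrap();
        assert_eq!(cfg.user.as_deref(), Some("alice"));
        assert_eq!(cfg.name_node.as_deref(), Some("hdfs://cluster.local:8020"));
    }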
}