opendal/services/hdfs/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::io;
21use std::sync::Arc;
22
23use log::debug;
24
25use super::HDFS_SCHEME;
26use super::core::HdfsCore;
27use super::delete::HdfsDeleter;
28use super::lister::HdfsLister;
29use super::reader::HdfsReader;
30use super::writer::HdfsWriter;
31use crate::raw::*;
32use crate::services::HdfsConfig;
33use crate::*;
34
35#[doc = include_str!("docs.md")]
36#[derive(Default)]
37pub struct HdfsBuilder {
38 pub(super) config: HdfsConfig,
39}
40
41impl Debug for HdfsBuilder {
42 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
43 f.debug_struct("HdfsBuilder")
44 .field("config", &self.config)
45 .finish()
46 }
47}
48
49impl HdfsBuilder {
50 pub fn root(mut self, root: &str) -> Self {
54 self.config.root = if root.is_empty() {
55 None
56 } else {
57 Some(root.to_string())
58 };
59
60 self
61 }
62
63 pub fn name_node(mut self, name_node: &str) -> Self {
70 if !name_node.is_empty() {
71 self.config.name_node = Some(name_node.to_string())
72 }
73
74 self
75 }
76
77 pub fn kerberos_ticket_cache_path(mut self, kerberos_ticket_cache_path: &str) -> Self {
81 if !kerberos_ticket_cache_path.is_empty() {
82 self.config.kerberos_ticket_cache_path = Some(kerberos_ticket_cache_path.to_string())
83 }
84 self
85 }
86
87 pub fn user(mut self, user: &str) -> Self {
89 if !user.is_empty() {
90 self.config.user = Some(user.to_string())
91 }
92 self
93 }
94
95 pub fn enable_append(mut self, enable_append: bool) -> Self {
99 self.config.enable_append = enable_append;
100 self
101 }
102
103 pub fn atomic_write_dir(mut self, dir: &str) -> Self {
110 self.config.atomic_write_dir = if dir.is_empty() {
111 None
112 } else {
113 Some(String::from(dir))
114 };
115 self
116 }
117}
118
119impl Builder for HdfsBuilder {
120 type Config = HdfsConfig;
121
122 fn build(self) -> Result<impl Access> {
123 debug!("backend build started: {:?}", &self);
124
125 let name_node = match &self.config.name_node {
126 Some(v) => v,
127 None => {
128 return Err(Error::new(ErrorKind::ConfigInvalid, "name node is empty")
129 .with_context("service", Scheme::Hdfs));
130 }
131 };
132
133 let root = normalize_root(&self.config.root.unwrap_or_default());
134 debug!("backend use root {root}");
135
136 let mut builder = hdrs::ClientBuilder::new(name_node);
137 if let Some(ticket_cache_path) = &self.config.kerberos_ticket_cache_path {
138 builder = builder.with_kerberos_ticket_cache_path(ticket_cache_path.as_str());
139 }
140 if let Some(user) = &self.config.user {
141 builder = builder.with_user(user.as_str());
142 }
143
144 let client = builder.connect().map_err(new_std_io_error)?;
145
146 if let Err(e) = client.metadata(&root) {
148 if e.kind() == io::ErrorKind::NotFound {
149 debug!("root {root} is not exist, creating now");
150
151 client.create_dir(&root).map_err(new_std_io_error)?
152 }
153 }
154
155 let atomic_write_dir = self.config.atomic_write_dir;
156
157 if let Some(d) = &atomic_write_dir {
159 if let Err(e) = client.metadata(d) {
160 if e.kind() == io::ErrorKind::NotFound {
161 client.create_dir(d).map_err(new_std_io_error)?
162 }
163 }
164 }
165
166 Ok(HdfsBackend {
167 core: Arc::new(HdfsCore {
168 info: {
169 let am = AccessorInfo::default();
170 am.set_scheme(HDFS_SCHEME)
171 .set_root(&root)
172 .set_native_capability(Capability {
173 stat: true,
174
175 read: true,
176
177 write: true,
178 write_can_append: self.config.enable_append,
179
180 create_dir: true,
181 delete: true,
182
183 list: true,
184
185 rename: true,
186
187 shared: true,
188
189 ..Default::default()
190 });
191
192 am.into()
193 },
194 root,
195 atomic_write_dir,
196 client: Arc::new(client),
197 }),
198 })
199 }
200}
201
202#[derive(Debug, Clone)]
204pub struct HdfsBackend {
205 core: Arc<HdfsCore>,
206}
207
208impl Access for HdfsBackend {
209 type Reader = HdfsReader<hdrs::AsyncFile>;
210 type Writer = HdfsWriter<hdrs::AsyncFile>;
211 type Lister = Option<HdfsLister>;
212 type Deleter = oio::OneShotDeleter<HdfsDeleter>;
213
214 fn info(&self) -> Arc<AccessorInfo> {
215 self.core.info.clone()
216 }
217
218 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
219 self.core.hdfs_create_dir(path)?;
220 Ok(RpCreateDir::default())
221 }
222
223 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
224 let m = self.core.hdfs_stat(path)?;
225 Ok(RpStat::new(m))
226 }
227
228 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
229 let f = self.core.hdfs_read(path, &args).await?;
230
231 Ok((
232 RpRead::new(),
233 HdfsReader::new(f, args.range().size().unwrap_or(u64::MAX) as _),
234 ))
235 }
236
237 async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
238 let (target_path, tmp_path, f, target_exists, initial_size) =
239 self.core.hdfs_write(path, &op).await?;
240
241 Ok((
242 RpWrite::new(),
243 HdfsWriter::new(
244 target_path,
245 tmp_path,
246 f,
247 Arc::clone(&self.core.client),
248 target_exists,
249 initial_size,
250 ),
251 ))
252 }
253
254 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
255 Ok((
256 RpDelete::default(),
257 oio::OneShotDeleter::new(HdfsDeleter::new(Arc::clone(&self.core))),
258 ))
259 }
260
261 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
262 match self.core.hdfs_list(path)? {
263 Some(f) => {
264 let rd = HdfsLister::new(&self.core.root, f, path);
265 Ok((RpList::default(), Some(rd)))
266 }
267 None => Ok((RpList::default(), None)),
268 }
269 }
270
271 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
272 self.core.hdfs_rename(from, to)?;
273 Ok(RpRename::new())
274 }
275}