opendal_core/services/hdfs_native/
core.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use hdfs_native::HdfsError;
22use hdfs_native::WriteOptions;
23
24use super::error::parse_hdfs_error;
25use crate::raw::*;
26use crate::*;
27
/// Shared state used by the native HDFS service to talk to the cluster.
///
/// Cloning copies `root` and `enable_append` and bumps the reference counts of
/// the two `Arc` fields.
#[derive(Clone)]
pub struct HdfsNativeCore {
    /// Shared accessor information for this service.
    pub info: Arc<AccessorInfo>,
    /// Root path that every caller-supplied path is resolved against
    /// (via `build_rooted_abs_path`).
    pub root: String,
    /// Shared `hdfs_native` client that performs every remote call.
    pub client: Arc<hdfs_native::Client>,
    /// Whether append writes are enabled; consulted by `hdfs_write` before it
    /// appends to an existing file.
    pub enable_append: bool,
}
36
37impl Debug for HdfsNativeCore {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 f.debug_struct("HdfsNativeCore")
40 .field("root", &self.root)
41 .field("enable_append", &self.enable_append)
42 .finish_non_exhaustive()
43 }
44}
45
46impl HdfsNativeCore {
47 pub async fn hdfs_create_dir(&self, path: &str) -> Result<()> {
48 let p = build_rooted_abs_path(&self.root, path);
49
50 self.client
51 .mkdirs(&p, 0o777, true)
52 .await
53 .map_err(parse_hdfs_error)?;
54
55 Ok(())
56 }
57
58 pub async fn hdfs_stat(&self, path: &str) -> Result<Metadata> {
59 let p = build_rooted_abs_path(&self.root, path);
60
61 let status: hdfs_native::client::FileStatus = self
62 .client
63 .get_file_info(&p)
64 .await
65 .map_err(parse_hdfs_error)?;
66
67 let mode = if status.isdir {
68 EntryMode::DIR
69 } else {
70 EntryMode::FILE
71 };
72
73 let mut metadata = Metadata::new(mode);
74 metadata
75 .set_last_modified(Timestamp::from_millisecond(
76 status.modification_time as i64,
77 )?)
78 .set_content_length(status.length as u64);
79
80 Ok(metadata)
81 }
82
83 pub async fn hdfs_read(
84 &self,
85 path: &str,
86 args: &OpRead,
87 ) -> Result<(hdfs_native::file::FileReader, u64, u64)> {
88 let p = build_rooted_abs_path(&self.root, path);
89
90 let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;
91
92 let offset = args.range().offset();
93 let size = args.range().size().unwrap_or(u64::MAX);
94
95 Ok((f, offset, size))
96 }
97
98 pub async fn hdfs_write(
99 &self,
100 path: &str,
101 args: &OpWrite,
102 ) -> Result<(hdfs_native::file::FileWriter, u64)> {
103 let target_path = build_rooted_abs_path(&self.root, path);
104 let mut initial_size = 0;
105
106 let target_exists = match self.client.get_file_info(&target_path).await {
107 Ok(status) => {
108 initial_size = status.length as u64;
109 true
110 }
111 Err(err) => match &err {
112 HdfsError::FileNotFound(_) => false,
113 _ => return Err(parse_hdfs_error(err)),
114 },
115 };
116
117 let f = if target_exists {
118 if args.append() {
119 assert!(self.enable_append, "append is not enabled");
120 self.client
121 .append(&target_path)
122 .await
123 .map_err(parse_hdfs_error)?
124 } else {
125 initial_size = 0;
126 self.client
127 .create(&target_path, WriteOptions::default().overwrite(true))
128 .await
129 .map_err(parse_hdfs_error)?
130 }
131 } else {
132 initial_size = 0;
133 self.client
134 .create(&target_path, WriteOptions::default())
135 .await
136 .map_err(parse_hdfs_error)?
137 };
138
139 Ok((f, initial_size))
140 }
141
142 pub async fn hdfs_delete(&self, path: &str) -> Result<()> {
143 let p = build_rooted_abs_path(&self.root, path);
144
145 self.client
146 .delete(&p, true)
147 .await
148 .map_err(parse_hdfs_error)?;
149
150 Ok(())
151 }
152
153 pub async fn hdfs_list(&self, path: &str) -> Result<Option<(String, Option<String>)>> {
154 let p: String = build_rooted_abs_path(&self.root, path);
155
156 let isdir = match self.client.get_file_info(&p).await {
157 Ok(status) => status.isdir,
158 Err(err) => {
159 return match &err {
160 HdfsError::FileNotFound(_) => Ok(None),
161 _ => Err(parse_hdfs_error(err)),
162 };
163 }
164 };
165
166 let current_path = if isdir {
167 if !path.ends_with("/") {
168 Some(path.to_string() + "/")
169 } else {
170 Some(path.to_string())
171 }
172 } else {
173 None
174 };
175
176 Ok(Some((p, current_path)))
177 }
178
179 pub async fn hdfs_rename(&self, from: &str, to: &str) -> Result<()> {
180 let from_path = build_rooted_abs_path(&self.root, from);
181 let to_path = build_rooted_abs_path(&self.root, to);
182
183 match self.client.get_file_info(&to_path).await {
184 Ok(status) => {
185 if status.isdir {
186 return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
187 .with_context("input", &to_path));
188 } else {
189 self.client
190 .delete(&to_path, true)
191 .await
192 .map_err(parse_hdfs_error)?;
193 }
194 }
195 Err(err) => match &err {
196 HdfsError::FileNotFound(_) => {
197 self.client
198 .create(&to_path, WriteOptions::default().create_parent(true))
199 .await
200 .map_err(parse_hdfs_error)?;
201 }
202 _ => return Err(parse_hdfs_error(err)),
203 },
204 };
205
206 self.client
207 .rename(&from_path, &to_path, true)
208 .await
209 .map_err(parse_hdfs_error)?;
210
211 Ok(())
212 }
213}