opendal/services/alluxio/
core.rs1use bytes::Buf;
19use http::Request;
20use http::Response;
21use http::StatusCode;
22use serde::Deserialize;
23use serde::Serialize;
24use std::fmt::Debug;
25use std::fmt::Formatter;
26use std::sync::Arc;
27
28use super::error::parse_error;
29use crate::raw::*;
30use crate::*;
31
32#[derive(Clone)]
34pub struct AlluxioCore {
35 pub info: Arc<AccessorInfo>,
36 pub root: String,
38 pub endpoint: String,
40}
41
42impl Debug for AlluxioCore {
43 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44 f.debug_struct("Backend")
45 .field("root", &self.root)
46 .field("endpoint", &self.endpoint)
47 .finish_non_exhaustive()
48 }
49}
50
51impl AlluxioCore {
52 pub async fn create_dir(&self, path: &str) -> Result<()> {
53 let path = build_rooted_abs_path(&self.root, path);
54
55 let r = CreateDirRequest {
56 recursive: Some(true),
57 allow_exists: Some(true),
58 };
59
60 let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
61 let body = bytes::Bytes::from(body);
62
63 let mut req = Request::post(format!(
64 "{}/api/v1/paths/{}/create-directory",
65 self.endpoint,
66 percent_encode_path(&path)
67 ));
68
69 req = req.header("Content-Type", "application/json");
70
71 let req = req
72 .body(Buffer::from(body))
73 .map_err(new_request_build_error)?;
74
75 let resp = self.info.http_client().send(req).await?;
76
77 let status = resp.status();
78 match status {
79 StatusCode::OK => Ok(()),
80 _ => Err(parse_error(resp)),
81 }
82 }
83
84 pub async fn create_file(&self, path: &str) -> Result<u64> {
85 let path = build_rooted_abs_path(&self.root, path);
86
87 let r = CreateFileRequest {
88 recursive: Some(true),
89 };
90
91 let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
92 let body = bytes::Bytes::from(body);
93 let mut req = Request::post(format!(
94 "{}/api/v1/paths/{}/create-file",
95 self.endpoint,
96 percent_encode_path(&path)
97 ));
98
99 req = req.header("Content-Type", "application/json");
100
101 let req = req
102 .body(Buffer::from(body))
103 .map_err(new_request_build_error)?;
104
105 let resp = self.info.http_client().send(req).await?;
106 let status = resp.status();
107
108 match status {
109 StatusCode::OK => {
110 let body = resp.into_body();
111 let steam_id: u64 =
112 serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
113 Ok(steam_id)
114 }
115 _ => Err(parse_error(resp)),
116 }
117 }
118
119 pub(super) async fn open_file(&self, path: &str) -> Result<u64> {
120 let path = build_rooted_abs_path(&self.root, path);
121
122 let req = Request::post(format!(
123 "{}/api/v1/paths/{}/open-file",
124 self.endpoint,
125 percent_encode_path(&path)
126 ));
127 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
128 let resp = self.info.http_client().send(req).await?;
129
130 let status = resp.status();
131
132 match status {
133 StatusCode::OK => {
134 let body = resp.into_body();
135 let steam_id: u64 =
136 serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
137 Ok(steam_id)
138 }
139 _ => Err(parse_error(resp)),
140 }
141 }
142
143 pub(super) async fn delete(&self, path: &str) -> Result<()> {
144 let path = build_rooted_abs_path(&self.root, path);
145
146 let req = Request::post(format!(
147 "{}/api/v1/paths/{}/delete",
148 self.endpoint,
149 percent_encode_path(&path)
150 ));
151 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
152 let resp = self.info.http_client().send(req).await?;
153
154 let status = resp.status();
155
156 match status {
157 StatusCode::OK => Ok(()),
158 _ => {
159 let err = parse_error(resp);
160 if err.kind() == ErrorKind::NotFound {
161 return Ok(());
162 }
163 Err(err)
164 }
165 }
166 }
167
168 pub(super) async fn rename(&self, path: &str, dst: &str) -> Result<()> {
169 let path = build_rooted_abs_path(&self.root, path);
170 let dst = build_rooted_abs_path(&self.root, dst);
171
172 let req = Request::post(format!(
173 "{}/api/v1/paths/{}/rename?dst={}",
174 self.endpoint,
175 percent_encode_path(&path),
176 percent_encode_path(&dst)
177 ));
178
179 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
180
181 let resp = self.info.http_client().send(req).await?;
182
183 let status = resp.status();
184
185 match status {
186 StatusCode::OK => Ok(()),
187 _ => Err(parse_error(resp)),
188 }
189 }
190
191 pub(super) async fn get_status(&self, path: &str) -> Result<FileInfo> {
192 let path = build_rooted_abs_path(&self.root, path);
193
194 let req = Request::post(format!(
195 "{}/api/v1/paths/{}/get-status",
196 self.endpoint,
197 percent_encode_path(&path)
198 ));
199
200 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
201
202 let resp = self.info.http_client().send(req).await?;
203
204 let status = resp.status();
205
206 match status {
207 StatusCode::OK => {
208 let body = resp.into_body();
209 let file_info: FileInfo =
210 serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
211 Ok(file_info)
212 }
213 _ => Err(parse_error(resp)),
214 }
215 }
216
217 pub(super) async fn list_status(&self, path: &str) -> Result<Vec<FileInfo>> {
218 let path = build_rooted_abs_path(&self.root, path);
219
220 let req = Request::post(format!(
221 "{}/api/v1/paths/{}/list-status",
222 self.endpoint,
223 percent_encode_path(&path)
224 ));
225
226 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
227
228 let resp = self.info.http_client().send(req).await?;
229
230 let status = resp.status();
231
232 match status {
233 StatusCode::OK => {
234 let body = resp.into_body();
235 let file_infos: Vec<FileInfo> =
236 serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
237 Ok(file_infos)
238 }
239 _ => Err(parse_error(resp)),
240 }
241 }
242
243 pub async fn read(&self, stream_id: u64, _: BytesRange) -> Result<Response<HttpBody>> {
247 let req = Request::post(format!(
248 "{}/api/v1/streams/{}/read",
249 self.endpoint, stream_id,
250 ));
251
252 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
253
254 self.info.http_client().fetch(req).await
255 }
256
257 pub(super) async fn write(&self, stream_id: u64, body: Buffer) -> Result<usize> {
258 let req = Request::post(format!(
259 "{}/api/v1/streams/{}/write",
260 self.endpoint, stream_id
261 ));
262 let req = req.body(body).map_err(new_request_build_error)?;
263
264 let resp = self.info.http_client().send(req).await?;
265
266 let status = resp.status();
267
268 match status {
269 StatusCode::OK => {
270 let body = resp.into_body();
271 let size: usize =
272 serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
273 Ok(size)
274 }
275 _ => Err(parse_error(resp)),
276 }
277 }
278
279 pub(super) async fn close(&self, stream_id: u64) -> Result<()> {
280 let req = Request::post(format!(
281 "{}/api/v1/streams/{}/close",
282 self.endpoint, stream_id
283 ));
284 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
285
286 let resp = self.info.http_client().send(req).await?;
287
288 let status = resp.status();
289
290 match status {
291 StatusCode::OK => Ok(()),
292 _ => Err(parse_error(resp)),
293 }
294 }
295}
296
297#[derive(Debug, Serialize)]
298struct CreateFileRequest {
299 #[serde(skip_serializing_if = "Option::is_none")]
300 recursive: Option<bool>,
301}
302
303#[derive(Debug, Serialize)]
304#[serde(rename_all = "camelCase")]
305struct CreateDirRequest {
306 #[serde(skip_serializing_if = "Option::is_none")]
307 recursive: Option<bool>,
308 #[serde(skip_serializing_if = "Option::is_none")]
309 allow_exists: Option<bool>,
310}
311
312#[derive(Debug, Deserialize)]
314#[serde(rename_all = "camelCase")]
315pub(super) struct FileInfo {
316 pub path: String,
318 pub last_modification_time_ms: i64,
320 pub folder: bool,
322 pub length: u64,
324}
325
326impl TryFrom<FileInfo> for Metadata {
327 type Error = Error;
328
329 fn try_from(file_info: FileInfo) -> Result<Metadata> {
330 let mut metadata = if file_info.folder {
331 Metadata::new(EntryMode::DIR)
332 } else {
333 Metadata::new(EntryMode::FILE)
334 };
335 metadata
336 .set_content_length(file_info.length)
337 .set_last_modified(parse_datetime_from_from_timestamp_millis(
338 file_info.last_modification_time_ms,
339 )?);
340 Ok(metadata)
341 }
342}