opendal/services/alluxio/
core.rs1use bytes::Buf;
19use http::Request;
20use http::Response;
21use http::StatusCode;
22use serde::Deserialize;
23use serde::Serialize;
24use std::fmt::Debug;
25use std::fmt::Formatter;
26use std::sync::Arc;
27
28use super::error::parse_error;
29use crate::raw::*;
30use crate::*;
31
32#[derive(Clone)]
34pub struct AlluxioCore {
35 pub info: Arc<AccessorInfo>,
36 pub root: String,
38 pub endpoint: String,
40}
41
42impl Debug for AlluxioCore {
43 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44 f.debug_struct("Backend")
45 .field("root", &self.root)
46 .field("endpoint", &self.endpoint)
47 .finish_non_exhaustive()
48 }
49}
50
51impl AlluxioCore {
52 pub async fn create_dir(&self, path: &str) -> Result<()> {
53 let path = build_rooted_abs_path(&self.root, path);
54
55 let r = CreateDirRequest {
56 recursive: Some(true),
57 allow_exists: Some(true),
58 };
59
60 let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
61 let body = bytes::Bytes::from(body);
62
63 let mut req = Request::post(format!(
64 "{}/api/v1/paths/{}/create-directory",
65 self.endpoint,
66 percent_encode_path(&path)
67 ));
68
69 req = req.header("Content-Type", "application/json");
70
71 let req = req.extension(Operation::CreateDir);
72
73 let req = req
74 .body(Buffer::from(body))
75 .map_err(new_request_build_error)?;
76
77 let resp = self.info.http_client().send(req).await?;
78
79 let status = resp.status();
80 match status {
81 StatusCode::OK => Ok(()),
82 _ => Err(parse_error(resp)),
83 }
84 }
85
86 pub async fn create_file(&self, path: &str) -> Result<u64> {
87 let path = build_rooted_abs_path(&self.root, path);
88
89 let r = CreateFileRequest {
90 recursive: Some(true),
91 };
92
93 let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
94 let body = bytes::Bytes::from(body);
95 let mut req = Request::post(format!(
96 "{}/api/v1/paths/{}/create-file",
97 self.endpoint,
98 percent_encode_path(&path)
99 ));
100
101 req = req.header("Content-Type", "application/json");
102
103 let req = req.extension(Operation::Write);
104
105 let req = req
106 .body(Buffer::from(body))
107 .map_err(new_request_build_error)?;
108
109 let resp = self.info.http_client().send(req).await?;
110 let status = resp.status();
111
112 match status {
113 StatusCode::OK => {
114 let body = resp.into_body();
115 let steam_id: u64 =
116 serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
117 Ok(steam_id)
118 }
119 _ => Err(parse_error(resp)),
120 }
121 }
122
123 pub(super) async fn open_file(&self, path: &str) -> Result<u64> {
124 let path = build_rooted_abs_path(&self.root, path);
125
126 let req = Request::post(format!(
127 "{}/api/v1/paths/{}/open-file",
128 self.endpoint,
129 percent_encode_path(&path)
130 ));
131
132 let req = req.extension(Operation::Read);
133
134 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
135 let resp = self.info.http_client().send(req).await?;
136
137 let status = resp.status();
138
139 match status {
140 StatusCode::OK => {
141 let body = resp.into_body();
142 let steam_id: u64 =
143 serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
144 Ok(steam_id)
145 }
146 _ => Err(parse_error(resp)),
147 }
148 }
149
150 pub(super) async fn delete(&self, path: &str) -> Result<()> {
151 let path = build_rooted_abs_path(&self.root, path);
152
153 let req = Request::post(format!(
154 "{}/api/v1/paths/{}/delete",
155 self.endpoint,
156 percent_encode_path(&path)
157 ));
158
159 let req = req.extension(Operation::Delete);
160
161 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
162 let resp = self.info.http_client().send(req).await?;
163
164 let status = resp.status();
165
166 match status {
167 StatusCode::OK => Ok(()),
168 _ => {
169 let err = parse_error(resp);
170 if err.kind() == ErrorKind::NotFound {
171 return Ok(());
172 }
173 Err(err)
174 }
175 }
176 }
177
178 pub(super) async fn rename(&self, path: &str, dst: &str) -> Result<()> {
179 let path = build_rooted_abs_path(&self.root, path);
180 let dst = build_rooted_abs_path(&self.root, dst);
181
182 let req = Request::post(format!(
183 "{}/api/v1/paths/{}/rename?dst={}",
184 self.endpoint,
185 percent_encode_path(&path),
186 percent_encode_path(&dst)
187 ));
188
189 let req = req.extension(Operation::Rename);
190
191 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
192
193 let resp = self.info.http_client().send(req).await?;
194
195 let status = resp.status();
196
197 match status {
198 StatusCode::OK => Ok(()),
199 _ => Err(parse_error(resp)),
200 }
201 }
202
203 pub(super) async fn get_status(&self, path: &str) -> Result<FileInfo> {
204 let path = build_rooted_abs_path(&self.root, path);
205
206 let req = Request::post(format!(
207 "{}/api/v1/paths/{}/get-status",
208 self.endpoint,
209 percent_encode_path(&path)
210 ));
211
212 let req = req.extension(Operation::Stat);
213
214 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
215
216 let resp = self.info.http_client().send(req).await?;
217
218 let status = resp.status();
219
220 match status {
221 StatusCode::OK => {
222 let body = resp.into_body();
223 let file_info: FileInfo =
224 serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
225 Ok(file_info)
226 }
227 _ => Err(parse_error(resp)),
228 }
229 }
230
231 pub(super) async fn list_status(&self, path: &str) -> Result<Vec<FileInfo>> {
232 let path = build_rooted_abs_path(&self.root, path);
233
234 let req = Request::post(format!(
235 "{}/api/v1/paths/{}/list-status",
236 self.endpoint,
237 percent_encode_path(&path)
238 ));
239
240 let req = req.extension(Operation::List);
241
242 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
243
244 let resp = self.info.http_client().send(req).await?;
245
246 let status = resp.status();
247
248 match status {
249 StatusCode::OK => {
250 let body = resp.into_body();
251 let file_infos: Vec<FileInfo> =
252 serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
253 Ok(file_infos)
254 }
255 _ => Err(parse_error(resp)),
256 }
257 }
258
259 pub async fn read(&self, stream_id: u64, _: BytesRange) -> Result<Response<HttpBody>> {
263 let req = Request::post(format!(
264 "{}/api/v1/streams/{}/read",
265 self.endpoint, stream_id,
266 ));
267
268 let req = req.extension(Operation::Read);
269
270 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
271
272 self.info.http_client().fetch(req).await
273 }
274
275 pub(super) async fn write(&self, stream_id: u64, body: Buffer) -> Result<usize> {
276 let req = Request::post(format!(
277 "{}/api/v1/streams/{}/write",
278 self.endpoint, stream_id
279 ));
280
281 let req = req.extension(Operation::Write);
282
283 let req = req.body(body).map_err(new_request_build_error)?;
284
285 let resp = self.info.http_client().send(req).await?;
286
287 let status = resp.status();
288
289 match status {
290 StatusCode::OK => {
291 let body = resp.into_body();
292 let size: usize =
293 serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
294 Ok(size)
295 }
296 _ => Err(parse_error(resp)),
297 }
298 }
299
300 pub(super) async fn close(&self, stream_id: u64) -> Result<()> {
301 let req = Request::post(format!(
302 "{}/api/v1/streams/{}/close",
303 self.endpoint, stream_id
304 ));
305
306 let req = req.extension(Operation::Write);
307
308 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
309
310 let resp = self.info.http_client().send(req).await?;
311
312 let status = resp.status();
313
314 match status {
315 StatusCode::OK => Ok(()),
316 _ => Err(parse_error(resp)),
317 }
318 }
319}
320
321#[derive(Debug, Serialize)]
322struct CreateFileRequest {
323 #[serde(skip_serializing_if = "Option::is_none")]
324 recursive: Option<bool>,
325}
326
327#[derive(Debug, Serialize)]
328#[serde(rename_all = "camelCase")]
329struct CreateDirRequest {
330 #[serde(skip_serializing_if = "Option::is_none")]
331 recursive: Option<bool>,
332 #[serde(skip_serializing_if = "Option::is_none")]
333 allow_exists: Option<bool>,
334}
335
336#[derive(Debug, Deserialize)]
338#[serde(rename_all = "camelCase")]
339pub(super) struct FileInfo {
340 pub path: String,
342 pub last_modification_time_ms: i64,
344 pub folder: bool,
346 pub length: u64,
348}
349
350impl TryFrom<FileInfo> for Metadata {
351 type Error = Error;
352
353 fn try_from(file_info: FileInfo) -> Result<Metadata> {
354 let mut metadata = if file_info.folder {
355 Metadata::new(EntryMode::DIR)
356 } else {
357 Metadata::new(EntryMode::FILE)
358 };
359 metadata
360 .set_content_length(file_info.length)
361 .set_last_modified(parse_datetime_from_from_timestamp_millis(
362 file_info.last_modification_time_ms,
363 )?);
364 Ok(metadata)
365 }
366}