opendal/services/alluxio/
core.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::Request;
24use http::Response;
25use http::StatusCode;
26use serde::Deserialize;
27use serde::Serialize;
28
29use super::error::parse_error;
30use crate::raw::*;
31use crate::*;
32
33#[derive(Clone)]
35pub struct AlluxioCore {
36 pub info: Arc<AccessorInfo>,
37 pub root: String,
39 pub endpoint: String,
41}
42
43impl Debug for AlluxioCore {
44 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45 f.debug_struct("Backend")
46 .field("root", &self.root)
47 .field("endpoint", &self.endpoint)
48 .finish_non_exhaustive()
49 }
50}
51
52impl AlluxioCore {
53 pub async fn create_dir(&self, path: &str) -> Result<()> {
54 let path = build_rooted_abs_path(&self.root, path);
55
56 let r = CreateDirRequest {
57 recursive: Some(true),
58 allow_exists: Some(true),
59 };
60
61 let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
62 let body = bytes::Bytes::from(body);
63
64 let mut req = Request::post(format!(
65 "{}/api/v1/paths/{}/create-directory",
66 self.endpoint,
67 percent_encode_path(&path)
68 ));
69
70 req = req.header("Content-Type", "application/json");
71
72 let req = req.extension(Operation::CreateDir);
73
74 let req = req
75 .body(Buffer::from(body))
76 .map_err(new_request_build_error)?;
77
78 let resp = self.info.http_client().send(req).await?;
79
80 let status = resp.status();
81 match status {
82 StatusCode::OK => Ok(()),
83 _ => Err(parse_error(resp)),
84 }
85 }
86
87 pub async fn create_file(&self, path: &str) -> Result<u64> {
88 let path = build_rooted_abs_path(&self.root, path);
89
90 let r = CreateFileRequest {
91 recursive: Some(true),
92 };
93
94 let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
95 let body = bytes::Bytes::from(body);
96 let mut req = Request::post(format!(
97 "{}/api/v1/paths/{}/create-file",
98 self.endpoint,
99 percent_encode_path(&path)
100 ));
101
102 req = req.header("Content-Type", "application/json");
103
104 let req = req.extension(Operation::Write);
105
106 let req = req
107 .body(Buffer::from(body))
108 .map_err(new_request_build_error)?;
109
110 let resp = self.info.http_client().send(req).await?;
111 let status = resp.status();
112
113 match status {
114 StatusCode::OK => {
115 let body = resp.into_body();
116 let steam_id: u64 =
117 serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
118 Ok(steam_id)
119 }
120 _ => Err(parse_error(resp)),
121 }
122 }
123
124 pub(super) async fn open_file(&self, path: &str) -> Result<u64> {
125 let path = build_rooted_abs_path(&self.root, path);
126
127 let req = Request::post(format!(
128 "{}/api/v1/paths/{}/open-file",
129 self.endpoint,
130 percent_encode_path(&path)
131 ));
132
133 let req = req.extension(Operation::Read);
134
135 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
136 let resp = self.info.http_client().send(req).await?;
137
138 let status = resp.status();
139
140 match status {
141 StatusCode::OK => {
142 let body = resp.into_body();
143 let steam_id: u64 =
144 serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
145 Ok(steam_id)
146 }
147 _ => Err(parse_error(resp)),
148 }
149 }
150
151 pub(super) async fn delete(&self, path: &str) -> Result<()> {
152 let path = build_rooted_abs_path(&self.root, path);
153
154 let req = Request::post(format!(
155 "{}/api/v1/paths/{}/delete",
156 self.endpoint,
157 percent_encode_path(&path)
158 ));
159
160 let req = req.extension(Operation::Delete);
161
162 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
163 let resp = self.info.http_client().send(req).await?;
164
165 let status = resp.status();
166
167 match status {
168 StatusCode::OK => Ok(()),
169 _ => {
170 let err = parse_error(resp);
171 if err.kind() == ErrorKind::NotFound {
172 return Ok(());
173 }
174 Err(err)
175 }
176 }
177 }
178
179 pub(super) async fn rename(&self, path: &str, dst: &str) -> Result<()> {
180 let path = build_rooted_abs_path(&self.root, path);
181 let dst = build_rooted_abs_path(&self.root, dst);
182
183 let req = Request::post(format!(
184 "{}/api/v1/paths/{}/rename?dst={}",
185 self.endpoint,
186 percent_encode_path(&path),
187 percent_encode_path(&dst)
188 ));
189
190 let req = req.extension(Operation::Rename);
191
192 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
193
194 let resp = self.info.http_client().send(req).await?;
195
196 let status = resp.status();
197
198 match status {
199 StatusCode::OK => Ok(()),
200 _ => Err(parse_error(resp)),
201 }
202 }
203
204 pub(super) async fn get_status(&self, path: &str) -> Result<FileInfo> {
205 let path = build_rooted_abs_path(&self.root, path);
206
207 let req = Request::post(format!(
208 "{}/api/v1/paths/{}/get-status",
209 self.endpoint,
210 percent_encode_path(&path)
211 ));
212
213 let req = req.extension(Operation::Stat);
214
215 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
216
217 let resp = self.info.http_client().send(req).await?;
218
219 let status = resp.status();
220
221 match status {
222 StatusCode::OK => {
223 let body = resp.into_body();
224 let file_info: FileInfo =
225 serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
226 Ok(file_info)
227 }
228 _ => Err(parse_error(resp)),
229 }
230 }
231
232 pub(super) async fn list_status(&self, path: &str) -> Result<Vec<FileInfo>> {
233 let path = build_rooted_abs_path(&self.root, path);
234
235 let req = Request::post(format!(
236 "{}/api/v1/paths/{}/list-status",
237 self.endpoint,
238 percent_encode_path(&path)
239 ));
240
241 let req = req.extension(Operation::List);
242
243 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
244
245 let resp = self.info.http_client().send(req).await?;
246
247 let status = resp.status();
248
249 match status {
250 StatusCode::OK => {
251 let body = resp.into_body();
252 let file_infos: Vec<FileInfo> =
253 serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
254 Ok(file_infos)
255 }
256 _ => Err(parse_error(resp)),
257 }
258 }
259
260 pub async fn read(&self, stream_id: u64, _: BytesRange) -> Result<Response<HttpBody>> {
264 let req = Request::post(format!(
265 "{}/api/v1/streams/{}/read",
266 self.endpoint, stream_id,
267 ));
268
269 let req = req.extension(Operation::Read);
270
271 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
272
273 self.info.http_client().fetch(req).await
274 }
275
276 pub(super) async fn write(&self, stream_id: u64, body: Buffer) -> Result<usize> {
277 let req = Request::post(format!(
278 "{}/api/v1/streams/{}/write",
279 self.endpoint, stream_id
280 ));
281
282 let req = req.extension(Operation::Write);
283
284 let req = req.body(body).map_err(new_request_build_error)?;
285
286 let resp = self.info.http_client().send(req).await?;
287
288 let status = resp.status();
289
290 match status {
291 StatusCode::OK => {
292 let body = resp.into_body();
293 let size: usize =
294 serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
295 Ok(size)
296 }
297 _ => Err(parse_error(resp)),
298 }
299 }
300
301 pub(super) async fn close(&self, stream_id: u64) -> Result<()> {
302 let req = Request::post(format!(
303 "{}/api/v1/streams/{}/close",
304 self.endpoint, stream_id
305 ));
306
307 let req = req.extension(Operation::Write);
308
309 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
310
311 let resp = self.info.http_client().send(req).await?;
312
313 let status = resp.status();
314
315 match status {
316 StatusCode::OK => Ok(()),
317 _ => Err(parse_error(resp)),
318 }
319 }
320}
321
322#[derive(Debug, Serialize)]
323struct CreateFileRequest {
324 #[serde(skip_serializing_if = "Option::is_none")]
325 recursive: Option<bool>,
326}
327
328#[derive(Debug, Serialize)]
329#[serde(rename_all = "camelCase")]
330struct CreateDirRequest {
331 #[serde(skip_serializing_if = "Option::is_none")]
332 recursive: Option<bool>,
333 #[serde(skip_serializing_if = "Option::is_none")]
334 allow_exists: Option<bool>,
335}
336
337#[derive(Debug, Deserialize)]
339#[serde(rename_all = "camelCase")]
340pub(super) struct FileInfo {
341 pub path: String,
343 pub last_modification_time_ms: i64,
345 pub folder: bool,
347 pub length: u64,
349}
350
351impl TryFrom<FileInfo> for Metadata {
352 type Error = Error;
353
354 fn try_from(file_info: FileInfo) -> Result<Metadata> {
355 let mut metadata = if file_info.folder {
356 Metadata::new(EntryMode::DIR)
357 } else {
358 Metadata::new(EntryMode::FILE)
359 };
360 metadata
361 .set_content_length(file_info.length)
362 .set_last_modified(parse_datetime_from_from_timestamp_millis(
363 file_info.last_modification_time_ms,
364 )?);
365 Ok(metadata)
366 }
367}