opendal/services/alluxio/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::Request;
24use http::Response;
25use http::StatusCode;
26use serde::Deserialize;
27use serde::Serialize;
28
29use super::error::parse_error;
30use crate::raw::*;
31use crate::*;
32
33/// Alluxio core
34#[derive(Clone)]
35pub struct AlluxioCore {
36    pub info: Arc<AccessorInfo>,
37    /// root of this backend.
38    pub root: String,
39    /// endpoint of alluxio
40    pub endpoint: String,
41}
42
43impl Debug for AlluxioCore {
44    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45        f.debug_struct("Backend")
46            .field("root", &self.root)
47            .field("endpoint", &self.endpoint)
48            .finish_non_exhaustive()
49    }
50}
51
52impl AlluxioCore {
53    pub async fn create_dir(&self, path: &str) -> Result<()> {
54        let path = build_rooted_abs_path(&self.root, path);
55
56        let r = CreateDirRequest {
57            recursive: Some(true),
58            allow_exists: Some(true),
59        };
60
61        let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
62        let body = bytes::Bytes::from(body);
63
64        let mut req = Request::post(format!(
65            "{}/api/v1/paths/{}/create-directory",
66            self.endpoint,
67            percent_encode_path(&path)
68        ));
69
70        req = req.header("Content-Type", "application/json");
71
72        let req = req.extension(Operation::CreateDir);
73
74        let req = req
75            .body(Buffer::from(body))
76            .map_err(new_request_build_error)?;
77
78        let resp = self.info.http_client().send(req).await?;
79
80        let status = resp.status();
81        match status {
82            StatusCode::OK => Ok(()),
83            _ => Err(parse_error(resp)),
84        }
85    }
86
87    pub async fn create_file(&self, path: &str) -> Result<u64> {
88        let path = build_rooted_abs_path(&self.root, path);
89
90        let r = CreateFileRequest {
91            recursive: Some(true),
92        };
93
94        let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
95        let body = bytes::Bytes::from(body);
96        let mut req = Request::post(format!(
97            "{}/api/v1/paths/{}/create-file",
98            self.endpoint,
99            percent_encode_path(&path)
100        ));
101
102        req = req.header("Content-Type", "application/json");
103
104        let req = req.extension(Operation::Write);
105
106        let req = req
107            .body(Buffer::from(body))
108            .map_err(new_request_build_error)?;
109
110        let resp = self.info.http_client().send(req).await?;
111        let status = resp.status();
112
113        match status {
114            StatusCode::OK => {
115                let body = resp.into_body();
116                let steam_id: u64 =
117                    serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
118                Ok(steam_id)
119            }
120            _ => Err(parse_error(resp)),
121        }
122    }
123
124    pub(super) async fn open_file(&self, path: &str) -> Result<u64> {
125        let path = build_rooted_abs_path(&self.root, path);
126
127        let req = Request::post(format!(
128            "{}/api/v1/paths/{}/open-file",
129            self.endpoint,
130            percent_encode_path(&path)
131        ));
132
133        let req = req.extension(Operation::Read);
134
135        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
136        let resp = self.info.http_client().send(req).await?;
137
138        let status = resp.status();
139
140        match status {
141            StatusCode::OK => {
142                let body = resp.into_body();
143                let steam_id: u64 =
144                    serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
145                Ok(steam_id)
146            }
147            _ => Err(parse_error(resp)),
148        }
149    }
150
151    pub(super) async fn delete(&self, path: &str) -> Result<()> {
152        let path = build_rooted_abs_path(&self.root, path);
153
154        let req = Request::post(format!(
155            "{}/api/v1/paths/{}/delete",
156            self.endpoint,
157            percent_encode_path(&path)
158        ));
159
160        let req = req.extension(Operation::Delete);
161
162        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
163        let resp = self.info.http_client().send(req).await?;
164
165        let status = resp.status();
166
167        match status {
168            StatusCode::OK => Ok(()),
169            _ => {
170                let err = parse_error(resp);
171                if err.kind() == ErrorKind::NotFound {
172                    return Ok(());
173                }
174                Err(err)
175            }
176        }
177    }
178
179    pub(super) async fn rename(&self, path: &str, dst: &str) -> Result<()> {
180        let path = build_rooted_abs_path(&self.root, path);
181        let dst = build_rooted_abs_path(&self.root, dst);
182
183        let req = Request::post(format!(
184            "{}/api/v1/paths/{}/rename?dst={}",
185            self.endpoint,
186            percent_encode_path(&path),
187            percent_encode_path(&dst)
188        ));
189
190        let req = req.extension(Operation::Rename);
191
192        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
193
194        let resp = self.info.http_client().send(req).await?;
195
196        let status = resp.status();
197
198        match status {
199            StatusCode::OK => Ok(()),
200            _ => Err(parse_error(resp)),
201        }
202    }
203
204    pub(super) async fn get_status(&self, path: &str) -> Result<FileInfo> {
205        let path = build_rooted_abs_path(&self.root, path);
206
207        let req = Request::post(format!(
208            "{}/api/v1/paths/{}/get-status",
209            self.endpoint,
210            percent_encode_path(&path)
211        ));
212
213        let req = req.extension(Operation::Stat);
214
215        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
216
217        let resp = self.info.http_client().send(req).await?;
218
219        let status = resp.status();
220
221        match status {
222            StatusCode::OK => {
223                let body = resp.into_body();
224                let file_info: FileInfo =
225                    serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
226                Ok(file_info)
227            }
228            _ => Err(parse_error(resp)),
229        }
230    }
231
232    pub(super) async fn list_status(&self, path: &str) -> Result<Vec<FileInfo>> {
233        let path = build_rooted_abs_path(&self.root, path);
234
235        let req = Request::post(format!(
236            "{}/api/v1/paths/{}/list-status",
237            self.endpoint,
238            percent_encode_path(&path)
239        ));
240
241        let req = req.extension(Operation::List);
242
243        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
244
245        let resp = self.info.http_client().send(req).await?;
246
247        let status = resp.status();
248
249        match status {
250            StatusCode::OK => {
251                let body = resp.into_body();
252                let file_infos: Vec<FileInfo> =
253                    serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
254                Ok(file_infos)
255            }
256            _ => Err(parse_error(resp)),
257        }
258    }
259
260    /// TODO: we should implement range support correctly.
261    ///
262    /// Please refer to [alluxio-py](https://github.com/Alluxio/alluxio-py/blob/main/alluxio/const.py#L18)
263    pub async fn read(&self, stream_id: u64, _: BytesRange) -> Result<Response<HttpBody>> {
264        let req = Request::post(format!(
265            "{}/api/v1/streams/{}/read",
266            self.endpoint, stream_id,
267        ));
268
269        let req = req.extension(Operation::Read);
270
271        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
272
273        self.info.http_client().fetch(req).await
274    }
275
276    pub(super) async fn write(&self, stream_id: u64, body: Buffer) -> Result<usize> {
277        let req = Request::post(format!(
278            "{}/api/v1/streams/{}/write",
279            self.endpoint, stream_id
280        ));
281
282        let req = req.extension(Operation::Write);
283
284        let req = req.body(body).map_err(new_request_build_error)?;
285
286        let resp = self.info.http_client().send(req).await?;
287
288        let status = resp.status();
289
290        match status {
291            StatusCode::OK => {
292                let body = resp.into_body();
293                let size: usize =
294                    serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
295                Ok(size)
296            }
297            _ => Err(parse_error(resp)),
298        }
299    }
300
301    pub(super) async fn close(&self, stream_id: u64) -> Result<()> {
302        let req = Request::post(format!(
303            "{}/api/v1/streams/{}/close",
304            self.endpoint, stream_id
305        ));
306
307        let req = req.extension(Operation::Write);
308
309        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
310
311        let resp = self.info.http_client().send(req).await?;
312
313        let status = resp.status();
314
315        match status {
316            StatusCode::OK => Ok(()),
317            _ => Err(parse_error(resp)),
318        }
319    }
320}
321
322#[derive(Debug, Serialize)]
323struct CreateFileRequest {
324    #[serde(skip_serializing_if = "Option::is_none")]
325    recursive: Option<bool>,
326}
327
328#[derive(Debug, Serialize)]
329#[serde(rename_all = "camelCase")]
330struct CreateDirRequest {
331    #[serde(skip_serializing_if = "Option::is_none")]
332    recursive: Option<bool>,
333    #[serde(skip_serializing_if = "Option::is_none")]
334    allow_exists: Option<bool>,
335}
336
337/// Metadata of alluxio object
338#[derive(Debug, Deserialize)]
339#[serde(rename_all = "camelCase")]
340pub(super) struct FileInfo {
341    /// The path of the object
342    pub path: String,
343    /// The last modification time of the object
344    pub last_modification_time_ms: i64,
345    /// Whether the object is a folder
346    pub folder: bool,
347    /// The length of the object in bytes
348    pub length: u64,
349}
350
351impl TryFrom<FileInfo> for Metadata {
352    type Error = Error;
353
354    fn try_from(file_info: FileInfo) -> Result<Metadata> {
355        let mut metadata = if file_info.folder {
356            Metadata::new(EntryMode::DIR)
357        } else {
358            Metadata::new(EntryMode::FILE)
359        };
360        metadata
361            .set_content_length(file_info.length)
362            .set_last_modified(parse_datetime_from_from_timestamp_millis(
363                file_info.last_modification_time_ms,
364            )?);
365        Ok(metadata)
366    }
367}