opendal/services/alluxio/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bytes::Buf;
19use http::Request;
20use http::Response;
21use http::StatusCode;
22use serde::Deserialize;
23use serde::Serialize;
24use std::fmt::Debug;
25use std::fmt::Formatter;
26use std::sync::Arc;
27
28use super::error::parse_error;
29use crate::raw::*;
30use crate::*;
31
/// Alluxio core
///
/// Holds the state shared by every REST call in this module: the accessor
/// info (which supplies the HTTP client), the backend root that request paths
/// are resolved against, and the endpoint that prefixes every request URL.
#[derive(Clone)]
pub struct AlluxioCore {
    /// Accessor info of this backend; its `http_client()` sends every request.
    pub info: Arc<AccessorInfo>,
    /// root of this backend.
    pub root: String,
    /// endpoint of alluxio
    pub endpoint: String,
}
41
42impl Debug for AlluxioCore {
43    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44        f.debug_struct("Backend")
45            .field("root", &self.root)
46            .field("endpoint", &self.endpoint)
47            .finish_non_exhaustive()
48    }
49}
50
51impl AlluxioCore {
52    pub async fn create_dir(&self, path: &str) -> Result<()> {
53        let path = build_rooted_abs_path(&self.root, path);
54
55        let r = CreateDirRequest {
56            recursive: Some(true),
57            allow_exists: Some(true),
58        };
59
60        let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
61        let body = bytes::Bytes::from(body);
62
63        let mut req = Request::post(format!(
64            "{}/api/v1/paths/{}/create-directory",
65            self.endpoint,
66            percent_encode_path(&path)
67        ));
68
69        req = req.header("Content-Type", "application/json");
70
71        let req = req
72            .body(Buffer::from(body))
73            .map_err(new_request_build_error)?;
74
75        let resp = self.info.http_client().send(req).await?;
76
77        let status = resp.status();
78        match status {
79            StatusCode::OK => Ok(()),
80            _ => Err(parse_error(resp)),
81        }
82    }
83
84    pub async fn create_file(&self, path: &str) -> Result<u64> {
85        let path = build_rooted_abs_path(&self.root, path);
86
87        let r = CreateFileRequest {
88            recursive: Some(true),
89        };
90
91        let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
92        let body = bytes::Bytes::from(body);
93        let mut req = Request::post(format!(
94            "{}/api/v1/paths/{}/create-file",
95            self.endpoint,
96            percent_encode_path(&path)
97        ));
98
99        req = req.header("Content-Type", "application/json");
100
101        let req = req
102            .body(Buffer::from(body))
103            .map_err(new_request_build_error)?;
104
105        let resp = self.info.http_client().send(req).await?;
106        let status = resp.status();
107
108        match status {
109            StatusCode::OK => {
110                let body = resp.into_body();
111                let steam_id: u64 =
112                    serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
113                Ok(steam_id)
114            }
115            _ => Err(parse_error(resp)),
116        }
117    }
118
119    pub(super) async fn open_file(&self, path: &str) -> Result<u64> {
120        let path = build_rooted_abs_path(&self.root, path);
121
122        let req = Request::post(format!(
123            "{}/api/v1/paths/{}/open-file",
124            self.endpoint,
125            percent_encode_path(&path)
126        ));
127        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
128        let resp = self.info.http_client().send(req).await?;
129
130        let status = resp.status();
131
132        match status {
133            StatusCode::OK => {
134                let body = resp.into_body();
135                let steam_id: u64 =
136                    serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
137                Ok(steam_id)
138            }
139            _ => Err(parse_error(resp)),
140        }
141    }
142
143    pub(super) async fn delete(&self, path: &str) -> Result<()> {
144        let path = build_rooted_abs_path(&self.root, path);
145
146        let req = Request::post(format!(
147            "{}/api/v1/paths/{}/delete",
148            self.endpoint,
149            percent_encode_path(&path)
150        ));
151        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
152        let resp = self.info.http_client().send(req).await?;
153
154        let status = resp.status();
155
156        match status {
157            StatusCode::OK => Ok(()),
158            _ => {
159                let err = parse_error(resp);
160                if err.kind() == ErrorKind::NotFound {
161                    return Ok(());
162                }
163                Err(err)
164            }
165        }
166    }
167
168    pub(super) async fn rename(&self, path: &str, dst: &str) -> Result<()> {
169        let path = build_rooted_abs_path(&self.root, path);
170        let dst = build_rooted_abs_path(&self.root, dst);
171
172        let req = Request::post(format!(
173            "{}/api/v1/paths/{}/rename?dst={}",
174            self.endpoint,
175            percent_encode_path(&path),
176            percent_encode_path(&dst)
177        ));
178
179        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
180
181        let resp = self.info.http_client().send(req).await?;
182
183        let status = resp.status();
184
185        match status {
186            StatusCode::OK => Ok(()),
187            _ => Err(parse_error(resp)),
188        }
189    }
190
191    pub(super) async fn get_status(&self, path: &str) -> Result<FileInfo> {
192        let path = build_rooted_abs_path(&self.root, path);
193
194        let req = Request::post(format!(
195            "{}/api/v1/paths/{}/get-status",
196            self.endpoint,
197            percent_encode_path(&path)
198        ));
199
200        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
201
202        let resp = self.info.http_client().send(req).await?;
203
204        let status = resp.status();
205
206        match status {
207            StatusCode::OK => {
208                let body = resp.into_body();
209                let file_info: FileInfo =
210                    serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
211                Ok(file_info)
212            }
213            _ => Err(parse_error(resp)),
214        }
215    }
216
217    pub(super) async fn list_status(&self, path: &str) -> Result<Vec<FileInfo>> {
218        let path = build_rooted_abs_path(&self.root, path);
219
220        let req = Request::post(format!(
221            "{}/api/v1/paths/{}/list-status",
222            self.endpoint,
223            percent_encode_path(&path)
224        ));
225
226        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
227
228        let resp = self.info.http_client().send(req).await?;
229
230        let status = resp.status();
231
232        match status {
233            StatusCode::OK => {
234                let body = resp.into_body();
235                let file_infos: Vec<FileInfo> =
236                    serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
237                Ok(file_infos)
238            }
239            _ => Err(parse_error(resp)),
240        }
241    }
242
243    /// TODO: we should implement range support correctly.
244    ///
245    /// Please refer to [alluxio-py](https://github.com/Alluxio/alluxio-py/blob/main/alluxio/const.py#L18)
246    pub async fn read(&self, stream_id: u64, _: BytesRange) -> Result<Response<HttpBody>> {
247        let req = Request::post(format!(
248            "{}/api/v1/streams/{}/read",
249            self.endpoint, stream_id,
250        ));
251
252        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
253
254        self.info.http_client().fetch(req).await
255    }
256
257    pub(super) async fn write(&self, stream_id: u64, body: Buffer) -> Result<usize> {
258        let req = Request::post(format!(
259            "{}/api/v1/streams/{}/write",
260            self.endpoint, stream_id
261        ));
262        let req = req.body(body).map_err(new_request_build_error)?;
263
264        let resp = self.info.http_client().send(req).await?;
265
266        let status = resp.status();
267
268        match status {
269            StatusCode::OK => {
270                let body = resp.into_body();
271                let size: usize =
272                    serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
273                Ok(size)
274            }
275            _ => Err(parse_error(resp)),
276        }
277    }
278
279    pub(super) async fn close(&self, stream_id: u64) -> Result<()> {
280        let req = Request::post(format!(
281            "{}/api/v1/streams/{}/close",
282            self.endpoint, stream_id
283        ));
284        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
285
286        let resp = self.info.http_client().send(req).await?;
287
288        let status = resp.status();
289
290        match status {
291            StatusCode::OK => Ok(()),
292            _ => Err(parse_error(resp)),
293        }
294    }
295}
296
/// JSON request body sent by `AlluxioCore::create_file`.
#[derive(Debug, Serialize)]
struct CreateFileRequest {
    /// Serialized as `recursive`; omitted from the JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    recursive: Option<bool>,
}
302
/// JSON request body sent by `AlluxioCore::create_dir`.
///
/// Field names are serialized in camelCase.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct CreateDirRequest {
    /// Serialized as `recursive`; omitted from the JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    recursive: Option<bool>,
    /// Serialized as `allowExists`; omitted from the JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    allow_exists: Option<bool>,
}
311
/// Metadata of alluxio object
///
/// Deserialized from the JSON returned by the `get-status` and `list-status`
/// calls; JSON keys are expected in camelCase (e.g. `lastModificationTimeMs`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(super) struct FileInfo {
    /// The path of the object
    pub path: String,
    /// The last modification time of the object, as a millisecond timestamp
    /// (presumably since the Unix epoch; confirm against the Alluxio API)
    pub last_modification_time_ms: i64,
    /// Whether the object is a folder
    pub folder: bool,
    /// The length of the object in bytes
    pub length: u64,
}
325
326impl TryFrom<FileInfo> for Metadata {
327    type Error = Error;
328
329    fn try_from(file_info: FileInfo) -> Result<Metadata> {
330        let mut metadata = if file_info.folder {
331            Metadata::new(EntryMode::DIR)
332        } else {
333            Metadata::new(EntryMode::FILE)
334        };
335        metadata
336            .set_content_length(file_info.length)
337            .set_last_modified(parse_datetime_from_from_timestamp_millis(
338                file_info.last_modification_time_ms,
339            )?);
340        Ok(metadata)
341    }
342}