opendal/services/alluxio/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bytes::Buf;
19use http::Request;
20use http::Response;
21use http::StatusCode;
22use serde::Deserialize;
23use serde::Serialize;
24use std::fmt::Debug;
25use std::fmt::Formatter;
26use std::sync::Arc;
27
28use super::error::parse_error;
29use crate::raw::*;
30use crate::*;
31
32/// Alluxio core
33#[derive(Clone)]
34pub struct AlluxioCore {
35    pub info: Arc<AccessorInfo>,
36    /// root of this backend.
37    pub root: String,
38    /// endpoint of alluxio
39    pub endpoint: String,
40}
41
42impl Debug for AlluxioCore {
43    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44        f.debug_struct("Backend")
45            .field("root", &self.root)
46            .field("endpoint", &self.endpoint)
47            .finish_non_exhaustive()
48    }
49}
50
51impl AlluxioCore {
52    pub async fn create_dir(&self, path: &str) -> Result<()> {
53        let path = build_rooted_abs_path(&self.root, path);
54
55        let r = CreateDirRequest {
56            recursive: Some(true),
57            allow_exists: Some(true),
58        };
59
60        let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
61        let body = bytes::Bytes::from(body);
62
63        let mut req = Request::post(format!(
64            "{}/api/v1/paths/{}/create-directory",
65            self.endpoint,
66            percent_encode_path(&path)
67        ));
68
69        req = req.header("Content-Type", "application/json");
70
71        let req = req.extension(Operation::CreateDir);
72
73        let req = req
74            .body(Buffer::from(body))
75            .map_err(new_request_build_error)?;
76
77        let resp = self.info.http_client().send(req).await?;
78
79        let status = resp.status();
80        match status {
81            StatusCode::OK => Ok(()),
82            _ => Err(parse_error(resp)),
83        }
84    }
85
86    pub async fn create_file(&self, path: &str) -> Result<u64> {
87        let path = build_rooted_abs_path(&self.root, path);
88
89        let r = CreateFileRequest {
90            recursive: Some(true),
91        };
92
93        let body = serde_json::to_vec(&r).map_err(new_json_serialize_error)?;
94        let body = bytes::Bytes::from(body);
95        let mut req = Request::post(format!(
96            "{}/api/v1/paths/{}/create-file",
97            self.endpoint,
98            percent_encode_path(&path)
99        ));
100
101        req = req.header("Content-Type", "application/json");
102
103        let req = req.extension(Operation::Write);
104
105        let req = req
106            .body(Buffer::from(body))
107            .map_err(new_request_build_error)?;
108
109        let resp = self.info.http_client().send(req).await?;
110        let status = resp.status();
111
112        match status {
113            StatusCode::OK => {
114                let body = resp.into_body();
115                let steam_id: u64 =
116                    serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
117                Ok(steam_id)
118            }
119            _ => Err(parse_error(resp)),
120        }
121    }
122
123    pub(super) async fn open_file(&self, path: &str) -> Result<u64> {
124        let path = build_rooted_abs_path(&self.root, path);
125
126        let req = Request::post(format!(
127            "{}/api/v1/paths/{}/open-file",
128            self.endpoint,
129            percent_encode_path(&path)
130        ));
131
132        let req = req.extension(Operation::Read);
133
134        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
135        let resp = self.info.http_client().send(req).await?;
136
137        let status = resp.status();
138
139        match status {
140            StatusCode::OK => {
141                let body = resp.into_body();
142                let steam_id: u64 =
143                    serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
144                Ok(steam_id)
145            }
146            _ => Err(parse_error(resp)),
147        }
148    }
149
150    pub(super) async fn delete(&self, path: &str) -> Result<()> {
151        let path = build_rooted_abs_path(&self.root, path);
152
153        let req = Request::post(format!(
154            "{}/api/v1/paths/{}/delete",
155            self.endpoint,
156            percent_encode_path(&path)
157        ));
158
159        let req = req.extension(Operation::Delete);
160
161        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
162        let resp = self.info.http_client().send(req).await?;
163
164        let status = resp.status();
165
166        match status {
167            StatusCode::OK => Ok(()),
168            _ => {
169                let err = parse_error(resp);
170                if err.kind() == ErrorKind::NotFound {
171                    return Ok(());
172                }
173                Err(err)
174            }
175        }
176    }
177
178    pub(super) async fn rename(&self, path: &str, dst: &str) -> Result<()> {
179        let path = build_rooted_abs_path(&self.root, path);
180        let dst = build_rooted_abs_path(&self.root, dst);
181
182        let req = Request::post(format!(
183            "{}/api/v1/paths/{}/rename?dst={}",
184            self.endpoint,
185            percent_encode_path(&path),
186            percent_encode_path(&dst)
187        ));
188
189        let req = req.extension(Operation::Rename);
190
191        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
192
193        let resp = self.info.http_client().send(req).await?;
194
195        let status = resp.status();
196
197        match status {
198            StatusCode::OK => Ok(()),
199            _ => Err(parse_error(resp)),
200        }
201    }
202
203    pub(super) async fn get_status(&self, path: &str) -> Result<FileInfo> {
204        let path = build_rooted_abs_path(&self.root, path);
205
206        let req = Request::post(format!(
207            "{}/api/v1/paths/{}/get-status",
208            self.endpoint,
209            percent_encode_path(&path)
210        ));
211
212        let req = req.extension(Operation::Stat);
213
214        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
215
216        let resp = self.info.http_client().send(req).await?;
217
218        let status = resp.status();
219
220        match status {
221            StatusCode::OK => {
222                let body = resp.into_body();
223                let file_info: FileInfo =
224                    serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
225                Ok(file_info)
226            }
227            _ => Err(parse_error(resp)),
228        }
229    }
230
231    pub(super) async fn list_status(&self, path: &str) -> Result<Vec<FileInfo>> {
232        let path = build_rooted_abs_path(&self.root, path);
233
234        let req = Request::post(format!(
235            "{}/api/v1/paths/{}/list-status",
236            self.endpoint,
237            percent_encode_path(&path)
238        ));
239
240        let req = req.extension(Operation::List);
241
242        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
243
244        let resp = self.info.http_client().send(req).await?;
245
246        let status = resp.status();
247
248        match status {
249            StatusCode::OK => {
250                let body = resp.into_body();
251                let file_infos: Vec<FileInfo> =
252                    serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
253                Ok(file_infos)
254            }
255            _ => Err(parse_error(resp)),
256        }
257    }
258
259    /// TODO: we should implement range support correctly.
260    ///
261    /// Please refer to [alluxio-py](https://github.com/Alluxio/alluxio-py/blob/main/alluxio/const.py#L18)
262    pub async fn read(&self, stream_id: u64, _: BytesRange) -> Result<Response<HttpBody>> {
263        let req = Request::post(format!(
264            "{}/api/v1/streams/{}/read",
265            self.endpoint, stream_id,
266        ));
267
268        let req = req.extension(Operation::Read);
269
270        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
271
272        self.info.http_client().fetch(req).await
273    }
274
275    pub(super) async fn write(&self, stream_id: u64, body: Buffer) -> Result<usize> {
276        let req = Request::post(format!(
277            "{}/api/v1/streams/{}/write",
278            self.endpoint, stream_id
279        ));
280
281        let req = req.extension(Operation::Write);
282
283        let req = req.body(body).map_err(new_request_build_error)?;
284
285        let resp = self.info.http_client().send(req).await?;
286
287        let status = resp.status();
288
289        match status {
290            StatusCode::OK => {
291                let body = resp.into_body();
292                let size: usize =
293                    serde_json::from_reader(body.reader()).map_err(new_json_serialize_error)?;
294                Ok(size)
295            }
296            _ => Err(parse_error(resp)),
297        }
298    }
299
300    pub(super) async fn close(&self, stream_id: u64) -> Result<()> {
301        let req = Request::post(format!(
302            "{}/api/v1/streams/{}/close",
303            self.endpoint, stream_id
304        ));
305
306        let req = req.extension(Operation::Write);
307
308        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
309
310        let resp = self.info.http_client().send(req).await?;
311
312        let status = resp.status();
313
314        match status {
315            StatusCode::OK => Ok(()),
316            _ => Err(parse_error(resp)),
317        }
318    }
319}
320
321#[derive(Debug, Serialize)]
322struct CreateFileRequest {
323    #[serde(skip_serializing_if = "Option::is_none")]
324    recursive: Option<bool>,
325}
326
327#[derive(Debug, Serialize)]
328#[serde(rename_all = "camelCase")]
329struct CreateDirRequest {
330    #[serde(skip_serializing_if = "Option::is_none")]
331    recursive: Option<bool>,
332    #[serde(skip_serializing_if = "Option::is_none")]
333    allow_exists: Option<bool>,
334}
335
336/// Metadata of alluxio object
337#[derive(Debug, Deserialize)]
338#[serde(rename_all = "camelCase")]
339pub(super) struct FileInfo {
340    /// The path of the object
341    pub path: String,
342    /// The last modification time of the object
343    pub last_modification_time_ms: i64,
344    /// Whether the object is a folder
345    pub folder: bool,
346    /// The length of the object in bytes
347    pub length: u64,
348}
349
350impl TryFrom<FileInfo> for Metadata {
351    type Error = Error;
352
353    fn try_from(file_info: FileInfo) -> Result<Metadata> {
354        let mut metadata = if file_info.folder {
355            Metadata::new(EntryMode::DIR)
356        } else {
357            Metadata::new(EntryMode::FILE)
358        };
359        metadata
360            .set_content_length(file_info.length)
361            .set_last_modified(parse_datetime_from_from_timestamp_millis(
362                file_info.last_modification_time_ms,
363            )?);
364        Ok(metadata)
365    }
366}