opendal/services/dbfs/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19
20use base64::prelude::BASE64_STANDARD;
21use base64::Engine;
22use bytes::Bytes;
23use http::header;
24use http::Request;
25use http::Response;
26use http::StatusCode;
27use serde_json::json;
28
29use super::error::parse_error;
30use crate::raw::*;
31use crate::*;
32
/// Shared state used to build and send DBFS REST requests.
pub struct DbfsCore {
    /// Root path that every relative `path` argument is resolved against
    /// (via `build_rooted_abs_path`).
    pub root: String,
    /// Base URL of the service; request URLs are formed as
    /// `{endpoint}/api/2.0/dbfs/...`.
    pub endpoint: String,
    /// Access token sent as `Authorization: Bearer {token}` on every request.
    pub token: String,
    /// HTTP client used to send the built requests.
    pub client: HttpClient,
}
39
40impl Debug for DbfsCore {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        f.debug_struct("DbfsCore")
43            .field("root", &self.root)
44            .field("endpoint", &self.endpoint)
45            .field("token", &self.token)
46            .finish_non_exhaustive()
47    }
48}
49
50impl DbfsCore {
51    pub async fn dbfs_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
52        let url = format!("{}/api/2.0/dbfs/mkdirs", self.endpoint);
53        let mut req = Request::post(&url);
54
55        let auth_header_content = format!("Bearer {}", self.token);
56        req = req.header(header::AUTHORIZATION, auth_header_content);
57
58        let p = build_rooted_abs_path(&self.root, path)
59            .trim_end_matches('/')
60            .to_string();
61
62        let req_body = &json!({
63            "path": percent_encode_path(&p),
64        });
65        let body = Buffer::from(Bytes::from(req_body.to_string()));
66
67        let req = req.body(body).map_err(new_request_build_error)?;
68
69        self.client.send(req).await
70    }
71
72    pub async fn dbfs_delete(&self, path: &str) -> Result<Response<Buffer>> {
73        let url = format!("{}/api/2.0/dbfs/delete", self.endpoint);
74        let mut req = Request::post(&url);
75
76        let auth_header_content = format!("Bearer {}", self.token);
77        req = req.header(header::AUTHORIZATION, auth_header_content);
78
79        let p = build_rooted_abs_path(&self.root, path)
80            .trim_end_matches('/')
81            .to_string();
82
83        let request_body = &json!({
84            "path": percent_encode_path(&p),
85            // TODO: support recursive toggle, should we add a new field in OpDelete?
86            "recursive": true,
87        });
88
89        let body = Buffer::from(Bytes::from(request_body.to_string()));
90
91        let req = req.body(body).map_err(new_request_build_error)?;
92
93        self.client.send(req).await
94    }
95
96    pub async fn dbfs_rename(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
97        let source = build_rooted_abs_path(&self.root, from);
98        let target = build_rooted_abs_path(&self.root, to);
99
100        let url = format!("{}/api/2.0/dbfs/move", self.endpoint);
101        let mut req = Request::post(&url);
102
103        let auth_header_content = format!("Bearer {}", self.token);
104        req = req.header(header::AUTHORIZATION, auth_header_content);
105
106        let req_body = &json!({
107            "source_path": percent_encode_path(&source),
108            "destination_path": percent_encode_path(&target),
109        });
110
111        let body = Buffer::from(Bytes::from(req_body.to_string()));
112
113        let req = req.body(body).map_err(new_request_build_error)?;
114
115        self.client.send(req).await
116    }
117
118    pub async fn dbfs_list(&self, path: &str) -> Result<Response<Buffer>> {
119        let p = build_rooted_abs_path(&self.root, path)
120            .trim_end_matches('/')
121            .to_string();
122
123        let url = format!(
124            "{}/api/2.0/dbfs/list?path={}",
125            self.endpoint,
126            percent_encode_path(&p)
127        );
128        let mut req = Request::get(&url);
129
130        let auth_header_content = format!("Bearer {}", self.token);
131        req = req.header(header::AUTHORIZATION, auth_header_content);
132
133        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
134
135        self.client.send(req).await
136    }
137
138    pub fn dbfs_create_file_request(&self, path: &str, body: Bytes) -> Result<Request<Buffer>> {
139        let url = format!("{}/api/2.0/dbfs/put", self.endpoint);
140
141        let contents = BASE64_STANDARD.encode(body);
142        let mut req = Request::post(&url);
143
144        let auth_header_content = format!("Bearer {}", self.token);
145        req = req.header(header::AUTHORIZATION, auth_header_content);
146
147        let req_body = &json!({
148            "path": path,
149            "contents": contents,
150            "overwrite": true,
151        });
152
153        let body = Buffer::from(Bytes::from(req_body.to_string()));
154
155        req.body(body).map_err(new_request_build_error)
156    }
157
158    pub async fn dbfs_get_status(&self, path: &str) -> Result<Response<Buffer>> {
159        let p = build_rooted_abs_path(&self.root, path)
160            .trim_end_matches('/')
161            .to_string();
162
163        let url = format!(
164            "{}/api/2.0/dbfs/get-status?path={}",
165            &self.endpoint,
166            percent_encode_path(&p)
167        );
168
169        let mut req = Request::get(&url);
170
171        let auth_header_content = format!("Bearer {}", self.token);
172        req = req.header(header::AUTHORIZATION, auth_header_content);
173
174        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
175
176        self.client.send(req).await
177    }
178
179    pub async fn dbfs_ensure_parent_path(&self, path: &str) -> Result<()> {
180        let resp = self.dbfs_get_status(path).await?;
181
182        match resp.status() {
183            StatusCode::OK => return Ok(()),
184            StatusCode::NOT_FOUND => {
185                self.dbfs_create_dir(path).await?;
186            }
187            _ => return Err(parse_error(resp)),
188        }
189        Ok(())
190    }
191}