opendal/services/dbfs/
core.rs1use std::fmt::Debug;
19
20use base64::prelude::BASE64_STANDARD;
21use base64::Engine;
22use bytes::Bytes;
23use http::header;
24use http::Request;
25use http::Response;
26use http::StatusCode;
27use serde_json::json;
28
29use super::error::parse_error;
30use crate::raw::*;
31use crate::*;
32
33pub struct DbfsCore {
34 pub root: String,
35 pub endpoint: String,
36 pub token: String,
37 pub client: HttpClient,
38}
39
40impl Debug for DbfsCore {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 f.debug_struct("DbfsCore")
43 .field("root", &self.root)
44 .field("endpoint", &self.endpoint)
45 .field("token", &self.token)
46 .finish_non_exhaustive()
47 }
48}
49
50impl DbfsCore {
51 pub async fn dbfs_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
52 let url = format!("{}/api/2.0/dbfs/mkdirs", self.endpoint);
53 let mut req = Request::post(&url);
54
55 let auth_header_content = format!("Bearer {}", self.token);
56 req = req.header(header::AUTHORIZATION, auth_header_content);
57
58 let p = build_rooted_abs_path(&self.root, path)
59 .trim_end_matches('/')
60 .to_string();
61
62 let req_body = &json!({
63 "path": percent_encode_path(&p),
64 });
65 let body = Buffer::from(Bytes::from(req_body.to_string()));
66
67 let req = req.body(body).map_err(new_request_build_error)?;
68
69 self.client.send(req).await
70 }
71
72 pub async fn dbfs_delete(&self, path: &str) -> Result<Response<Buffer>> {
73 let url = format!("{}/api/2.0/dbfs/delete", self.endpoint);
74 let mut req = Request::post(&url);
75
76 let auth_header_content = format!("Bearer {}", self.token);
77 req = req.header(header::AUTHORIZATION, auth_header_content);
78
79 let p = build_rooted_abs_path(&self.root, path)
80 .trim_end_matches('/')
81 .to_string();
82
83 let request_body = &json!({
84 "path": percent_encode_path(&p),
85 "recursive": true,
87 });
88
89 let body = Buffer::from(Bytes::from(request_body.to_string()));
90
91 let req = req.body(body).map_err(new_request_build_error)?;
92
93 self.client.send(req).await
94 }
95
96 pub async fn dbfs_rename(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
97 let source = build_rooted_abs_path(&self.root, from);
98 let target = build_rooted_abs_path(&self.root, to);
99
100 let url = format!("{}/api/2.0/dbfs/move", self.endpoint);
101 let mut req = Request::post(&url);
102
103 let auth_header_content = format!("Bearer {}", self.token);
104 req = req.header(header::AUTHORIZATION, auth_header_content);
105
106 let req_body = &json!({
107 "source_path": percent_encode_path(&source),
108 "destination_path": percent_encode_path(&target),
109 });
110
111 let body = Buffer::from(Bytes::from(req_body.to_string()));
112
113 let req = req.body(body).map_err(new_request_build_error)?;
114
115 self.client.send(req).await
116 }
117
118 pub async fn dbfs_list(&self, path: &str) -> Result<Response<Buffer>> {
119 let p = build_rooted_abs_path(&self.root, path)
120 .trim_end_matches('/')
121 .to_string();
122
123 let url = format!(
124 "{}/api/2.0/dbfs/list?path={}",
125 self.endpoint,
126 percent_encode_path(&p)
127 );
128 let mut req = Request::get(&url);
129
130 let auth_header_content = format!("Bearer {}", self.token);
131 req = req.header(header::AUTHORIZATION, auth_header_content);
132
133 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
134
135 self.client.send(req).await
136 }
137
138 pub fn dbfs_create_file_request(&self, path: &str, body: Bytes) -> Result<Request<Buffer>> {
139 let url = format!("{}/api/2.0/dbfs/put", self.endpoint);
140
141 let contents = BASE64_STANDARD.encode(body);
142 let mut req = Request::post(&url);
143
144 let auth_header_content = format!("Bearer {}", self.token);
145 req = req.header(header::AUTHORIZATION, auth_header_content);
146
147 let req_body = &json!({
148 "path": path,
149 "contents": contents,
150 "overwrite": true,
151 });
152
153 let body = Buffer::from(Bytes::from(req_body.to_string()));
154
155 req.body(body).map_err(new_request_build_error)
156 }
157
158 pub async fn dbfs_get_status(&self, path: &str) -> Result<Response<Buffer>> {
159 let p = build_rooted_abs_path(&self.root, path)
160 .trim_end_matches('/')
161 .to_string();
162
163 let url = format!(
164 "{}/api/2.0/dbfs/get-status?path={}",
165 &self.endpoint,
166 percent_encode_path(&p)
167 );
168
169 let mut req = Request::get(&url);
170
171 let auth_header_content = format!("Bearer {}", self.token);
172 req = req.header(header::AUTHORIZATION, auth_header_content);
173
174 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
175
176 self.client.send(req).await
177 }
178
179 pub async fn dbfs_ensure_parent_path(&self, path: &str) -> Result<()> {
180 let resp = self.dbfs_get_status(path).await?;
181
182 match resp.status() {
183 StatusCode::OK => return Ok(()),
184 StatusCode::NOT_FOUND => {
185 self.dbfs_create_dir(path).await?;
186 }
187 _ => return Err(parse_error(resp)),
188 }
189 Ok(())
190 }
191}