1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use http::header;
23use http::Request;
24use http::Response;
25use serde::Deserialize;
26
27use crate::raw::*;
28use crate::*;
29
30pub struct LakefsCore {
31 pub info: Arc<AccessorInfo>,
32 pub endpoint: String,
33 pub repository: String,
34 pub branch: String,
35 pub root: String,
36 pub username: String,
37 pub password: String,
38}
39
40impl Debug for LakefsCore {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 f.debug_struct("LakefsCore")
43 .field("endpoint", &self.endpoint)
44 .field("username", &self.username)
45 .field("password", &self.password)
46 .field("root", &self.root)
47 .field("repository", &self.repository)
48 .field("branch", &self.branch)
49 .finish_non_exhaustive()
50 }
51}
52
53impl LakefsCore {
54 pub async fn get_object_metadata(&self, path: &str) -> Result<Response<Buffer>> {
55 let p = build_abs_path(&self.root, path)
56 .trim_end_matches('/')
57 .to_string();
58
59 let url = format!(
60 "{}/api/v1/repositories/{}/refs/{}/objects/stat?path={}",
61 self.endpoint,
62 self.repository,
63 self.branch,
64 percent_encode_path(&p)
65 );
66
67 let mut req = Request::get(&url);
68
69 let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
70 req = req.header(header::AUTHORIZATION, auth_header_content);
71 let req = req.extension(Operation::Read);
73 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
74
75 self.info.http_client().send(req).await
76 }
77
78 pub async fn get_object_content(
79 &self,
80 path: &str,
81 range: BytesRange,
82 _args: &OpRead,
83 ) -> Result<Response<HttpBody>> {
84 let p = build_abs_path(&self.root, path)
85 .trim_end_matches('/')
86 .to_string();
87
88 let url = format!(
89 "{}/api/v1/repositories/{}/refs/{}/objects?path={}",
90 self.endpoint,
91 self.repository,
92 self.branch,
93 percent_encode_path(&p)
94 );
95
96 let mut req = Request::get(&url);
97
98 let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
99 req = req.header(header::AUTHORIZATION, auth_header_content);
100
101 if !range.is_full() {
102 req = req.header(header::RANGE, range.to_header());
103 }
104 let req = req.extension(Operation::Read);
106 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
107
108 self.info.http_client().fetch(req).await
109 }
110
111 pub async fn list_objects(
112 &self,
113 path: &str,
114 delimiter: &str,
115 amount: &Option<usize>,
116 after: Option<String>,
117 ) -> Result<Response<Buffer>> {
118 let p = build_abs_path(&self.root, path);
119
120 let mut url = format!(
121 "{}/api/v1/repositories/{}/refs/{}/objects/ls?",
122 self.endpoint, self.repository, self.branch
123 );
124
125 if !p.is_empty() {
126 url.push_str(&format!("&prefix={}", percent_encode_path(&p)));
127 }
128
129 if !delimiter.is_empty() {
130 url.push_str(&format!("&delimiter={}", delimiter));
131 }
132
133 if let Some(amount) = amount {
134 url.push_str(&format!("&amount={}", amount));
135 }
136
137 if let Some(after) = after {
138 url.push_str(&format!("&after={}", after));
139 }
140
141 let mut req = Request::get(&url);
142
143 let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
144 req = req.header(header::AUTHORIZATION, auth_header_content);
145 let req = req.extension(Operation::Read);
147 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
148
149 self.info.http_client().send(req).await
150 }
151
152 pub async fn upload_object(
153 &self,
154 path: &str,
155 _args: &OpWrite,
156 body: Buffer,
157 ) -> Result<Response<Buffer>> {
158 let p = build_abs_path(&self.root, path)
159 .trim_end_matches('/')
160 .to_string();
161
162 let url = format!(
163 "{}/api/v1/repositories/{}/branches/{}/objects?path={}",
164 self.endpoint,
165 self.repository,
166 self.branch,
167 percent_encode_path(&p)
168 );
169
170 let mut req = Request::post(&url);
171
172 let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
173 req = req.header(header::AUTHORIZATION, auth_header_content);
174 let req = req.extension(Operation::Write);
176 let req = req.body(body).map_err(new_request_build_error)?;
177
178 self.info.http_client().send(req).await
179 }
180
181 pub async fn delete_object(&self, path: &str, _args: &OpDelete) -> Result<Response<Buffer>> {
182 let p = build_abs_path(&self.root, path)
183 .trim_end_matches('/')
184 .to_string();
185
186 let url = format!(
187 "{}/api/v1/repositories/{}/branches/{}/objects?path={}",
188 self.endpoint,
189 self.repository,
190 self.branch,
191 percent_encode_path(&p)
192 );
193
194 let mut req = Request::delete(&url);
195
196 let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
197 req = req.header(header::AUTHORIZATION, auth_header_content);
198 let req = req.extension(Operation::Delete);
200 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
201
202 self.info.http_client().send(req).await
203 }
204
205 pub async fn copy_object(&self, path: &str, dest: &str) -> Result<Response<Buffer>> {
206 let p = build_abs_path(&self.root, path)
207 .trim_end_matches('/')
208 .to_string();
209 let d = build_abs_path(&self.root, dest)
210 .trim_end_matches('/')
211 .to_string();
212
213 let url = format!(
214 "{}/api/v1/repositories/{}/branches/{}/objects/copy?dest_path={}",
215 self.endpoint,
216 self.repository,
217 self.branch,
218 percent_encode_path(&d)
219 );
220
221 let mut req = Request::post(&url);
222
223 let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
224 req = req.header(header::AUTHORIZATION, auth_header_content);
225 req = req.header(header::CONTENT_TYPE, "application/json");
226 let mut map = HashMap::new();
227 map.insert("src_path", p);
228
229 let req = req
230 .extension(Operation::Delete)
232 .body(serde_json::to_vec(&map).unwrap().into())
233 .map_err(new_request_build_error)?;
234 self.info.http_client().send(req).await
235 }
236}
237
238#[derive(Deserialize, Eq, PartialEq, Debug)]
239pub(super) struct LakefsStatus {
240 pub path: String,
241 pub path_type: String,
242 pub physical_address: String,
243 pub checksum: String,
244 pub size_bytes: Option<u64>,
245 pub mtime: i64,
246 pub content_type: Option<String>,
247}
248
249#[derive(Deserialize, Eq, PartialEq, Debug)]
250pub(super) struct LakefsListResponse {
251 pub pagination: Pagination,
252 pub results: Vec<LakefsStatus>,
253}
254
255#[derive(Deserialize, Eq, PartialEq, Debug)]
256pub(super) struct Pagination {
257 pub has_more: bool,
258 pub max_per_page: u64,
259 pub next_offset: String,
260 pub results: u64,
261}