1use crate::raw::*;
19use crate::*;
20use http::header;
21use http::Request;
22use http::Response;
23use serde::Deserialize;
24use std::collections::HashMap;
25use std::fmt::Debug;
26use std::sync::Arc;
27
28pub struct LakefsCore {
29 pub info: Arc<AccessorInfo>,
30 pub endpoint: String,
31 pub repository: String,
32 pub branch: String,
33 pub root: String,
34 pub username: String,
35 pub password: String,
36}
37
38impl Debug for LakefsCore {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct("LakefsCore")
41 .field("endpoint", &self.endpoint)
42 .field("username", &self.username)
43 .field("password", &self.password)
44 .field("root", &self.root)
45 .field("repository", &self.repository)
46 .field("branch", &self.branch)
47 .finish_non_exhaustive()
48 }
49}
50
51impl LakefsCore {
52 pub async fn get_object_metadata(&self, path: &str) -> Result<Response<Buffer>> {
53 let p = build_abs_path(&self.root, path)
54 .trim_end_matches('/')
55 .to_string();
56
57 let url = format!(
58 "{}/api/v1/repositories/{}/refs/{}/objects/stat?path={}",
59 self.endpoint,
60 self.repository,
61 self.branch,
62 percent_encode_path(&p)
63 );
64
65 let mut req = Request::get(&url);
66
67 let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
68 req = req.header(header::AUTHORIZATION, auth_header_content);
69 let req = req.extension(Operation::Read);
71 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
72
73 self.info.http_client().send(req).await
74 }
75
76 pub async fn get_object_content(
77 &self,
78 path: &str,
79 range: BytesRange,
80 _args: &OpRead,
81 ) -> Result<Response<HttpBody>> {
82 let p = build_abs_path(&self.root, path)
83 .trim_end_matches('/')
84 .to_string();
85
86 let url = format!(
87 "{}/api/v1/repositories/{}/refs/{}/objects?path={}",
88 self.endpoint,
89 self.repository,
90 self.branch,
91 percent_encode_path(&p)
92 );
93
94 let mut req = Request::get(&url);
95
96 let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
97 req = req.header(header::AUTHORIZATION, auth_header_content);
98
99 if !range.is_full() {
100 req = req.header(header::RANGE, range.to_header());
101 }
102 let req = req.extension(Operation::Read);
104 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
105
106 self.info.http_client().fetch(req).await
107 }
108
109 pub async fn list_objects(
110 &self,
111 path: &str,
112 delimiter: &str,
113 amount: &Option<usize>,
114 after: Option<String>,
115 ) -> Result<Response<Buffer>> {
116 let p = build_abs_path(&self.root, path);
117
118 let mut url = format!(
119 "{}/api/v1/repositories/{}/refs/{}/objects/ls?",
120 self.endpoint, self.repository, self.branch
121 );
122
123 if !p.is_empty() {
124 url.push_str(&format!("&prefix={}", percent_encode_path(&p)));
125 }
126
127 if !delimiter.is_empty() {
128 url.push_str(&format!("&delimiter={}", delimiter));
129 }
130
131 if let Some(amount) = amount {
132 url.push_str(&format!("&amount={}", amount));
133 }
134
135 if let Some(after) = after {
136 url.push_str(&format!("&after={}", after));
137 }
138
139 let mut req = Request::get(&url);
140
141 let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
142 req = req.header(header::AUTHORIZATION, auth_header_content);
143 let req = req.extension(Operation::Read);
145 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
146
147 self.info.http_client().send(req).await
148 }
149
150 pub async fn upload_object(
151 &self,
152 path: &str,
153 _args: &OpWrite,
154 body: Buffer,
155 ) -> Result<Response<Buffer>> {
156 let p = build_abs_path(&self.root, path)
157 .trim_end_matches('/')
158 .to_string();
159
160 let url = format!(
161 "{}/api/v1/repositories/{}/branches/{}/objects?path={}",
162 self.endpoint,
163 self.repository,
164 self.branch,
165 percent_encode_path(&p)
166 );
167
168 let mut req = Request::post(&url);
169
170 let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
171 req = req.header(header::AUTHORIZATION, auth_header_content);
172 let req = req.extension(Operation::Write);
174 let req = req.body(body).map_err(new_request_build_error)?;
175
176 self.info.http_client().send(req).await
177 }
178
179 pub async fn delete_object(&self, path: &str, _args: &OpDelete) -> Result<Response<Buffer>> {
180 let p = build_abs_path(&self.root, path)
181 .trim_end_matches('/')
182 .to_string();
183
184 let url = format!(
185 "{}/api/v1/repositories/{}/branches/{}/objects?path={}",
186 self.endpoint,
187 self.repository,
188 self.branch,
189 percent_encode_path(&p)
190 );
191
192 let mut req = Request::delete(&url);
193
194 let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
195 req = req.header(header::AUTHORIZATION, auth_header_content);
196 let req = req.extension(Operation::Delete);
198 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
199
200 self.info.http_client().send(req).await
201 }
202
203 pub async fn copy_object(&self, path: &str, dest: &str) -> Result<Response<Buffer>> {
204 let p = build_abs_path(&self.root, path)
205 .trim_end_matches('/')
206 .to_string();
207 let d = build_abs_path(&self.root, dest)
208 .trim_end_matches('/')
209 .to_string();
210
211 let url = format!(
212 "{}/api/v1/repositories/{}/branches/{}/objects/copy?dest_path={}",
213 self.endpoint,
214 self.repository,
215 self.branch,
216 percent_encode_path(&d)
217 );
218
219 let mut req = Request::post(&url);
220
221 let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
222 req = req.header(header::AUTHORIZATION, auth_header_content);
223 req = req.header(header::CONTENT_TYPE, "application/json");
224 let mut map = HashMap::new();
225 map.insert("src_path", p);
226
227 let req = req
228 .extension(Operation::Delete)
230 .body(serde_json::to_vec(&map).unwrap().into())
231 .map_err(new_request_build_error)?;
232 self.info.http_client().send(req).await
233 }
234}
235
236#[derive(Deserialize, Eq, PartialEq, Debug)]
237pub(super) struct LakefsStatus {
238 pub path: String,
239 pub path_type: String,
240 pub physical_address: String,
241 pub checksum: String,
242 pub size_bytes: Option<u64>,
243 pub mtime: i64,
244 pub content_type: Option<String>,
245}
246
247#[derive(Deserialize, Eq, PartialEq, Debug)]
248pub(super) struct LakefsListResponse {
249 pub pagination: Pagination,
250 pub results: Vec<LakefsStatus>,
251}
252
253#[derive(Deserialize, Eq, PartialEq, Debug)]
254pub(super) struct Pagination {
255 pub has_more: bool,
256 pub max_per_page: u64,
257 pub next_offset: String,
258 pub results: u64,
259}