opendal/services/lakefs/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::raw::*;
19use crate::*;
20use http::header;
21use http::Request;
22use http::Response;
23use serde::Deserialize;
24use std::collections::HashMap;
25use std::fmt::Debug;
26use std::sync::Arc;
27
28pub struct LakefsCore {
29    pub info: Arc<AccessorInfo>,
30    pub endpoint: String,
31    pub repository: String,
32    pub branch: String,
33    pub root: String,
34    pub username: String,
35    pub password: String,
36}
37
38impl Debug for LakefsCore {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        f.debug_struct("LakefsCore")
41            .field("endpoint", &self.endpoint)
42            .field("username", &self.username)
43            .field("password", &self.password)
44            .field("root", &self.root)
45            .field("repository", &self.repository)
46            .field("branch", &self.branch)
47            .finish_non_exhaustive()
48    }
49}
50
51impl LakefsCore {
52    pub async fn get_object_metadata(&self, path: &str) -> Result<Response<Buffer>> {
53        let p = build_abs_path(&self.root, path)
54            .trim_end_matches('/')
55            .to_string();
56
57        let url = format!(
58            "{}/api/v1/repositories/{}/refs/{}/objects/stat?path={}",
59            self.endpoint,
60            self.repository,
61            self.branch,
62            percent_encode_path(&p)
63        );
64
65        let mut req = Request::get(&url);
66
67        let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
68        req = req.header(header::AUTHORIZATION, auth_header_content);
69        // Inject operation to the request.
70        let req = req.extension(Operation::Read);
71        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
72
73        self.info.http_client().send(req).await
74    }
75
76    pub async fn get_object_content(
77        &self,
78        path: &str,
79        range: BytesRange,
80        _args: &OpRead,
81    ) -> Result<Response<HttpBody>> {
82        let p = build_abs_path(&self.root, path)
83            .trim_end_matches('/')
84            .to_string();
85
86        let url = format!(
87            "{}/api/v1/repositories/{}/refs/{}/objects?path={}",
88            self.endpoint,
89            self.repository,
90            self.branch,
91            percent_encode_path(&p)
92        );
93
94        let mut req = Request::get(&url);
95
96        let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
97        req = req.header(header::AUTHORIZATION, auth_header_content);
98
99        if !range.is_full() {
100            req = req.header(header::RANGE, range.to_header());
101        }
102        // Inject operation to the request.
103        let req = req.extension(Operation::Read);
104        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
105
106        self.info.http_client().fetch(req).await
107    }
108
109    pub async fn list_objects(
110        &self,
111        path: &str,
112        delimiter: &str,
113        amount: &Option<usize>,
114        after: Option<String>,
115    ) -> Result<Response<Buffer>> {
116        let p = build_abs_path(&self.root, path);
117
118        let mut url = format!(
119            "{}/api/v1/repositories/{}/refs/{}/objects/ls?",
120            self.endpoint, self.repository, self.branch
121        );
122
123        if !p.is_empty() {
124            url.push_str(&format!("&prefix={}", percent_encode_path(&p)));
125        }
126
127        if !delimiter.is_empty() {
128            url.push_str(&format!("&delimiter={}", delimiter));
129        }
130
131        if let Some(amount) = amount {
132            url.push_str(&format!("&amount={}", amount));
133        }
134
135        if let Some(after) = after {
136            url.push_str(&format!("&after={}", after));
137        }
138
139        let mut req = Request::get(&url);
140
141        let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
142        req = req.header(header::AUTHORIZATION, auth_header_content);
143        // Inject operation to the request.
144        let req = req.extension(Operation::Read);
145        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
146
147        self.info.http_client().send(req).await
148    }
149
150    pub async fn upload_object(
151        &self,
152        path: &str,
153        _args: &OpWrite,
154        body: Buffer,
155    ) -> Result<Response<Buffer>> {
156        let p = build_abs_path(&self.root, path)
157            .trim_end_matches('/')
158            .to_string();
159
160        let url = format!(
161            "{}/api/v1/repositories/{}/branches/{}/objects?path={}",
162            self.endpoint,
163            self.repository,
164            self.branch,
165            percent_encode_path(&p)
166        );
167
168        let mut req = Request::post(&url);
169
170        let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
171        req = req.header(header::AUTHORIZATION, auth_header_content);
172        // Inject operation to the request.
173        let req = req.extension(Operation::Write);
174        let req = req.body(body).map_err(new_request_build_error)?;
175
176        self.info.http_client().send(req).await
177    }
178
179    pub async fn delete_object(&self, path: &str, _args: &OpDelete) -> Result<Response<Buffer>> {
180        let p = build_abs_path(&self.root, path)
181            .trim_end_matches('/')
182            .to_string();
183
184        let url = format!(
185            "{}/api/v1/repositories/{}/branches/{}/objects?path={}",
186            self.endpoint,
187            self.repository,
188            self.branch,
189            percent_encode_path(&p)
190        );
191
192        let mut req = Request::delete(&url);
193
194        let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
195        req = req.header(header::AUTHORIZATION, auth_header_content);
196        // Inject operation to the request.
197        let req = req.extension(Operation::Delete);
198        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
199
200        self.info.http_client().send(req).await
201    }
202
203    pub async fn copy_object(&self, path: &str, dest: &str) -> Result<Response<Buffer>> {
204        let p = build_abs_path(&self.root, path)
205            .trim_end_matches('/')
206            .to_string();
207        let d = build_abs_path(&self.root, dest)
208            .trim_end_matches('/')
209            .to_string();
210
211        let url = format!(
212            "{}/api/v1/repositories/{}/branches/{}/objects/copy?dest_path={}",
213            self.endpoint,
214            self.repository,
215            self.branch,
216            percent_encode_path(&d)
217        );
218
219        let mut req = Request::post(&url);
220
221        let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
222        req = req.header(header::AUTHORIZATION, auth_header_content);
223        req = req.header(header::CONTENT_TYPE, "application/json");
224        let mut map = HashMap::new();
225        map.insert("src_path", p);
226
227        let req = req
228            // Inject operation to the request.
229            .extension(Operation::Delete)
230            .body(serde_json::to_vec(&map).unwrap().into())
231            .map_err(new_request_build_error)?;
232        self.info.http_client().send(req).await
233    }
234}
235
236#[derive(Deserialize, Eq, PartialEq, Debug)]
237pub(super) struct LakefsStatus {
238    pub path: String,
239    pub path_type: String,
240    pub physical_address: String,
241    pub checksum: String,
242    pub size_bytes: Option<u64>,
243    pub mtime: i64,
244    pub content_type: Option<String>,
245}
246
247#[derive(Deserialize, Eq, PartialEq, Debug)]
248pub(super) struct LakefsListResponse {
249    pub pagination: Pagination,
250    pub results: Vec<LakefsStatus>,
251}
252
253#[derive(Deserialize, Eq, PartialEq, Debug)]
254pub(super) struct Pagination {
255    pub has_more: bool,
256    pub max_per_page: u64,
257    pub next_offset: String,
258    pub results: u64,
259}