opendal/services/lakefs/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use http::header;
23use http::Request;
24use http::Response;
25use serde::Deserialize;
26
27use crate::raw::*;
28use crate::*;
29
/// Shared state used to build and send HTTP requests against the lakeFS
/// `/api/v1` endpoints.
pub struct LakefsCore {
    /// Accessor info; the HTTP client used to send every request is obtained from it.
    pub info: Arc<AccessorInfo>,
    /// Base URL that the `/api/v1/repositories/...` paths are appended to.
    pub endpoint: String,
    /// Repository name interpolated into the request URL.
    pub repository: String,
    /// Branch name interpolated into the `refs/{branch}` and `branches/{branch}` URL segments.
    pub branch: String,
    /// Root passed to `build_abs_path` together with the caller-supplied path.
    pub root: String,
    /// Username passed to `format_authorization_by_basic` for the `Authorization` header.
    pub username: String,
    /// Password passed to `format_authorization_by_basic` for the `Authorization` header.
    pub password: String,
}
39
40impl Debug for LakefsCore {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        f.debug_struct("LakefsCore")
43            .field("endpoint", &self.endpoint)
44            .field("username", &self.username)
45            .field("password", &self.password)
46            .field("root", &self.root)
47            .field("repository", &self.repository)
48            .field("branch", &self.branch)
49            .finish_non_exhaustive()
50    }
51}
52
53impl LakefsCore {
54    pub async fn get_object_metadata(&self, path: &str) -> Result<Response<Buffer>> {
55        let p = build_abs_path(&self.root, path)
56            .trim_end_matches('/')
57            .to_string();
58
59        let url = format!(
60            "{}/api/v1/repositories/{}/refs/{}/objects/stat?path={}",
61            self.endpoint,
62            self.repository,
63            self.branch,
64            percent_encode_path(&p)
65        );
66
67        let mut req = Request::get(&url);
68
69        let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
70        req = req.header(header::AUTHORIZATION, auth_header_content);
71        // Inject operation to the request.
72        let req = req.extension(Operation::Read);
73        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
74
75        self.info.http_client().send(req).await
76    }
77
78    pub async fn get_object_content(
79        &self,
80        path: &str,
81        range: BytesRange,
82        _args: &OpRead,
83    ) -> Result<Response<HttpBody>> {
84        let p = build_abs_path(&self.root, path)
85            .trim_end_matches('/')
86            .to_string();
87
88        let url = format!(
89            "{}/api/v1/repositories/{}/refs/{}/objects?path={}",
90            self.endpoint,
91            self.repository,
92            self.branch,
93            percent_encode_path(&p)
94        );
95
96        let mut req = Request::get(&url);
97
98        let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
99        req = req.header(header::AUTHORIZATION, auth_header_content);
100
101        if !range.is_full() {
102            req = req.header(header::RANGE, range.to_header());
103        }
104        // Inject operation to the request.
105        let req = req.extension(Operation::Read);
106        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
107
108        self.info.http_client().fetch(req).await
109    }
110
111    pub async fn list_objects(
112        &self,
113        path: &str,
114        delimiter: &str,
115        amount: &Option<usize>,
116        after: Option<String>,
117    ) -> Result<Response<Buffer>> {
118        let p = build_abs_path(&self.root, path);
119
120        let mut url = format!(
121            "{}/api/v1/repositories/{}/refs/{}/objects/ls?",
122            self.endpoint, self.repository, self.branch
123        );
124
125        if !p.is_empty() {
126            url.push_str(&format!("&prefix={}", percent_encode_path(&p)));
127        }
128
129        if !delimiter.is_empty() {
130            url.push_str(&format!("&delimiter={}", delimiter));
131        }
132
133        if let Some(amount) = amount {
134            url.push_str(&format!("&amount={}", amount));
135        }
136
137        if let Some(after) = after {
138            url.push_str(&format!("&after={}", after));
139        }
140
141        let mut req = Request::get(&url);
142
143        let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
144        req = req.header(header::AUTHORIZATION, auth_header_content);
145        // Inject operation to the request.
146        let req = req.extension(Operation::Read);
147        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
148
149        self.info.http_client().send(req).await
150    }
151
152    pub async fn upload_object(
153        &self,
154        path: &str,
155        _args: &OpWrite,
156        body: Buffer,
157    ) -> Result<Response<Buffer>> {
158        let p = build_abs_path(&self.root, path)
159            .trim_end_matches('/')
160            .to_string();
161
162        let url = format!(
163            "{}/api/v1/repositories/{}/branches/{}/objects?path={}",
164            self.endpoint,
165            self.repository,
166            self.branch,
167            percent_encode_path(&p)
168        );
169
170        let mut req = Request::post(&url);
171
172        let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
173        req = req.header(header::AUTHORIZATION, auth_header_content);
174        // Inject operation to the request.
175        let req = req.extension(Operation::Write);
176        let req = req.body(body).map_err(new_request_build_error)?;
177
178        self.info.http_client().send(req).await
179    }
180
181    pub async fn delete_object(&self, path: &str, _args: &OpDelete) -> Result<Response<Buffer>> {
182        let p = build_abs_path(&self.root, path)
183            .trim_end_matches('/')
184            .to_string();
185
186        let url = format!(
187            "{}/api/v1/repositories/{}/branches/{}/objects?path={}",
188            self.endpoint,
189            self.repository,
190            self.branch,
191            percent_encode_path(&p)
192        );
193
194        let mut req = Request::delete(&url);
195
196        let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
197        req = req.header(header::AUTHORIZATION, auth_header_content);
198        // Inject operation to the request.
199        let req = req.extension(Operation::Delete);
200        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
201
202        self.info.http_client().send(req).await
203    }
204
205    pub async fn copy_object(&self, path: &str, dest: &str) -> Result<Response<Buffer>> {
206        let p = build_abs_path(&self.root, path)
207            .trim_end_matches('/')
208            .to_string();
209        let d = build_abs_path(&self.root, dest)
210            .trim_end_matches('/')
211            .to_string();
212
213        let url = format!(
214            "{}/api/v1/repositories/{}/branches/{}/objects/copy?dest_path={}",
215            self.endpoint,
216            self.repository,
217            self.branch,
218            percent_encode_path(&d)
219        );
220
221        let mut req = Request::post(&url);
222
223        let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
224        req = req.header(header::AUTHORIZATION, auth_header_content);
225        req = req.header(header::CONTENT_TYPE, "application/json");
226        let mut map = HashMap::new();
227        map.insert("src_path", p);
228
229        let req = req
230            // Inject operation to the request.
231            .extension(Operation::Delete)
232            .body(serde_json::to_vec(&map).unwrap().into())
233            .map_err(new_request_build_error)?;
234        self.info.http_client().send(req).await
235    }
236}
237
/// Per-object record deserialized with serde; also the element type of
/// [`LakefsListResponse::results`].
///
/// NOTE(review): presumably mirrors the lakeFS object-stats JSON schema —
/// confirm field meanings against the lakeFS API reference.
#[derive(Deserialize, Eq, PartialEq, Debug)]
pub(super) struct LakefsStatus {
    /// Object path as reported in the response.
    pub path: String,
    /// Kind of entry; the set of possible values is not visible here — TODO confirm.
    pub path_type: String,
    /// Physical address string reported for the object.
    pub physical_address: String,
    /// Checksum string reported for the object.
    pub checksum: String,
    /// Size in bytes; optional, so the field may be absent from the response.
    pub size_bytes: Option<u64>,
    /// Modification time as an integer; presumably Unix seconds — TODO confirm.
    pub mtime: i64,
    /// Content type; optional, so the field may be absent from the response.
    pub content_type: Option<String>,
}
248
/// Deserialized listing response: pagination info plus the returned entries.
///
/// NOTE(review): presumably the body returned by the `objects/ls` endpoint
/// used by `LakefsCore::list_objects` — confirm at the call site.
#[derive(Deserialize, Eq, PartialEq, Debug)]
pub(super) struct LakefsListResponse {
    /// Paging information for this response.
    pub pagination: Pagination,
    /// Entries contained in this response.
    pub results: Vec<LakefsStatus>,
}
254
/// Paging block of a [`LakefsListResponse`].
#[derive(Deserialize, Eq, PartialEq, Debug)]
pub(super) struct Pagination {
    /// Whether more results are available beyond this response.
    pub has_more: bool,
    /// Maximum number of results per page, as reported in the response.
    pub max_per_page: u64,
    /// Offset token for the next page; presumably the value to pass as
    /// `after` on the following list request — TODO confirm against caller.
    pub next_offset: String,
    /// Result count reported in the response; presumably the number of
    /// entries in this page — TODO confirm.
    pub results: u64,
}