opendal/services/lakefs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use chrono::TimeZone;
24use chrono::Utc;
25use http::Response;
26use http::StatusCode;
27use log::debug;
28
29use super::core::LakefsCore;
30use super::core::LakefsStatus;
31use super::delete::LakefsDeleter;
32use super::error::parse_error;
33use super::lister::LakefsLister;
34use super::writer::LakefsWriter;
35use super::DEFAULT_SCHEME;
36use crate::raw::*;
37use crate::services::LakefsConfig;
38use crate::*;
39impl Configurator for LakefsConfig {
40    type Builder = LakefsBuilder;
41    fn into_builder(self) -> Self::Builder {
42        LakefsBuilder { config: self }
43    }
44}
45
46/// [Lakefs](https://docs.lakefs.io/reference/api.html#/)'s API support.
47#[doc = include_str!("docs.md")]
48#[derive(Default, Clone)]
49pub struct LakefsBuilder {
50    config: LakefsConfig,
51}
52
53impl Debug for LakefsBuilder {
54    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55        let mut ds = f.debug_struct("Builder");
56
57        ds.field("config", &self.config);
58        ds.finish()
59    }
60}
61
62impl LakefsBuilder {
63    /// Set the endpoint of this backend.
64    ///
65    /// endpoint must be full uri.
66    ///
67    /// This is required.
68    /// - `http://127.0.0.1:8000` (lakefs daemon in local)
69    /// - `https://my-lakefs.example.com` (lakefs server)
70    pub fn endpoint(mut self, endpoint: &str) -> Self {
71        if !endpoint.is_empty() {
72            self.config.endpoint = Some(endpoint.to_string());
73        }
74        self
75    }
76
77    /// Set username of this backend. This is required.
78    pub fn username(mut self, username: &str) -> Self {
79        if !username.is_empty() {
80            self.config.username = Some(username.to_string());
81        }
82        self
83    }
84
85    /// Set password of this backend. This is required.
86    pub fn password(mut self, password: &str) -> Self {
87        if !password.is_empty() {
88            self.config.password = Some(password.to_string());
89        }
90        self
91    }
92
93    /// Set branch of this backend or a commit ID. Default is main.
94    ///
95    /// Branch can be a branch name.
96    ///
97    /// For example, branch can be:
98    /// - main
99    /// - 1d0c4eb
100    pub fn branch(mut self, branch: &str) -> Self {
101        if !branch.is_empty() {
102            self.config.branch = Some(branch.to_string());
103        }
104        self
105    }
106
107    /// Set root of this backend.
108    ///
109    /// All operations will happen under this root.
110    pub fn root(mut self, root: &str) -> Self {
111        if !root.is_empty() {
112            self.config.root = Some(root.to_string());
113        }
114        self
115    }
116
117    /// Set the repository of this backend.
118    ///
119    /// This is required.
120    pub fn repository(mut self, repository: &str) -> Self {
121        if !repository.is_empty() {
122            self.config.repository = Some(repository.to_string());
123        }
124        self
125    }
126}
127
128impl Builder for LakefsBuilder {
129    type Config = LakefsConfig;
130
131    /// Build a LakefsBackend.
132    fn build(self) -> Result<impl Access> {
133        debug!("backend build started: {:?}", &self);
134
135        let endpoint = match self.config.endpoint {
136            Some(endpoint) => Ok(endpoint.clone()),
137            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
138                .with_operation("Builder::build")
139                .with_context("service", Scheme::Lakefs)),
140        }?;
141        debug!("backend use endpoint: {:?}", &endpoint);
142
143        let repository = match &self.config.repository {
144            Some(repository) => Ok(repository.clone()),
145            None => Err(Error::new(ErrorKind::ConfigInvalid, "repository is empty")
146                .with_operation("Builder::build")
147                .with_context("service", Scheme::Lakefs)),
148        }?;
149        debug!("backend use repository: {}", &repository);
150
151        let branch = match &self.config.branch {
152            Some(branch) => branch.clone(),
153            None => "main".to_string(),
154        };
155        debug!("backend use branch: {}", &branch);
156
157        let root = normalize_root(&self.config.root.unwrap_or_default());
158        debug!("backend use root: {}", &root);
159
160        let username = match &self.config.username {
161            Some(username) => Ok(username.clone()),
162            None => Err(Error::new(ErrorKind::ConfigInvalid, "username is empty")
163                .with_operation("Builder::build")
164                .with_context("service", Scheme::Lakefs)),
165        }?;
166
167        let password = match &self.config.password {
168            Some(password) => Ok(password.clone()),
169            None => Err(Error::new(ErrorKind::ConfigInvalid, "password is empty")
170                .with_operation("Builder::build")
171                .with_context("service", Scheme::Lakefs)),
172        }?;
173
174        Ok(LakefsBackend {
175            core: Arc::new(LakefsCore {
176                info: {
177                    let am = AccessorInfo::default();
178                    am.set_scheme(DEFAULT_SCHEME)
179                        .set_native_capability(Capability {
180                            stat: true,
181
182                            list: true,
183
184                            read: true,
185                            write: true,
186                            delete: true,
187                            copy: true,
188                            shared: true,
189                            ..Default::default()
190                        });
191                    am.into()
192                },
193                endpoint,
194                repository,
195                branch,
196                root,
197                username,
198                password,
199            }),
200        })
201    }
202}
203
204/// Backend for Lakefs service
205#[derive(Debug, Clone)]
206pub struct LakefsBackend {
207    core: Arc<LakefsCore>,
208}
209
210impl Access for LakefsBackend {
211    type Reader = HttpBody;
212    type Writer = oio::OneShotWriter<LakefsWriter>;
213    type Lister = oio::PageLister<LakefsLister>;
214    type Deleter = oio::OneShotDeleter<LakefsDeleter>;
215
216    fn info(&self) -> Arc<AccessorInfo> {
217        self.core.info.clone()
218    }
219
220    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
221        // Stat root always returns a DIR.
222        if path == "/" {
223            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
224        }
225
226        let resp = self.core.get_object_metadata(path).await?;
227
228        let status = resp.status();
229
230        match status {
231            StatusCode::OK => {
232                let mut meta = parse_into_metadata(path, resp.headers())?;
233                let bs = resp.clone().into_body();
234
235                let decoded_response: LakefsStatus =
236                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
237                if let Some(size_bytes) = decoded_response.size_bytes {
238                    meta.set_content_length(size_bytes);
239                }
240                meta.set_mode(EntryMode::FILE);
241                if let Some(v) = parse_content_disposition(resp.headers())? {
242                    meta.set_content_disposition(v);
243                }
244
245                meta.set_last_modified(Utc.timestamp_opt(decoded_response.mtime, 0).unwrap());
246
247                Ok(RpStat::new(meta))
248            }
249            _ => Err(parse_error(resp)),
250        }
251    }
252
253    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
254        let resp = self
255            .core
256            .get_object_content(path, args.range(), &args)
257            .await?;
258
259        let status = resp.status();
260
261        match status {
262            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
263                Ok((RpRead::default(), resp.into_body()))
264            }
265            _ => {
266                let (part, mut body) = resp.into_parts();
267                let buf = body.to_buffer().await?;
268                Err(parse_error(Response::from_parts(part, buf)))
269            }
270        }
271    }
272
273    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
274        let l = LakefsLister::new(
275            self.core.clone(),
276            path.to_string(),
277            args.limit(),
278            args.start_after(),
279            args.recursive(),
280        );
281
282        Ok((RpList::default(), oio::PageLister::new(l)))
283    }
284
285    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
286        Ok((
287            RpWrite::default(),
288            oio::OneShotWriter::new(LakefsWriter::new(self.core.clone(), path.to_string(), args)),
289        ))
290    }
291
292    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
293        Ok((
294            RpDelete::default(),
295            oio::OneShotDeleter::new(LakefsDeleter::new(self.core.clone())),
296        ))
297    }
298
299    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
300        let resp = self.core.copy_object(from, to).await?;
301
302        let status = resp.status();
303
304        match status {
305            StatusCode::CREATED => Ok(RpCopy::default()),
306            _ => Err(parse_error(resp)),
307        }
308    }
309}