opendal/services/lakefs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use chrono::TimeZone;
24use chrono::Utc;
25use http::Response;
26use http::StatusCode;
27use log::debug;
28
29use super::core::LakefsCore;
30use super::core::LakefsStatus;
31use super::delete::LakefsDeleter;
32use super::error::parse_error;
33use super::lister::LakefsLister;
34use super::writer::LakefsWriter;
35use crate::raw::*;
36use crate::services::LakefsConfig;
37use crate::*;
38
39impl Configurator for LakefsConfig {
40    type Builder = LakefsBuilder;
41    fn into_builder(self) -> Self::Builder {
42        LakefsBuilder { config: self }
43    }
44}
45
/// [Lakefs](https://docs.lakefs.io/reference/api.html#/)'s API support.
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
pub struct LakefsBuilder {
    /// Configuration accumulated by the setter methods; consumed by `build`.
    config: LakefsConfig,
}
52
53impl Debug for LakefsBuilder {
54    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55        let mut ds = f.debug_struct("Builder");
56
57        ds.field("config", &self.config);
58        ds.finish()
59    }
60}
61
62impl LakefsBuilder {
63    /// Set the endpoint of this backend.
64    ///
65    /// endpoint must be full uri.
66    ///
67    /// This is required.
68    /// - `http://127.0.0.1:8000` (lakefs daemon in local)
69    /// - `https://my-lakefs.example.com` (lakefs server)
70    pub fn endpoint(mut self, endpoint: &str) -> Self {
71        if !endpoint.is_empty() {
72            self.config.endpoint = Some(endpoint.to_string());
73        }
74        self
75    }
76
77    /// Set username of this backend. This is required.
78    pub fn username(mut self, username: &str) -> Self {
79        if !username.is_empty() {
80            self.config.username = Some(username.to_string());
81        }
82        self
83    }
84
85    /// Set password of this backend. This is required.
86    pub fn password(mut self, password: &str) -> Self {
87        if !password.is_empty() {
88            self.config.password = Some(password.to_string());
89        }
90        self
91    }
92
93    /// Set branch of this backend or a commit ID. Default is main.
94    ///
95    /// Branch can be a branch name.
96    ///
97    /// For example, branch can be:
98    /// - main
99    /// - 1d0c4eb
100    pub fn branch(mut self, branch: &str) -> Self {
101        if !branch.is_empty() {
102            self.config.branch = Some(branch.to_string());
103        }
104        self
105    }
106
107    /// Set root of this backend.
108    ///
109    /// All operations will happen under this root.
110    pub fn root(mut self, root: &str) -> Self {
111        if !root.is_empty() {
112            self.config.root = Some(root.to_string());
113        }
114        self
115    }
116
117    /// Set the repository of this backend.
118    ///
119    /// This is required.
120    pub fn repository(mut self, repository: &str) -> Self {
121        if !repository.is_empty() {
122            self.config.repository = Some(repository.to_string());
123        }
124        self
125    }
126}
127
128impl Builder for LakefsBuilder {
129    const SCHEME: Scheme = Scheme::Lakefs;
130    type Config = LakefsConfig;
131
132    /// Build a LakefsBackend.
133    fn build(self) -> Result<impl Access> {
134        debug!("backend build started: {:?}", &self);
135
136        let endpoint = match self.config.endpoint {
137            Some(endpoint) => Ok(endpoint.clone()),
138            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
139                .with_operation("Builder::build")
140                .with_context("service", Scheme::Lakefs)),
141        }?;
142        debug!("backend use endpoint: {:?}", &endpoint);
143
144        let repository = match &self.config.repository {
145            Some(repository) => Ok(repository.clone()),
146            None => Err(Error::new(ErrorKind::ConfigInvalid, "repository is empty")
147                .with_operation("Builder::build")
148                .with_context("service", Scheme::Lakefs)),
149        }?;
150        debug!("backend use repository: {}", &repository);
151
152        let branch = match &self.config.branch {
153            Some(branch) => branch.clone(),
154            None => "main".to_string(),
155        };
156        debug!("backend use branch: {}", &branch);
157
158        let root = normalize_root(&self.config.root.unwrap_or_default());
159        debug!("backend use root: {}", &root);
160
161        let username = match &self.config.username {
162            Some(username) => Ok(username.clone()),
163            None => Err(Error::new(ErrorKind::ConfigInvalid, "username is empty")
164                .with_operation("Builder::build")
165                .with_context("service", Scheme::Lakefs)),
166        }?;
167
168        let password = match &self.config.password {
169            Some(password) => Ok(password.clone()),
170            None => Err(Error::new(ErrorKind::ConfigInvalid, "password is empty")
171                .with_operation("Builder::build")
172                .with_context("service", Scheme::Lakefs)),
173        }?;
174
175        Ok(LakefsBackend {
176            core: Arc::new(LakefsCore {
177                info: {
178                    let am = AccessorInfo::default();
179                    am.set_scheme(Scheme::Lakefs)
180                        .set_native_capability(Capability {
181                            stat: true,
182                            stat_has_content_length: true,
183                            stat_has_content_disposition: true,
184                            stat_has_last_modified: true,
185
186                            list: true,
187                            list_has_content_length: true,
188                            list_has_last_modified: true,
189
190                            read: true,
191                            write: true,
192                            delete: true,
193                            copy: true,
194                            shared: true,
195                            ..Default::default()
196                        });
197                    am.into()
198                },
199                endpoint,
200                repository,
201                branch,
202                root,
203                username,
204                password,
205            }),
206        })
207    }
208}
209
/// Backend for Lakefs service.
///
/// All state lives in a shared `Arc<LakefsCore>`, so cloning the backend only
/// bumps a reference count.
#[derive(Debug, Clone)]
pub struct LakefsBackend {
    /// Shared connection settings and accessor info used by every operation.
    core: Arc<LakefsCore>,
}
215
216impl Access for LakefsBackend {
217    type Reader = HttpBody;
218    type Writer = oio::OneShotWriter<LakefsWriter>;
219    type Lister = oio::PageLister<LakefsLister>;
220    type Deleter = oio::OneShotDeleter<LakefsDeleter>;
221    type BlockingReader = ();
222    type BlockingWriter = ();
223    type BlockingLister = ();
224    type BlockingDeleter = ();
225
226    fn info(&self) -> Arc<AccessorInfo> {
227        self.core.info.clone()
228    }
229
230    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
231        // Stat root always returns a DIR.
232        if path == "/" {
233            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
234        }
235
236        let resp = self.core.get_object_metadata(path).await?;
237
238        let status = resp.status();
239
240        match status {
241            StatusCode::OK => {
242                let mut meta = parse_into_metadata(path, resp.headers())?;
243                let bs = resp.clone().into_body();
244
245                let decoded_response: LakefsStatus =
246                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
247                if let Some(size_bytes) = decoded_response.size_bytes {
248                    meta.set_content_length(size_bytes);
249                }
250                meta.set_mode(EntryMode::FILE);
251                if let Some(v) = parse_content_disposition(resp.headers())? {
252                    meta.set_content_disposition(v);
253                }
254
255                meta.set_last_modified(Utc.timestamp_opt(decoded_response.mtime, 0).unwrap());
256
257                Ok(RpStat::new(meta))
258            }
259            _ => Err(parse_error(resp)),
260        }
261    }
262
263    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
264        let resp = self
265            .core
266            .get_object_content(path, args.range(), &args)
267            .await?;
268
269        let status = resp.status();
270
271        match status {
272            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
273                Ok((RpRead::default(), resp.into_body()))
274            }
275            _ => {
276                let (part, mut body) = resp.into_parts();
277                let buf = body.to_buffer().await?;
278                Err(parse_error(Response::from_parts(part, buf)))
279            }
280        }
281    }
282
283    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
284        let l = LakefsLister::new(
285            self.core.clone(),
286            path.to_string(),
287            args.limit(),
288            args.start_after(),
289            args.recursive(),
290        );
291
292        Ok((RpList::default(), oio::PageLister::new(l)))
293    }
294
295    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
296        Ok((
297            RpWrite::default(),
298            oio::OneShotWriter::new(LakefsWriter::new(self.core.clone(), path.to_string(), args)),
299        ))
300    }
301
302    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
303        Ok((
304            RpDelete::default(),
305            oio::OneShotDeleter::new(LakefsDeleter::new(self.core.clone())),
306        ))
307    }
308
309    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
310        let resp = self.core.copy_object(from, to).await?;
311
312        let status = resp.status();
313
314        match status {
315            StatusCode::CREATED => Ok(RpCopy::default()),
316            _ => Err(parse_error(resp)),
317        }
318    }
319}