opendal/services/lakefs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use http::Response;
22use http::StatusCode;
23use log::debug;
24
25use super::LAKEFS_SCHEME;
26use super::config::LakefsConfig;
27use super::core::LakefsCore;
28use super::core::LakefsStatus;
29use super::deleter::LakefsDeleter;
30use super::error::parse_error;
31use super::lister::LakefsLister;
32use super::writer::LakefsWriter;
33use crate::raw::*;
34use crate::*;
35
36/// [Lakefs](https://docs.lakefs.io/reference/api.html#/)'s API support.
37#[doc = include_str!("docs.md")]
38#[derive(Debug, Default)]
39pub struct LakefsBuilder {
40    pub(super) config: LakefsConfig,
41}
42
43impl LakefsBuilder {
44    /// Set the endpoint of this backend.
45    ///
46    /// endpoint must be full uri.
47    ///
48    /// This is required.
49    /// - `http://127.0.0.1:8000` (lakefs daemon in local)
50    /// - `https://my-lakefs.example.com` (lakefs server)
51    pub fn endpoint(mut self, endpoint: &str) -> Self {
52        if !endpoint.is_empty() {
53            self.config.endpoint = Some(endpoint.to_string());
54        }
55        self
56    }
57
58    /// Set username of this backend. This is required.
59    pub fn username(mut self, username: &str) -> Self {
60        if !username.is_empty() {
61            self.config.username = Some(username.to_string());
62        }
63        self
64    }
65
66    /// Set password of this backend. This is required.
67    pub fn password(mut self, password: &str) -> Self {
68        if !password.is_empty() {
69            self.config.password = Some(password.to_string());
70        }
71        self
72    }
73
74    /// Set branch of this backend or a commit ID. Default is main.
75    ///
76    /// Branch can be a branch name.
77    ///
78    /// For example, branch can be:
79    /// - main
80    /// - 1d0c4eb
81    pub fn branch(mut self, branch: &str) -> Self {
82        if !branch.is_empty() {
83            self.config.branch = Some(branch.to_string());
84        }
85        self
86    }
87
88    /// Set root of this backend.
89    ///
90    /// All operations will happen under this root.
91    pub fn root(mut self, root: &str) -> Self {
92        if !root.is_empty() {
93            self.config.root = Some(root.to_string());
94        }
95        self
96    }
97
98    /// Set the repository of this backend.
99    ///
100    /// This is required.
101    pub fn repository(mut self, repository: &str) -> Self {
102        if !repository.is_empty() {
103            self.config.repository = Some(repository.to_string());
104        }
105        self
106    }
107}
108
109impl Builder for LakefsBuilder {
110    type Config = LakefsConfig;
111
112    /// Build a LakefsBackend.
113    fn build(self) -> Result<impl Access> {
114        debug!("backend build started: {:?}", &self);
115
116        let endpoint = match self.config.endpoint {
117            Some(endpoint) => Ok(endpoint.clone()),
118            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
119                .with_operation("Builder::build")
120                .with_context("service", LAKEFS_SCHEME)),
121        }?;
122        debug!("backend use endpoint: {:?}", &endpoint);
123
124        let repository = match &self.config.repository {
125            Some(repository) => Ok(repository.clone()),
126            None => Err(Error::new(ErrorKind::ConfigInvalid, "repository is empty")
127                .with_operation("Builder::build")
128                .with_context("service", LAKEFS_SCHEME)),
129        }?;
130        debug!("backend use repository: {}", &repository);
131
132        let branch = match &self.config.branch {
133            Some(branch) => branch.clone(),
134            None => "main".to_string(),
135        };
136        debug!("backend use branch: {}", &branch);
137
138        let root = normalize_root(&self.config.root.unwrap_or_default());
139        debug!("backend use root: {}", &root);
140
141        let username = match &self.config.username {
142            Some(username) => Ok(username.clone()),
143            None => Err(Error::new(ErrorKind::ConfigInvalid, "username is empty")
144                .with_operation("Builder::build")
145                .with_context("service", LAKEFS_SCHEME)),
146        }?;
147
148        let password = match &self.config.password {
149            Some(password) => Ok(password.clone()),
150            None => Err(Error::new(ErrorKind::ConfigInvalid, "password is empty")
151                .with_operation("Builder::build")
152                .with_context("service", LAKEFS_SCHEME)),
153        }?;
154
155        Ok(LakefsBackend {
156            core: Arc::new(LakefsCore {
157                info: {
158                    let am = AccessorInfo::default();
159                    am.set_scheme(LAKEFS_SCHEME)
160                        .set_native_capability(Capability {
161                            stat: true,
162
163                            list: true,
164
165                            read: true,
166                            write: true,
167                            delete: true,
168                            copy: true,
169                            shared: true,
170                            ..Default::default()
171                        });
172                    am.into()
173                },
174                endpoint,
175                repository,
176                branch,
177                root,
178                username,
179                password,
180            }),
181        })
182    }
183}
184
185/// Backend for Lakefs service
186#[derive(Debug, Clone)]
187pub struct LakefsBackend {
188    core: Arc<LakefsCore>,
189}
190
191impl Access for LakefsBackend {
192    type Reader = HttpBody;
193    type Writer = oio::OneShotWriter<LakefsWriter>;
194    type Lister = oio::PageLister<LakefsLister>;
195    type Deleter = oio::OneShotDeleter<LakefsDeleter>;
196
197    fn info(&self) -> Arc<AccessorInfo> {
198        self.core.info.clone()
199    }
200
201    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
202        // Stat root always returns a DIR.
203        if path == "/" {
204            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
205        }
206
207        let resp = self.core.get_object_metadata(path).await?;
208
209        let status = resp.status();
210
211        match status {
212            StatusCode::OK => {
213                let bs = resp.into_body();
214
215                let decoded_response: LakefsStatus =
216                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
217
218                // Use the helper function to parse LakefsStatus into Metadata
219                let meta = LakefsCore::parse_lakefs_status_into_metadata(&decoded_response);
220
221                Ok(RpStat::new(meta))
222            }
223            _ => Err(parse_error(resp)),
224        }
225    }
226
227    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
228        let resp = self
229            .core
230            .get_object_content(path, args.range(), &args)
231            .await?;
232
233        let status = resp.status();
234
235        match status {
236            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
237                Ok((RpRead::default(), resp.into_body()))
238            }
239            _ => {
240                let (part, mut body) = resp.into_parts();
241                let buf = body.to_buffer().await?;
242                Err(parse_error(Response::from_parts(part, buf)))
243            }
244        }
245    }
246
247    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
248        let l = LakefsLister::new(
249            self.core.clone(),
250            path.to_string(),
251            args.limit(),
252            args.start_after(),
253            args.recursive(),
254        );
255
256        Ok((RpList::default(), oio::PageLister::new(l)))
257    }
258
259    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
260        Ok((
261            RpWrite::default(),
262            oio::OneShotWriter::new(LakefsWriter::new(self.core.clone(), path.to_string(), args)),
263        ))
264    }
265
266    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
267        Ok((
268            RpDelete::default(),
269            oio::OneShotDeleter::new(LakefsDeleter::new(self.core.clone())),
270        ))
271    }
272
273    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
274        let resp = self.core.copy_object(from, to).await?;
275
276        let status = resp.status();
277
278        match status {
279            StatusCode::CREATED => Ok(RpCopy::default()),
280            _ => Err(parse_error(resp)),
281        }
282    }
283}