1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::Response;
24use http::StatusCode;
25use log::debug;
26
27use super::LAKEFS_SCHEME;
28use super::core::LakefsCore;
29use super::core::LakefsStatus;
30use super::delete::LakefsDeleter;
31use super::error::parse_error;
32use super::lister::LakefsLister;
33use super::writer::LakefsWriter;
34use crate::raw::*;
35use crate::services::LakefsConfig;
36use crate::*;
37
38#[doc = include_str!("docs.md")]
40#[derive(Default, Clone)]
41pub struct LakefsBuilder {
42    pub(super) config: LakefsConfig,
43}
44
45impl Debug for LakefsBuilder {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        let mut ds = f.debug_struct("Builder");
48
49        ds.field("config", &self.config);
50        ds.finish()
51    }
52}
53
54impl LakefsBuilder {
55    pub fn endpoint(mut self, endpoint: &str) -> Self {
63        if !endpoint.is_empty() {
64            self.config.endpoint = Some(endpoint.to_string());
65        }
66        self
67    }
68
69    pub fn username(mut self, username: &str) -> Self {
71        if !username.is_empty() {
72            self.config.username = Some(username.to_string());
73        }
74        self
75    }
76
77    pub fn password(mut self, password: &str) -> Self {
79        if !password.is_empty() {
80            self.config.password = Some(password.to_string());
81        }
82        self
83    }
84
85    pub fn branch(mut self, branch: &str) -> Self {
93        if !branch.is_empty() {
94            self.config.branch = Some(branch.to_string());
95        }
96        self
97    }
98
99    pub fn root(mut self, root: &str) -> Self {
103        if !root.is_empty() {
104            self.config.root = Some(root.to_string());
105        }
106        self
107    }
108
109    pub fn repository(mut self, repository: &str) -> Self {
113        if !repository.is_empty() {
114            self.config.repository = Some(repository.to_string());
115        }
116        self
117    }
118}
119
120impl Builder for LakefsBuilder {
121    type Config = LakefsConfig;
122
123    fn build(self) -> Result<impl Access> {
125        debug!("backend build started: {:?}", &self);
126
127        let endpoint = match self.config.endpoint {
128            Some(endpoint) => Ok(endpoint.clone()),
129            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
130                .with_operation("Builder::build")
131                .with_context("service", Scheme::Lakefs)),
132        }?;
133        debug!("backend use endpoint: {:?}", &endpoint);
134
135        let repository = match &self.config.repository {
136            Some(repository) => Ok(repository.clone()),
137            None => Err(Error::new(ErrorKind::ConfigInvalid, "repository is empty")
138                .with_operation("Builder::build")
139                .with_context("service", Scheme::Lakefs)),
140        }?;
141        debug!("backend use repository: {}", &repository);
142
143        let branch = match &self.config.branch {
144            Some(branch) => branch.clone(),
145            None => "main".to_string(),
146        };
147        debug!("backend use branch: {}", &branch);
148
149        let root = normalize_root(&self.config.root.unwrap_or_default());
150        debug!("backend use root: {}", &root);
151
152        let username = match &self.config.username {
153            Some(username) => Ok(username.clone()),
154            None => Err(Error::new(ErrorKind::ConfigInvalid, "username is empty")
155                .with_operation("Builder::build")
156                .with_context("service", Scheme::Lakefs)),
157        }?;
158
159        let password = match &self.config.password {
160            Some(password) => Ok(password.clone()),
161            None => Err(Error::new(ErrorKind::ConfigInvalid, "password is empty")
162                .with_operation("Builder::build")
163                .with_context("service", Scheme::Lakefs)),
164        }?;
165
166        Ok(LakefsBackend {
167            core: Arc::new(LakefsCore {
168                info: {
169                    let am = AccessorInfo::default();
170                    am.set_scheme(LAKEFS_SCHEME)
171                        .set_native_capability(Capability {
172                            stat: true,
173
174                            list: true,
175
176                            read: true,
177                            write: true,
178                            delete: true,
179                            copy: true,
180                            shared: true,
181                            ..Default::default()
182                        });
183                    am.into()
184                },
185                endpoint,
186                repository,
187                branch,
188                root,
189                username,
190                password,
191            }),
192        })
193    }
194}
195
196#[derive(Debug, Clone)]
198pub struct LakefsBackend {
199    core: Arc<LakefsCore>,
200}
201
202impl Access for LakefsBackend {
203    type Reader = HttpBody;
204    type Writer = oio::OneShotWriter<LakefsWriter>;
205    type Lister = oio::PageLister<LakefsLister>;
206    type Deleter = oio::OneShotDeleter<LakefsDeleter>;
207
208    fn info(&self) -> Arc<AccessorInfo> {
209        self.core.info.clone()
210    }
211
212    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
213        if path == "/" {
215            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
216        }
217
218        let resp = self.core.get_object_metadata(path).await?;
219
220        let status = resp.status();
221
222        match status {
223            StatusCode::OK => {
224                let mut meta = parse_into_metadata(path, resp.headers())?;
225                let bs = resp.clone().into_body();
226
227                let decoded_response: LakefsStatus =
228                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
229                if let Some(size_bytes) = decoded_response.size_bytes {
230                    meta.set_content_length(size_bytes);
231                }
232                meta.set_mode(EntryMode::FILE);
233                if let Some(v) = parse_content_disposition(resp.headers())? {
234                    meta.set_content_disposition(v);
235                }
236
237                meta.set_last_modified(Timestamp::from_second(decoded_response.mtime).unwrap());
238
239                Ok(RpStat::new(meta))
240            }
241            _ => Err(parse_error(resp)),
242        }
243    }
244
245    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
246        let resp = self
247            .core
248            .get_object_content(path, args.range(), &args)
249            .await?;
250
251        let status = resp.status();
252
253        match status {
254            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
255                Ok((RpRead::default(), resp.into_body()))
256            }
257            _ => {
258                let (part, mut body) = resp.into_parts();
259                let buf = body.to_buffer().await?;
260                Err(parse_error(Response::from_parts(part, buf)))
261            }
262        }
263    }
264
265    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
266        let l = LakefsLister::new(
267            self.core.clone(),
268            path.to_string(),
269            args.limit(),
270            args.start_after(),
271            args.recursive(),
272        );
273
274        Ok((RpList::default(), oio::PageLister::new(l)))
275    }
276
277    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
278        Ok((
279            RpWrite::default(),
280            oio::OneShotWriter::new(LakefsWriter::new(self.core.clone(), path.to_string(), args)),
281        ))
282    }
283
284    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
285        Ok((
286            RpDelete::default(),
287            oio::OneShotDeleter::new(LakefsDeleter::new(self.core.clone())),
288        ))
289    }
290
291    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
292        let resp = self.core.copy_object(from, to).await?;
293
294        let status = resp.status();
295
296        match status {
297            StatusCode::CREATED => Ok(RpCopy::default()),
298            _ => Err(parse_error(resp)),
299        }
300    }
301}