1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use chrono::TimeZone;
24use chrono::Utc;
25use http::Response;
26use http::StatusCode;
27use log::debug;
28
29use super::core::LakefsCore;
30use super::core::LakefsStatus;
31use super::delete::LakefsDeleter;
32use super::error::parse_error;
33use super::lister::LakefsLister;
34use super::writer::LakefsWriter;
35use crate::raw::*;
36use crate::services::LakefsConfig;
37use crate::*;
38
39impl Configurator for LakefsConfig {
40 type Builder = LakefsBuilder;
41 fn into_builder(self) -> Self::Builder {
42 LakefsBuilder { config: self }
43 }
44}
45
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
pub struct LakefsBuilder {
    // Options collected by the setter methods; read and validated by `build`.
    config: LakefsConfig,
}
52
53impl Debug for LakefsBuilder {
54 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55 let mut ds = f.debug_struct("Builder");
56
57 ds.field("config", &self.config);
58 ds.finish()
59 }
60}
61
62impl LakefsBuilder {
63 pub fn endpoint(mut self, endpoint: &str) -> Self {
71 if !endpoint.is_empty() {
72 self.config.endpoint = Some(endpoint.to_string());
73 }
74 self
75 }
76
77 pub fn username(mut self, username: &str) -> Self {
79 if !username.is_empty() {
80 self.config.username = Some(username.to_string());
81 }
82 self
83 }
84
85 pub fn password(mut self, password: &str) -> Self {
87 if !password.is_empty() {
88 self.config.password = Some(password.to_string());
89 }
90 self
91 }
92
93 pub fn branch(mut self, branch: &str) -> Self {
101 if !branch.is_empty() {
102 self.config.branch = Some(branch.to_string());
103 }
104 self
105 }
106
107 pub fn root(mut self, root: &str) -> Self {
111 if !root.is_empty() {
112 self.config.root = Some(root.to_string());
113 }
114 self
115 }
116
117 pub fn repository(mut self, repository: &str) -> Self {
121 if !repository.is_empty() {
122 self.config.repository = Some(repository.to_string());
123 }
124 self
125 }
126}
127
128impl Builder for LakefsBuilder {
129 const SCHEME: Scheme = Scheme::Lakefs;
130 type Config = LakefsConfig;
131
132 fn build(self) -> Result<impl Access> {
134 debug!("backend build started: {:?}", &self);
135
136 let endpoint = match self.config.endpoint {
137 Some(endpoint) => Ok(endpoint.clone()),
138 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
139 .with_operation("Builder::build")
140 .with_context("service", Scheme::Lakefs)),
141 }?;
142 debug!("backend use endpoint: {:?}", &endpoint);
143
144 let repository = match &self.config.repository {
145 Some(repository) => Ok(repository.clone()),
146 None => Err(Error::new(ErrorKind::ConfigInvalid, "repository is empty")
147 .with_operation("Builder::build")
148 .with_context("service", Scheme::Lakefs)),
149 }?;
150 debug!("backend use repository: {}", &repository);
151
152 let branch = match &self.config.branch {
153 Some(branch) => branch.clone(),
154 None => "main".to_string(),
155 };
156 debug!("backend use branch: {}", &branch);
157
158 let root = normalize_root(&self.config.root.unwrap_or_default());
159 debug!("backend use root: {}", &root);
160
161 let username = match &self.config.username {
162 Some(username) => Ok(username.clone()),
163 None => Err(Error::new(ErrorKind::ConfigInvalid, "username is empty")
164 .with_operation("Builder::build")
165 .with_context("service", Scheme::Lakefs)),
166 }?;
167
168 let password = match &self.config.password {
169 Some(password) => Ok(password.clone()),
170 None => Err(Error::new(ErrorKind::ConfigInvalid, "password is empty")
171 .with_operation("Builder::build")
172 .with_context("service", Scheme::Lakefs)),
173 }?;
174
175 Ok(LakefsBackend {
176 core: Arc::new(LakefsCore {
177 info: {
178 let am = AccessorInfo::default();
179 am.set_scheme(Scheme::Lakefs)
180 .set_native_capability(Capability {
181 stat: true,
182
183 list: true,
184
185 read: true,
186 write: true,
187 delete: true,
188 copy: true,
189 shared: true,
190 ..Default::default()
191 });
192 am.into()
193 },
194 endpoint,
195 repository,
196 branch,
197 root,
198 username,
199 password,
200 }),
201 })
202 }
203}
204
/// Backend for the Lakefs service; implements `Access` by delegating to a
/// shared `LakefsCore`.
#[derive(Debug, Clone)]
pub struct LakefsBackend {
    // Held behind an `Arc`, so cloning the backend shares the same core.
    core: Arc<LakefsCore>,
}
210
211impl Access for LakefsBackend {
212 type Reader = HttpBody;
213 type Writer = oio::OneShotWriter<LakefsWriter>;
214 type Lister = oio::PageLister<LakefsLister>;
215 type Deleter = oio::OneShotDeleter<LakefsDeleter>;
216
217 fn info(&self) -> Arc<AccessorInfo> {
218 self.core.info.clone()
219 }
220
221 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
222 if path == "/" {
224 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
225 }
226
227 let resp = self.core.get_object_metadata(path).await?;
228
229 let status = resp.status();
230
231 match status {
232 StatusCode::OK => {
233 let mut meta = parse_into_metadata(path, resp.headers())?;
234 let bs = resp.clone().into_body();
235
236 let decoded_response: LakefsStatus =
237 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
238 if let Some(size_bytes) = decoded_response.size_bytes {
239 meta.set_content_length(size_bytes);
240 }
241 meta.set_mode(EntryMode::FILE);
242 if let Some(v) = parse_content_disposition(resp.headers())? {
243 meta.set_content_disposition(v);
244 }
245
246 meta.set_last_modified(Utc.timestamp_opt(decoded_response.mtime, 0).unwrap());
247
248 Ok(RpStat::new(meta))
249 }
250 _ => Err(parse_error(resp)),
251 }
252 }
253
254 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
255 let resp = self
256 .core
257 .get_object_content(path, args.range(), &args)
258 .await?;
259
260 let status = resp.status();
261
262 match status {
263 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
264 Ok((RpRead::default(), resp.into_body()))
265 }
266 _ => {
267 let (part, mut body) = resp.into_parts();
268 let buf = body.to_buffer().await?;
269 Err(parse_error(Response::from_parts(part, buf)))
270 }
271 }
272 }
273
274 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
275 let l = LakefsLister::new(
276 self.core.clone(),
277 path.to_string(),
278 args.limit(),
279 args.start_after(),
280 args.recursive(),
281 );
282
283 Ok((RpList::default(), oio::PageLister::new(l)))
284 }
285
286 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
287 Ok((
288 RpWrite::default(),
289 oio::OneShotWriter::new(LakefsWriter::new(self.core.clone(), path.to_string(), args)),
290 ))
291 }
292
293 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
294 Ok((
295 RpDelete::default(),
296 oio::OneShotDeleter::new(LakefsDeleter::new(self.core.clone())),
297 ))
298 }
299
300 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
301 let resp = self.core.copy_object(from, to).await?;
302
303 let status = resp.status();
304
305 match status {
306 StatusCode::CREATED => Ok(RpCopy::default()),
307 _ => Err(parse_error(resp)),
308 }
309 }
310}