1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use chrono::TimeZone;
24use chrono::Utc;
25use http::Response;
26use http::StatusCode;
27use log::debug;
28
29use super::core::LakefsCore;
30use super::core::LakefsStatus;
31use super::delete::LakefsDeleter;
32use super::error::parse_error;
33use super::lister::LakefsLister;
34use super::writer::LakefsWriter;
35use crate::raw::*;
36use crate::services::LakefsConfig;
37use crate::*;
38
39impl Configurator for LakefsConfig {
40 type Builder = LakefsBuilder;
41 fn into_builder(self) -> Self::Builder {
42 LakefsBuilder { config: self }
43 }
44}
45
/// Builder that collects the options used to construct the Lakefs backend.
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
pub struct LakefsBuilder {
    /// Options gathered so far; they are validated when the builder is built.
    config: LakefsConfig,
}
52
53impl Debug for LakefsBuilder {
54 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55 let mut ds = f.debug_struct("Builder");
56
57 ds.field("config", &self.config);
58 ds.finish()
59 }
60}
61
62impl LakefsBuilder {
63 pub fn endpoint(mut self, endpoint: &str) -> Self {
71 if !endpoint.is_empty() {
72 self.config.endpoint = Some(endpoint.to_string());
73 }
74 self
75 }
76
77 pub fn username(mut self, username: &str) -> Self {
79 if !username.is_empty() {
80 self.config.username = Some(username.to_string());
81 }
82 self
83 }
84
85 pub fn password(mut self, password: &str) -> Self {
87 if !password.is_empty() {
88 self.config.password = Some(password.to_string());
89 }
90 self
91 }
92
93 pub fn branch(mut self, branch: &str) -> Self {
101 if !branch.is_empty() {
102 self.config.branch = Some(branch.to_string());
103 }
104 self
105 }
106
107 pub fn root(mut self, root: &str) -> Self {
111 if !root.is_empty() {
112 self.config.root = Some(root.to_string());
113 }
114 self
115 }
116
117 pub fn repository(mut self, repository: &str) -> Self {
121 if !repository.is_empty() {
122 self.config.repository = Some(repository.to_string());
123 }
124 self
125 }
126}
127
128impl Builder for LakefsBuilder {
129 const SCHEME: Scheme = Scheme::Lakefs;
130 type Config = LakefsConfig;
131
132 fn build(self) -> Result<impl Access> {
134 debug!("backend build started: {:?}", &self);
135
136 let endpoint = match self.config.endpoint {
137 Some(endpoint) => Ok(endpoint.clone()),
138 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
139 .with_operation("Builder::build")
140 .with_context("service", Scheme::Lakefs)),
141 }?;
142 debug!("backend use endpoint: {:?}", &endpoint);
143
144 let repository = match &self.config.repository {
145 Some(repository) => Ok(repository.clone()),
146 None => Err(Error::new(ErrorKind::ConfigInvalid, "repository is empty")
147 .with_operation("Builder::build")
148 .with_context("service", Scheme::Lakefs)),
149 }?;
150 debug!("backend use repository: {}", &repository);
151
152 let branch = match &self.config.branch {
153 Some(branch) => branch.clone(),
154 None => "main".to_string(),
155 };
156 debug!("backend use branch: {}", &branch);
157
158 let root = normalize_root(&self.config.root.unwrap_or_default());
159 debug!("backend use root: {}", &root);
160
161 let username = match &self.config.username {
162 Some(username) => Ok(username.clone()),
163 None => Err(Error::new(ErrorKind::ConfigInvalid, "username is empty")
164 .with_operation("Builder::build")
165 .with_context("service", Scheme::Lakefs)),
166 }?;
167
168 let password = match &self.config.password {
169 Some(password) => Ok(password.clone()),
170 None => Err(Error::new(ErrorKind::ConfigInvalid, "password is empty")
171 .with_operation("Builder::build")
172 .with_context("service", Scheme::Lakefs)),
173 }?;
174
175 Ok(LakefsBackend {
176 core: Arc::new(LakefsCore {
177 info: {
178 let am = AccessorInfo::default();
179 am.set_scheme(Scheme::Lakefs)
180 .set_native_capability(Capability {
181 stat: true,
182 stat_has_content_length: true,
183 stat_has_content_disposition: true,
184 stat_has_last_modified: true,
185
186 list: true,
187 list_has_content_length: true,
188 list_has_last_modified: true,
189
190 read: true,
191 write: true,
192 delete: true,
193 copy: true,
194 shared: true,
195 ..Default::default()
196 });
197 am.into()
198 },
199 endpoint,
200 repository,
201 branch,
202 root,
203 username,
204 password,
205 }),
206 })
207 }
208}
209
/// Backend that implements `Access` for Lakefs on top of a shared [`LakefsCore`].
#[derive(Debug, Clone)]
pub struct LakefsBackend {
    /// Shared core holding the accessor info and connection settings.
    core: Arc<LakefsCore>,
}
215
216impl Access for LakefsBackend {
217 type Reader = HttpBody;
218 type Writer = oio::OneShotWriter<LakefsWriter>;
219 type Lister = oio::PageLister<LakefsLister>;
220 type Deleter = oio::OneShotDeleter<LakefsDeleter>;
221 type BlockingReader = ();
222 type BlockingWriter = ();
223 type BlockingLister = ();
224 type BlockingDeleter = ();
225
226 fn info(&self) -> Arc<AccessorInfo> {
227 self.core.info.clone()
228 }
229
230 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
231 if path == "/" {
233 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
234 }
235
236 let resp = self.core.get_object_metadata(path).await?;
237
238 let status = resp.status();
239
240 match status {
241 StatusCode::OK => {
242 let mut meta = parse_into_metadata(path, resp.headers())?;
243 let bs = resp.clone().into_body();
244
245 let decoded_response: LakefsStatus =
246 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
247 if let Some(size_bytes) = decoded_response.size_bytes {
248 meta.set_content_length(size_bytes);
249 }
250 meta.set_mode(EntryMode::FILE);
251 if let Some(v) = parse_content_disposition(resp.headers())? {
252 meta.set_content_disposition(v);
253 }
254
255 meta.set_last_modified(Utc.timestamp_opt(decoded_response.mtime, 0).unwrap());
256
257 Ok(RpStat::new(meta))
258 }
259 _ => Err(parse_error(resp)),
260 }
261 }
262
263 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
264 let resp = self
265 .core
266 .get_object_content(path, args.range(), &args)
267 .await?;
268
269 let status = resp.status();
270
271 match status {
272 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
273 Ok((RpRead::default(), resp.into_body()))
274 }
275 _ => {
276 let (part, mut body) = resp.into_parts();
277 let buf = body.to_buffer().await?;
278 Err(parse_error(Response::from_parts(part, buf)))
279 }
280 }
281 }
282
283 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
284 let l = LakefsLister::new(
285 self.core.clone(),
286 path.to_string(),
287 args.limit(),
288 args.start_after(),
289 args.recursive(),
290 );
291
292 Ok((RpList::default(), oio::PageLister::new(l)))
293 }
294
295 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
296 Ok((
297 RpWrite::default(),
298 oio::OneShotWriter::new(LakefsWriter::new(self.core.clone(), path.to_string(), args)),
299 ))
300 }
301
302 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
303 Ok((
304 RpDelete::default(),
305 oio::OneShotDeleter::new(LakefsDeleter::new(self.core.clone())),
306 ))
307 }
308
309 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
310 let resp = self.core.copy_object(from, to).await?;
311
312 let status = resp.status();
313
314 match status {
315 StatusCode::CREATED => Ok(RpCopy::default()),
316 _ => Err(parse_error(resp)),
317 }
318 }
319}