1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use chrono::TimeZone;
24use chrono::Utc;
25use http::Response;
26use http::StatusCode;
27use log::debug;
28
29use super::core::LakefsCore;
30use super::core::LakefsStatus;
31use super::delete::LakefsDeleter;
32use super::error::parse_error;
33use super::lister::LakefsLister;
34use super::writer::LakefsWriter;
35use super::DEFAULT_SCHEME;
36use crate::raw::*;
37use crate::services::LakefsConfig;
38use crate::*;
39impl Configurator for LakefsConfig {
40 type Builder = LakefsBuilder;
41 fn into_builder(self) -> Self::Builder {
42 LakefsBuilder { config: self }
43 }
44}
45
46#[doc = include_str!("docs.md")]
48#[derive(Default, Clone)]
49pub struct LakefsBuilder {
50 config: LakefsConfig,
51}
52
53impl Debug for LakefsBuilder {
54 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55 let mut ds = f.debug_struct("Builder");
56
57 ds.field("config", &self.config);
58 ds.finish()
59 }
60}
61
62impl LakefsBuilder {
63 pub fn endpoint(mut self, endpoint: &str) -> Self {
71 if !endpoint.is_empty() {
72 self.config.endpoint = Some(endpoint.to_string());
73 }
74 self
75 }
76
77 pub fn username(mut self, username: &str) -> Self {
79 if !username.is_empty() {
80 self.config.username = Some(username.to_string());
81 }
82 self
83 }
84
85 pub fn password(mut self, password: &str) -> Self {
87 if !password.is_empty() {
88 self.config.password = Some(password.to_string());
89 }
90 self
91 }
92
93 pub fn branch(mut self, branch: &str) -> Self {
101 if !branch.is_empty() {
102 self.config.branch = Some(branch.to_string());
103 }
104 self
105 }
106
107 pub fn root(mut self, root: &str) -> Self {
111 if !root.is_empty() {
112 self.config.root = Some(root.to_string());
113 }
114 self
115 }
116
117 pub fn repository(mut self, repository: &str) -> Self {
121 if !repository.is_empty() {
122 self.config.repository = Some(repository.to_string());
123 }
124 self
125 }
126}
127
128impl Builder for LakefsBuilder {
129 type Config = LakefsConfig;
130
131 fn build(self) -> Result<impl Access> {
133 debug!("backend build started: {:?}", &self);
134
135 let endpoint = match self.config.endpoint {
136 Some(endpoint) => Ok(endpoint.clone()),
137 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
138 .with_operation("Builder::build")
139 .with_context("service", Scheme::Lakefs)),
140 }?;
141 debug!("backend use endpoint: {:?}", &endpoint);
142
143 let repository = match &self.config.repository {
144 Some(repository) => Ok(repository.clone()),
145 None => Err(Error::new(ErrorKind::ConfigInvalid, "repository is empty")
146 .with_operation("Builder::build")
147 .with_context("service", Scheme::Lakefs)),
148 }?;
149 debug!("backend use repository: {}", &repository);
150
151 let branch = match &self.config.branch {
152 Some(branch) => branch.clone(),
153 None => "main".to_string(),
154 };
155 debug!("backend use branch: {}", &branch);
156
157 let root = normalize_root(&self.config.root.unwrap_or_default());
158 debug!("backend use root: {}", &root);
159
160 let username = match &self.config.username {
161 Some(username) => Ok(username.clone()),
162 None => Err(Error::new(ErrorKind::ConfigInvalid, "username is empty")
163 .with_operation("Builder::build")
164 .with_context("service", Scheme::Lakefs)),
165 }?;
166
167 let password = match &self.config.password {
168 Some(password) => Ok(password.clone()),
169 None => Err(Error::new(ErrorKind::ConfigInvalid, "password is empty")
170 .with_operation("Builder::build")
171 .with_context("service", Scheme::Lakefs)),
172 }?;
173
174 Ok(LakefsBackend {
175 core: Arc::new(LakefsCore {
176 info: {
177 let am = AccessorInfo::default();
178 am.set_scheme(DEFAULT_SCHEME)
179 .set_native_capability(Capability {
180 stat: true,
181
182 list: true,
183
184 read: true,
185 write: true,
186 delete: true,
187 copy: true,
188 shared: true,
189 ..Default::default()
190 });
191 am.into()
192 },
193 endpoint,
194 repository,
195 branch,
196 root,
197 username,
198 password,
199 }),
200 })
201 }
202}
203
204#[derive(Debug, Clone)]
206pub struct LakefsBackend {
207 core: Arc<LakefsCore>,
208}
209
210impl Access for LakefsBackend {
211 type Reader = HttpBody;
212 type Writer = oio::OneShotWriter<LakefsWriter>;
213 type Lister = oio::PageLister<LakefsLister>;
214 type Deleter = oio::OneShotDeleter<LakefsDeleter>;
215
216 fn info(&self) -> Arc<AccessorInfo> {
217 self.core.info.clone()
218 }
219
220 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
221 if path == "/" {
223 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
224 }
225
226 let resp = self.core.get_object_metadata(path).await?;
227
228 let status = resp.status();
229
230 match status {
231 StatusCode::OK => {
232 let mut meta = parse_into_metadata(path, resp.headers())?;
233 let bs = resp.clone().into_body();
234
235 let decoded_response: LakefsStatus =
236 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
237 if let Some(size_bytes) = decoded_response.size_bytes {
238 meta.set_content_length(size_bytes);
239 }
240 meta.set_mode(EntryMode::FILE);
241 if let Some(v) = parse_content_disposition(resp.headers())? {
242 meta.set_content_disposition(v);
243 }
244
245 meta.set_last_modified(Utc.timestamp_opt(decoded_response.mtime, 0).unwrap());
246
247 Ok(RpStat::new(meta))
248 }
249 _ => Err(parse_error(resp)),
250 }
251 }
252
253 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
254 let resp = self
255 .core
256 .get_object_content(path, args.range(), &args)
257 .await?;
258
259 let status = resp.status();
260
261 match status {
262 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
263 Ok((RpRead::default(), resp.into_body()))
264 }
265 _ => {
266 let (part, mut body) = resp.into_parts();
267 let buf = body.to_buffer().await?;
268 Err(parse_error(Response::from_parts(part, buf)))
269 }
270 }
271 }
272
273 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
274 let l = LakefsLister::new(
275 self.core.clone(),
276 path.to_string(),
277 args.limit(),
278 args.start_after(),
279 args.recursive(),
280 );
281
282 Ok((RpList::default(), oio::PageLister::new(l)))
283 }
284
285 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
286 Ok((
287 RpWrite::default(),
288 oio::OneShotWriter::new(LakefsWriter::new(self.core.clone(), path.to_string(), args)),
289 ))
290 }
291
292 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
293 Ok((
294 RpDelete::default(),
295 oio::OneShotDeleter::new(LakefsDeleter::new(self.core.clone())),
296 ))
297 }
298
299 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
300 let resp = self.core.copy_object(from, to).await?;
301
302 let status = resp.status();
303
304 match status {
305 StatusCode::CREATED => Ok(RpCopy::default()),
306 _ => Err(parse_error(resp)),
307 }
308 }
309}