opendal/services/lakefs/
backend.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use http::Response;
22use http::StatusCode;
23use log::debug;
24
25use super::LAKEFS_SCHEME;
26use super::config::LakefsConfig;
27use super::core::LakefsCore;
28use super::core::LakefsStatus;
29use super::deleter::LakefsDeleter;
30use super::error::parse_error;
31use super::lister::LakefsLister;
32use super::writer::LakefsWriter;
33use crate::raw::*;
34use crate::*;
35
36#[doc = include_str!("docs.md")]
38#[derive(Debug, Default)]
39pub struct LakefsBuilder {
40 pub(super) config: LakefsConfig,
41}
42
43impl LakefsBuilder {
44 pub fn endpoint(mut self, endpoint: &str) -> Self {
52 if !endpoint.is_empty() {
53 self.config.endpoint = Some(endpoint.to_string());
54 }
55 self
56 }
57
58 pub fn username(mut self, username: &str) -> Self {
60 if !username.is_empty() {
61 self.config.username = Some(username.to_string());
62 }
63 self
64 }
65
66 pub fn password(mut self, password: &str) -> Self {
68 if !password.is_empty() {
69 self.config.password = Some(password.to_string());
70 }
71 self
72 }
73
74 pub fn branch(mut self, branch: &str) -> Self {
82 if !branch.is_empty() {
83 self.config.branch = Some(branch.to_string());
84 }
85 self
86 }
87
88 pub fn root(mut self, root: &str) -> Self {
92 if !root.is_empty() {
93 self.config.root = Some(root.to_string());
94 }
95 self
96 }
97
98 pub fn repository(mut self, repository: &str) -> Self {
102 if !repository.is_empty() {
103 self.config.repository = Some(repository.to_string());
104 }
105 self
106 }
107}
108
109impl Builder for LakefsBuilder {
110 type Config = LakefsConfig;
111
112 fn build(self) -> Result<impl Access> {
114 debug!("backend build started: {:?}", &self);
115
116 let endpoint = match self.config.endpoint {
117 Some(endpoint) => Ok(endpoint.clone()),
118 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
119 .with_operation("Builder::build")
120 .with_context("service", LAKEFS_SCHEME)),
121 }?;
122 debug!("backend use endpoint: {:?}", &endpoint);
123
124 let repository = match &self.config.repository {
125 Some(repository) => Ok(repository.clone()),
126 None => Err(Error::new(ErrorKind::ConfigInvalid, "repository is empty")
127 .with_operation("Builder::build")
128 .with_context("service", LAKEFS_SCHEME)),
129 }?;
130 debug!("backend use repository: {}", &repository);
131
132 let branch = match &self.config.branch {
133 Some(branch) => branch.clone(),
134 None => "main".to_string(),
135 };
136 debug!("backend use branch: {}", &branch);
137
138 let root = normalize_root(&self.config.root.unwrap_or_default());
139 debug!("backend use root: {}", &root);
140
141 let username = match &self.config.username {
142 Some(username) => Ok(username.clone()),
143 None => Err(Error::new(ErrorKind::ConfigInvalid, "username is empty")
144 .with_operation("Builder::build")
145 .with_context("service", LAKEFS_SCHEME)),
146 }?;
147
148 let password = match &self.config.password {
149 Some(password) => Ok(password.clone()),
150 None => Err(Error::new(ErrorKind::ConfigInvalid, "password is empty")
151 .with_operation("Builder::build")
152 .with_context("service", LAKEFS_SCHEME)),
153 }?;
154
155 Ok(LakefsBackend {
156 core: Arc::new(LakefsCore {
157 info: {
158 let am = AccessorInfo::default();
159 am.set_scheme(LAKEFS_SCHEME)
160 .set_native_capability(Capability {
161 stat: true,
162
163 list: true,
164
165 read: true,
166 write: true,
167 delete: true,
168 copy: true,
169 shared: true,
170 ..Default::default()
171 });
172 am.into()
173 },
174 endpoint,
175 repository,
176 branch,
177 root,
178 username,
179 password,
180 }),
181 })
182 }
183}
184
185#[derive(Debug, Clone)]
187pub struct LakefsBackend {
188 core: Arc<LakefsCore>,
189}
190
191impl Access for LakefsBackend {
192 type Reader = HttpBody;
193 type Writer = oio::OneShotWriter<LakefsWriter>;
194 type Lister = oio::PageLister<LakefsLister>;
195 type Deleter = oio::OneShotDeleter<LakefsDeleter>;
196
197 fn info(&self) -> Arc<AccessorInfo> {
198 self.core.info.clone()
199 }
200
201 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
202 if path == "/" {
204 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
205 }
206
207 let resp = self.core.get_object_metadata(path).await?;
208
209 let status = resp.status();
210
211 match status {
212 StatusCode::OK => {
213 let bs = resp.into_body();
214
215 let decoded_response: LakefsStatus =
216 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
217
218 let meta = LakefsCore::parse_lakefs_status_into_metadata(&decoded_response);
220
221 Ok(RpStat::new(meta))
222 }
223 _ => Err(parse_error(resp)),
224 }
225 }
226
227 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
228 let resp = self
229 .core
230 .get_object_content(path, args.range(), &args)
231 .await?;
232
233 let status = resp.status();
234
235 match status {
236 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
237 Ok((RpRead::default(), resp.into_body()))
238 }
239 _ => {
240 let (part, mut body) = resp.into_parts();
241 let buf = body.to_buffer().await?;
242 Err(parse_error(Response::from_parts(part, buf)))
243 }
244 }
245 }
246
247 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
248 let l = LakefsLister::new(
249 self.core.clone(),
250 path.to_string(),
251 args.limit(),
252 args.start_after(),
253 args.recursive(),
254 );
255
256 Ok((RpList::default(), oio::PageLister::new(l)))
257 }
258
259 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
260 Ok((
261 RpWrite::default(),
262 oio::OneShotWriter::new(LakefsWriter::new(self.core.clone(), path.to_string(), args)),
263 ))
264 }
265
266 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
267 Ok((
268 RpDelete::default(),
269 oio::OneShotDeleter::new(LakefsDeleter::new(self.core.clone())),
270 ))
271 }
272
273 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
274 let resp = self.core.copy_object(from, to).await?;
275
276 let status = resp.status();
277
278 match status {
279 StatusCode::CREATED => Ok(RpCopy::default()),
280 _ => Err(parse_error(resp)),
281 }
282 }
283}