1use std::sync::Arc;
19use std::vec::IntoIter;
20
21use super::{Adapter, Scan};
22use crate::raw::oio::HierarchyLister;
23use crate::raw::oio::QueueBuf;
24use crate::raw::*;
25use crate::*;
26
27#[derive(Debug, Clone)]
35pub struct Backend<S: Adapter> {
36 kv: Arc<S>,
37 root: String,
38 info: Arc<AccessorInfo>,
39}
40
41impl<S> Backend<S>
42where
43 S: Adapter,
44{
45 pub fn new(kv: S) -> Self {
47 let kv_info = kv.info();
48 Self {
49 kv: Arc::new(kv),
50 root: "/".to_string(),
51 info: {
52 let am: AccessorInfo = AccessorInfo::default();
53 am.set_root("/");
54 am.set_scheme(kv_info.scheme());
55 am.set_name(kv_info.name());
56
57 let mut cap = kv_info.capabilities();
58 if cap.read {
59 cap.stat = true;
60 }
61
62 if cap.write {
63 cap.write_can_empty = true;
64 cap.delete = true;
65 }
66
67 if cap.list {
68 cap.list_with_recursive = true;
69 }
70
71 am.set_native_capability(cap);
72
73 am.into()
74 },
75 }
76 }
77
78 pub fn with_root(self, root: &str) -> Self {
80 self.with_normalized_root(normalize_root(root))
81 }
82
83 pub(crate) fn with_normalized_root(mut self, root: String) -> Self {
87 let root = normalize_root(&root);
88 self.info.set_root(&root);
89 self.root = root;
90 self
91 }
92}
93
94impl<S: Adapter> Access for Backend<S> {
95 type Reader = Buffer;
96 type Writer = KvWriter<S>;
97 type Lister = HierarchyLister<KvLister<S::Scanner>>;
98 type Deleter = oio::OneShotDeleter<KvDeleter<S>>;
99 type BlockingReader = Buffer;
100 type BlockingWriter = KvWriter<S>;
101 type BlockingLister = HierarchyLister<BlockingKvLister>;
102 type BlockingDeleter = oio::OneShotDeleter<KvDeleter<S>>;
103
104 fn info(&self) -> Arc<AccessorInfo> {
105 self.info.clone()
106 }
107
108 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
109 let p = build_abs_path(&self.root, path);
110
111 if p == build_abs_path(&self.root, "") {
112 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
113 } else {
114 let bs = self.kv.get(&p).await?;
115 match bs {
116 Some(bs) => Ok(RpStat::new(
117 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
118 )),
119 None => Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")),
120 }
121 }
122 }
123
124 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
125 let p = build_abs_path(&self.root, path);
126 let bs = match self.kv.get(&p).await? {
127 Some(bs) => bs,
128 None => return Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")),
129 };
130 Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
131 }
132
133 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
134 let p = build_abs_path(&self.root, path);
135
136 Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
137 }
138
139 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
140 Ok((
141 RpDelete::default(),
142 oio::OneShotDeleter::new(KvDeleter::new(self.kv.clone(), self.root.clone())),
143 ))
144 }
145
146 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
147 let p = build_abs_path(&self.root, path);
148 let res = self.kv.scan(&p).await?;
149 let lister = KvLister::new(&self.root, res);
150 let lister = HierarchyLister::new(lister, path, args.recursive());
151
152 Ok((RpList::default(), lister))
153 }
154
155 fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
156 let p = build_abs_path(&self.root, path);
157
158 if p == build_abs_path(&self.root, "") {
159 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
160 } else {
161 let bs = self.kv.blocking_get(&p)?;
162 match bs {
163 Some(bs) => Ok(RpStat::new(
164 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
165 )),
166 None => Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")),
167 }
168 }
169 }
170
171 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
172 let p = build_abs_path(&self.root, path);
173 let bs = match self.kv.blocking_get(&p)? {
174 Some(bs) => bs,
175 None => return Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")),
176 };
177 Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
178 }
179
180 fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
181 let p = build_abs_path(&self.root, path);
182
183 Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
184 }
185
186 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
187 Ok((
188 RpDelete::default(),
189 oio::OneShotDeleter::new(KvDeleter::new(self.kv.clone(), self.root.clone())),
190 ))
191 }
192
193 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
194 let p = build_abs_path(&self.root, path);
195 let res = self.kv.blocking_scan(&p)?;
196 let lister = BlockingKvLister::new(&self.root, res);
197 let lister = HierarchyLister::new(lister, path, args.recursive());
198
199 Ok((RpList::default(), lister))
200 }
201}
202
203pub struct KvLister<Iter> {
204 root: String,
205 inner: Iter,
206}
207
208impl<Iter> KvLister<Iter>
209where
210 Iter: Scan,
211{
212 fn new(root: &str, inner: Iter) -> Self {
213 Self {
214 root: root.to_string(),
215 inner,
216 }
217 }
218
219 async fn inner_next(&mut self) -> Result<Option<oio::Entry>> {
220 Ok(self.inner.next().await?.map(|v| {
221 let mode = if v.ends_with('/') {
222 EntryMode::DIR
223 } else {
224 EntryMode::FILE
225 };
226 let mut path = build_rel_path(&self.root, &v);
227 if path.is_empty() {
228 path = "/".to_string();
229 }
230 oio::Entry::new(&path, Metadata::new(mode))
231 }))
232 }
233}
234
235impl<Iter> oio::List for KvLister<Iter>
236where
237 Iter: Scan,
238{
239 async fn next(&mut self) -> Result<Option<oio::Entry>> {
240 self.inner_next().await
241 }
242}
243
244pub struct BlockingKvLister {
245 root: String,
246 inner: IntoIter<String>,
247}
248
249impl BlockingKvLister {
250 fn new(root: &str, inner: Vec<String>) -> Self {
251 Self {
252 root: root.to_string(),
253 inner: inner.into_iter(),
254 }
255 }
256
257 fn inner_next(&mut self) -> Option<oio::Entry> {
258 self.inner.next().map(|v| {
259 let mode = if v.ends_with('/') {
260 EntryMode::DIR
261 } else {
262 EntryMode::FILE
263 };
264 let mut path = build_rel_path(&self.root, &v);
265 if path.is_empty() {
266 path = "/".to_string();
267 }
268 oio::Entry::new(&path, Metadata::new(mode))
269 })
270 }
271}
272
273impl oio::BlockingList for BlockingKvLister {
274 fn next(&mut self) -> Result<Option<oio::Entry>> {
275 Ok(self.inner_next())
276 }
277}
278
279pub struct KvWriter<S> {
280 kv: Arc<S>,
281 path: String,
282 buffer: QueueBuf,
283}
284
285impl<S> KvWriter<S> {
286 fn new(kv: Arc<S>, path: String) -> Self {
287 KvWriter {
288 kv,
289 path,
290 buffer: QueueBuf::new(),
291 }
292 }
293}
294
295unsafe impl<S: Adapter> Sync for KvWriter<S> {}
299
300impl<S: Adapter> oio::Write for KvWriter<S> {
301 async fn write(&mut self, bs: Buffer) -> Result<()> {
302 self.buffer.push(bs);
303 Ok(())
304 }
305
306 async fn close(&mut self) -> Result<Metadata> {
307 let buf = self.buffer.clone().collect();
308 let length = buf.len() as u64;
309 self.kv.set(&self.path, buf).await?;
310
311 let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
312 Ok(meta)
313 }
314
315 async fn abort(&mut self) -> Result<()> {
316 self.buffer.clear();
317 Ok(())
318 }
319}
320
321impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
322 fn write(&mut self, bs: Buffer) -> Result<()> {
323 self.buffer.push(bs);
324 Ok(())
325 }
326
327 fn close(&mut self) -> Result<Metadata> {
328 let buf = self.buffer.clone().collect();
329 let length = buf.len() as u64;
330 self.kv.blocking_set(&self.path, buf)?;
331
332 let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
333 Ok(meta)
334 }
335}
336
337pub struct KvDeleter<S> {
338 kv: Arc<S>,
339 root: String,
340}
341
342impl<S> KvDeleter<S> {
343 fn new(kv: Arc<S>, root: String) -> Self {
344 KvDeleter { kv, root }
345 }
346}
347
348impl<S: Adapter> oio::OneShotDelete for KvDeleter<S> {
349 async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
350 let p = build_abs_path(&self.root, &path);
351
352 self.kv.delete(&p).await?;
353 Ok(())
354 }
355}
356
357impl<S: Adapter> oio::BlockingOneShotDelete for KvDeleter<S> {
358 fn blocking_delete_once(&self, path: String, _: OpDelete) -> Result<()> {
359 let p = build_abs_path(&self.root, &path);
360
361 self.kv.blocking_delete(&p)?;
362 Ok(())
363 }
364}