opendal/raw/adapters/kv/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19use std::vec::IntoIter;
20
21use super::{Adapter, Scan};
22use crate::raw::oio::HierarchyLister;
23use crate::raw::oio::QueueBuf;
24use crate::raw::*;
25use crate::*;
26
27/// Backend of kv service. If the storage service is one k-v-like service, it should implement this kv [`Backend`] by right.
28///
29/// `Backend` implements one general logic on how to read, write, scan the data from one kv store efficiently.
30/// And the [`Adapter`] held by `Backend` will handle how to communicate with one k-v-like service really and provides
31/// a series of basic operation for this service.
32///
33/// OpenDAL developer can implement one new k-v store backend easily with help of this Backend.
34#[derive(Debug, Clone)]
35pub struct Backend<S: Adapter> {
36    kv: Arc<S>,
37    root: String,
38    info: Arc<AccessorInfo>,
39}
40
41impl<S> Backend<S>
42where
43    S: Adapter,
44{
45    /// Create a new kv backend.
46    pub fn new(kv: S) -> Self {
47        let kv_info = kv.info();
48        Self {
49            kv: Arc::new(kv),
50            root: "/".to_string(),
51            info: {
52                let am: AccessorInfo = AccessorInfo::default();
53                am.set_root("/");
54                am.set_scheme(kv_info.scheme());
55                am.set_name(kv_info.name());
56
57                let mut cap = kv_info.capabilities();
58                if cap.read {
59                    cap.stat = true;
60                }
61
62                if cap.write {
63                    cap.write_can_empty = true;
64                    cap.delete = true;
65                }
66
67                if cap.list {
68                    cap.list_with_recursive = true;
69                }
70
71                am.set_native_capability(cap);
72
73                am.into()
74            },
75        }
76    }
77
78    /// Configure root within this backend.
79    pub fn with_root(self, root: &str) -> Self {
80        self.with_normalized_root(normalize_root(root))
81    }
82
83    /// Configure root within this backend.
84    ///
85    /// This method assumes root is normalized.
86    pub(crate) fn with_normalized_root(mut self, root: String) -> Self {
87        let root = normalize_root(&root);
88        self.info.set_root(&root);
89        self.root = root;
90        self
91    }
92}
93
94impl<S: Adapter> Access for Backend<S> {
95    type Reader = Buffer;
96    type Writer = KvWriter<S>;
97    type Lister = HierarchyLister<KvLister<S::Scanner>>;
98    type Deleter = oio::OneShotDeleter<KvDeleter<S>>;
99    type BlockingReader = Buffer;
100    type BlockingWriter = KvWriter<S>;
101    type BlockingLister = HierarchyLister<BlockingKvLister>;
102    type BlockingDeleter = oio::OneShotDeleter<KvDeleter<S>>;
103
104    fn info(&self) -> Arc<AccessorInfo> {
105        self.info.clone()
106    }
107
108    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
109        let p = build_abs_path(&self.root, path);
110
111        if p == build_abs_path(&self.root, "") {
112            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
113        } else {
114            let bs = self.kv.get(&p).await?;
115            match bs {
116                Some(bs) => Ok(RpStat::new(
117                    Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
118                )),
119                None => Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")),
120            }
121        }
122    }
123
124    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
125        let p = build_abs_path(&self.root, path);
126        let bs = match self.kv.get(&p).await? {
127            Some(bs) => bs,
128            None => return Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")),
129        };
130        Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
131    }
132
133    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
134        let p = build_abs_path(&self.root, path);
135
136        Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
137    }
138
139    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
140        Ok((
141            RpDelete::default(),
142            oio::OneShotDeleter::new(KvDeleter::new(self.kv.clone(), self.root.clone())),
143        ))
144    }
145
146    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
147        let p = build_abs_path(&self.root, path);
148        let res = self.kv.scan(&p).await?;
149        let lister = KvLister::new(&self.root, res);
150        let lister = HierarchyLister::new(lister, path, args.recursive());
151
152        Ok((RpList::default(), lister))
153    }
154
155    fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
156        let p = build_abs_path(&self.root, path);
157
158        if p == build_abs_path(&self.root, "") {
159            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
160        } else {
161            let bs = self.kv.blocking_get(&p)?;
162            match bs {
163                Some(bs) => Ok(RpStat::new(
164                    Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
165                )),
166                None => Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")),
167            }
168        }
169    }
170
171    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
172        let p = build_abs_path(&self.root, path);
173        let bs = match self.kv.blocking_get(&p)? {
174            Some(bs) => bs,
175            None => return Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")),
176        };
177        Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
178    }
179
180    fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
181        let p = build_abs_path(&self.root, path);
182
183        Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
184    }
185
186    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
187        Ok((
188            RpDelete::default(),
189            oio::OneShotDeleter::new(KvDeleter::new(self.kv.clone(), self.root.clone())),
190        ))
191    }
192
193    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
194        let p = build_abs_path(&self.root, path);
195        let res = self.kv.blocking_scan(&p)?;
196        let lister = BlockingKvLister::new(&self.root, res);
197        let lister = HierarchyLister::new(lister, path, args.recursive());
198
199        Ok((RpList::default(), lister))
200    }
201}
202
203pub struct KvLister<Iter> {
204    root: String,
205    inner: Iter,
206}
207
208impl<Iter> KvLister<Iter>
209where
210    Iter: Scan,
211{
212    fn new(root: &str, inner: Iter) -> Self {
213        Self {
214            root: root.to_string(),
215            inner,
216        }
217    }
218
219    async fn inner_next(&mut self) -> Result<Option<oio::Entry>> {
220        Ok(self.inner.next().await?.map(|v| {
221            let mode = if v.ends_with('/') {
222                EntryMode::DIR
223            } else {
224                EntryMode::FILE
225            };
226            let mut path = build_rel_path(&self.root, &v);
227            if path.is_empty() {
228                path = "/".to_string();
229            }
230            oio::Entry::new(&path, Metadata::new(mode))
231        }))
232    }
233}
234
235impl<Iter> oio::List for KvLister<Iter>
236where
237    Iter: Scan,
238{
239    async fn next(&mut self) -> Result<Option<oio::Entry>> {
240        self.inner_next().await
241    }
242}
243
244pub struct BlockingKvLister {
245    root: String,
246    inner: IntoIter<String>,
247}
248
249impl BlockingKvLister {
250    fn new(root: &str, inner: Vec<String>) -> Self {
251        Self {
252            root: root.to_string(),
253            inner: inner.into_iter(),
254        }
255    }
256
257    fn inner_next(&mut self) -> Option<oio::Entry> {
258        self.inner.next().map(|v| {
259            let mode = if v.ends_with('/') {
260                EntryMode::DIR
261            } else {
262                EntryMode::FILE
263            };
264            let mut path = build_rel_path(&self.root, &v);
265            if path.is_empty() {
266                path = "/".to_string();
267            }
268            oio::Entry::new(&path, Metadata::new(mode))
269        })
270    }
271}
272
273impl oio::BlockingList for BlockingKvLister {
274    fn next(&mut self) -> Result<Option<oio::Entry>> {
275        Ok(self.inner_next())
276    }
277}
278
279pub struct KvWriter<S> {
280    kv: Arc<S>,
281    path: String,
282    buffer: QueueBuf,
283}
284
285impl<S> KvWriter<S> {
286    fn new(kv: Arc<S>, path: String) -> Self {
287        KvWriter {
288            kv,
289            path,
290            buffer: QueueBuf::new(),
291        }
292    }
293}
294
295/// # Safety
296///
297/// We will only take `&mut Self` reference for KvWriter.
298unsafe impl<S: Adapter> Sync for KvWriter<S> {}
299
300impl<S: Adapter> oio::Write for KvWriter<S> {
301    async fn write(&mut self, bs: Buffer) -> Result<()> {
302        self.buffer.push(bs);
303        Ok(())
304    }
305
306    async fn close(&mut self) -> Result<Metadata> {
307        let buf = self.buffer.clone().collect();
308        let length = buf.len() as u64;
309        self.kv.set(&self.path, buf).await?;
310
311        let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
312        Ok(meta)
313    }
314
315    async fn abort(&mut self) -> Result<()> {
316        self.buffer.clear();
317        Ok(())
318    }
319}
320
321impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
322    fn write(&mut self, bs: Buffer) -> Result<()> {
323        self.buffer.push(bs);
324        Ok(())
325    }
326
327    fn close(&mut self) -> Result<Metadata> {
328        let buf = self.buffer.clone().collect();
329        let length = buf.len() as u64;
330        self.kv.blocking_set(&self.path, buf)?;
331
332        let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
333        Ok(meta)
334    }
335}
336
337pub struct KvDeleter<S> {
338    kv: Arc<S>,
339    root: String,
340}
341
342impl<S> KvDeleter<S> {
343    fn new(kv: Arc<S>, root: String) -> Self {
344        KvDeleter { kv, root }
345    }
346}
347
348impl<S: Adapter> oio::OneShotDelete for KvDeleter<S> {
349    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
350        let p = build_abs_path(&self.root, &path);
351
352        self.kv.delete(&p).await?;
353        Ok(())
354    }
355}
356
357impl<S: Adapter> oio::BlockingOneShotDelete for KvDeleter<S> {
358    fn blocking_delete_once(&self, path: String, _: OpDelete) -> Result<()> {
359        let p = build_abs_path(&self.root, &path);
360
361        self.kv.blocking_delete(&p)?;
362        Ok(())
363    }
364}