opendal/services/sled/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::str;
21
22use tokio::task;
23
24use crate::raw::adapters::kv;
25use crate::raw::*;
26use crate::services::SledConfig;
27use crate::Builder;
28use crate::Error;
29use crate::ErrorKind;
30use crate::Scheme;
31use crate::*;
32
33// https://github.com/spacejam/sled/blob/69294e59c718289ab3cb6bd03ac3b9e1e072a1e7/src/db.rs#L5
34const DEFAULT_TREE_ID: &str = r#"__sled__default"#;
35
36impl Configurator for SledConfig {
37    type Builder = SledBuilder;
38    fn into_builder(self) -> Self::Builder {
39        SledBuilder { config: self }
40    }
41}
42
43/// Sled services support.
44#[doc = include_str!("docs.md")]
45#[derive(Default)]
46pub struct SledBuilder {
47    config: SledConfig,
48}
49
50impl Debug for SledBuilder {
51    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52        f.debug_struct("SledBuilder")
53            .field("config", &self.config)
54            .finish()
55    }
56}
57
58impl SledBuilder {
59    /// Set the path to the sled data directory. Will create if not exists.
60    pub fn datadir(mut self, path: &str) -> Self {
61        self.config.datadir = Some(path.into());
62        self
63    }
64
65    /// Set the root for sled.
66    pub fn root(mut self, root: &str) -> Self {
67        self.config.root = if root.is_empty() {
68            None
69        } else {
70            Some(root.to_string())
71        };
72
73        self
74    }
75
76    /// Set the tree for sled.
77    pub fn tree(mut self, tree: &str) -> Self {
78        self.config.tree = Some(tree.into());
79        self
80    }
81}
82
83impl Builder for SledBuilder {
84    const SCHEME: Scheme = Scheme::Sled;
85    type Config = SledConfig;
86
87    fn build(self) -> Result<impl Access> {
88        let datadir_path = self.config.datadir.ok_or_else(|| {
89            Error::new(ErrorKind::ConfigInvalid, "datadir is required but not set")
90                .with_context("service", Scheme::Sled)
91        })?;
92
93        let db = sled::open(&datadir_path).map_err(|e| {
94            Error::new(ErrorKind::ConfigInvalid, "open db")
95                .with_context("service", Scheme::Sled)
96                .with_context("datadir", datadir_path.clone())
97                .set_source(e)
98        })?;
99
100        // use "default" tree if not set
101        let tree_name = self
102            .config
103            .tree
104            .unwrap_or_else(|| DEFAULT_TREE_ID.to_string());
105
106        let tree = db.open_tree(&tree_name).map_err(|e| {
107            Error::new(ErrorKind::ConfigInvalid, "open tree")
108                .with_context("service", Scheme::Sled)
109                .with_context("datadir", datadir_path.clone())
110                .with_context("tree", tree_name.clone())
111                .set_source(e)
112        })?;
113
114        Ok(SledBackend::new(Adapter {
115            datadir: datadir_path,
116            tree,
117        })
118        .with_root(self.config.root.as_deref().unwrap_or("/")))
119    }
120}
121
122/// Backend for sled services.
123pub type SledBackend = kv::Backend<Adapter>;
124
125#[derive(Clone)]
126pub struct Adapter {
127    datadir: String,
128    tree: sled::Tree,
129}
130
131impl Debug for Adapter {
132    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
133        let mut ds = f.debug_struct("Adapter");
134        ds.field("path", &self.datadir);
135        ds.finish()
136    }
137}
138
139impl kv::Adapter for Adapter {
140    type Scanner = kv::Scanner;
141
142    fn info(&self) -> kv::Info {
143        kv::Info::new(
144            Scheme::Sled,
145            &self.datadir,
146            Capability {
147                read: true,
148                write: true,
149                list: true,
150                blocking: true,
151                shared: false,
152                ..Default::default()
153            },
154        )
155    }
156
157    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
158        let cloned_self = self.clone();
159        let cloned_path = path.to_string();
160
161        task::spawn_blocking(move || cloned_self.blocking_get(cloned_path.as_str()))
162            .await
163            .map_err(new_task_join_error)?
164    }
165
166    fn blocking_get(&self, path: &str) -> Result<Option<Buffer>> {
167        Ok(self
168            .tree
169            .get(path)
170            .map_err(parse_error)?
171            .map(|v| Buffer::from(v.to_vec())))
172    }
173
174    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
175        let cloned_self = self.clone();
176        let cloned_path = path.to_string();
177
178        task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), value))
179            .await
180            .map_err(new_task_join_error)?
181    }
182
183    fn blocking_set(&self, path: &str, value: Buffer) -> Result<()> {
184        self.tree
185            .insert(path, value.to_vec())
186            .map_err(parse_error)?;
187        Ok(())
188    }
189
190    async fn delete(&self, path: &str) -> Result<()> {
191        let cloned_self = self.clone();
192        let cloned_path = path.to_string();
193
194        task::spawn_blocking(move || cloned_self.blocking_delete(cloned_path.as_str()))
195            .await
196            .map_err(new_task_join_error)?
197    }
198
199    fn blocking_delete(&self, path: &str) -> Result<()> {
200        self.tree.remove(path).map_err(parse_error)?;
201
202        Ok(())
203    }
204
205    async fn scan(&self, path: &str) -> Result<Self::Scanner> {
206        let cloned_self = self.clone();
207        let cloned_path = path.to_string();
208
209        let res = task::spawn_blocking(move || cloned_self.blocking_scan(cloned_path.as_str()))
210            .await
211            .map_err(new_task_join_error)??;
212
213        Ok(Box::new(kv::ScanStdIter::new(res.into_iter().map(Ok))))
214    }
215
216    fn blocking_scan(&self, path: &str) -> Result<Vec<String>> {
217        let it = self.tree.scan_prefix(path).keys();
218        let mut res = Vec::default();
219
220        for i in it {
221            let bs = i.map_err(parse_error)?.to_vec();
222            let v = String::from_utf8(bs).map_err(|err| {
223                Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string")
224                    .set_source(err)
225            })?;
226
227            res.push(v);
228        }
229
230        Ok(res)
231    }
232}
233
234fn parse_error(err: sled::Error) -> Error {
235    Error::new(ErrorKind::Unexpected, "error from sled").set_source(err)
236}