opendal/services/sled/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::str;
21
22use tokio::task;
23
24use crate::raw::adapters::kv;
25use crate::raw::*;
26use crate::services::SledConfig;
27use crate::Builder;
28use crate::Error;
29use crate::ErrorKind;
30use crate::Scheme;
31use crate::*;
32
// Tree name that `build` falls back to when the config does not name a tree.
// NOTE(review): presumably mirrors sled's own default tree id — confirm against sled.
const DEFAULT_TREE_ID: &str = "__sled__default";
35
36impl Configurator for SledConfig {
37 type Builder = SledBuilder;
38 fn into_builder(self) -> Self::Builder {
39 SledBuilder { config: self }
40 }
41}
42
/// Builder for the Sled service.
///
/// Collects a [`SledConfig`] through the chained setters and turns it into a
/// backend via the `Builder::build` implementation.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct SledBuilder {
    /// Configuration accumulated by the setter methods and consumed by `build`.
    config: SledConfig,
}
49
50impl Debug for SledBuilder {
51 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52 f.debug_struct("SledBuilder")
53 .field("config", &self.config)
54 .finish()
55 }
56}
57
58impl SledBuilder {
59 pub fn datadir(mut self, path: &str) -> Self {
61 self.config.datadir = Some(path.into());
62 self
63 }
64
65 pub fn root(mut self, root: &str) -> Self {
67 self.config.root = if root.is_empty() {
68 None
69 } else {
70 Some(root.to_string())
71 };
72
73 self
74 }
75
76 pub fn tree(mut self, tree: &str) -> Self {
78 self.config.tree = Some(tree.into());
79 self
80 }
81}
82
83impl Builder for SledBuilder {
84 const SCHEME: Scheme = Scheme::Sled;
85 type Config = SledConfig;
86
87 fn build(self) -> Result<impl Access> {
88 let datadir_path = self.config.datadir.ok_or_else(|| {
89 Error::new(ErrorKind::ConfigInvalid, "datadir is required but not set")
90 .with_context("service", Scheme::Sled)
91 })?;
92
93 let db = sled::open(&datadir_path).map_err(|e| {
94 Error::new(ErrorKind::ConfigInvalid, "open db")
95 .with_context("service", Scheme::Sled)
96 .with_context("datadir", datadir_path.clone())
97 .set_source(e)
98 })?;
99
100 let tree_name = self
102 .config
103 .tree
104 .unwrap_or_else(|| DEFAULT_TREE_ID.to_string());
105
106 let tree = db.open_tree(&tree_name).map_err(|e| {
107 Error::new(ErrorKind::ConfigInvalid, "open tree")
108 .with_context("service", Scheme::Sled)
109 .with_context("datadir", datadir_path.clone())
110 .with_context("tree", tree_name.clone())
111 .set_source(e)
112 })?;
113
114 Ok(SledBackend::new(Adapter {
115 datadir: datadir_path,
116 tree,
117 })
118 .with_root(self.config.root.as_deref().unwrap_or("/")))
119 }
120}
121
/// Backend for the Sled service: the generic key-value backend specialised
/// with the sled [`Adapter`].
pub type SledBackend = kv::Backend<Adapter>;
124
/// Key-value adapter that serves every operation from a single sled tree.
#[derive(Clone)]
pub struct Adapter {
    /// Data directory the database was opened from; reported as the name in
    /// `info()` and as `path` in the `Debug` output.
    datadir: String,
    /// Tree handle that get/set/delete/scan operate on.
    tree: sled::Tree,
}
130
131impl Debug for Adapter {
132 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
133 let mut ds = f.debug_struct("Adapter");
134 ds.field("path", &self.datadir);
135 ds.finish()
136 }
137}
138
139impl kv::Adapter for Adapter {
140 type Scanner = kv::Scanner;
141
142 fn info(&self) -> kv::Info {
143 kv::Info::new(
144 Scheme::Sled,
145 &self.datadir,
146 Capability {
147 read: true,
148 write: true,
149 list: true,
150 blocking: true,
151 shared: false,
152 ..Default::default()
153 },
154 )
155 }
156
157 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
158 let cloned_self = self.clone();
159 let cloned_path = path.to_string();
160
161 task::spawn_blocking(move || cloned_self.blocking_get(cloned_path.as_str()))
162 .await
163 .map_err(new_task_join_error)?
164 }
165
166 fn blocking_get(&self, path: &str) -> Result<Option<Buffer>> {
167 Ok(self
168 .tree
169 .get(path)
170 .map_err(parse_error)?
171 .map(|v| Buffer::from(v.to_vec())))
172 }
173
174 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
175 let cloned_self = self.clone();
176 let cloned_path = path.to_string();
177
178 task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), value))
179 .await
180 .map_err(new_task_join_error)?
181 }
182
183 fn blocking_set(&self, path: &str, value: Buffer) -> Result<()> {
184 self.tree
185 .insert(path, value.to_vec())
186 .map_err(parse_error)?;
187 Ok(())
188 }
189
190 async fn delete(&self, path: &str) -> Result<()> {
191 let cloned_self = self.clone();
192 let cloned_path = path.to_string();
193
194 task::spawn_blocking(move || cloned_self.blocking_delete(cloned_path.as_str()))
195 .await
196 .map_err(new_task_join_error)?
197 }
198
199 fn blocking_delete(&self, path: &str) -> Result<()> {
200 self.tree.remove(path).map_err(parse_error)?;
201
202 Ok(())
203 }
204
205 async fn scan(&self, path: &str) -> Result<Self::Scanner> {
206 let cloned_self = self.clone();
207 let cloned_path = path.to_string();
208
209 let res = task::spawn_blocking(move || cloned_self.blocking_scan(cloned_path.as_str()))
210 .await
211 .map_err(new_task_join_error)??;
212
213 Ok(Box::new(kv::ScanStdIter::new(res.into_iter().map(Ok))))
214 }
215
216 fn blocking_scan(&self, path: &str) -> Result<Vec<String>> {
217 let it = self.tree.scan_prefix(path).keys();
218 let mut res = Vec::default();
219
220 for i in it {
221 let bs = i.map_err(parse_error)?.to_vec();
222 let v = String::from_utf8(bs).map_err(|err| {
223 Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string")
224 .set_source(err)
225 })?;
226
227 res.push(v);
228 }
229
230 Ok(res)
231 }
232}
233
234fn parse_error(err: sled::Error) -> Error {
235 Error::new(ErrorKind::Unexpected, "error from sled").set_source(err)
236}