opendal/services/redb/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use tokio::task;
23
24use crate::raw::adapters::kv;
25use crate::raw::*;
26use crate::services::RedbConfig;
27use crate::Builder;
28use crate::Error;
29use crate::ErrorKind;
30use crate::Scheme;
31use crate::*;
32
33impl Configurator for RedbConfig {
34 type Builder = RedbBuilder;
35 fn into_builder(self) -> Self::Builder {
36 RedbBuilder { config: self }
37 }
38}
39
40#[doc = include_str!("docs.md")]
42#[derive(Default, Debug)]
43pub struct RedbBuilder {
44 config: RedbConfig,
45}
46
47impl RedbBuilder {
48 pub fn datadir(mut self, path: &str) -> Self {
50 self.config.datadir = Some(path.into());
51 self
52 }
53
54 pub fn table(mut self, table: &str) -> Self {
56 self.config.table = Some(table.into());
57 self
58 }
59
60 pub fn root(mut self, path: &str) -> Self {
62 self.config.root = Some(path.into());
63 self
64 }
65}
66
67impl Builder for RedbBuilder {
68 const SCHEME: Scheme = Scheme::Redb;
69 type Config = RedbConfig;
70
71 fn build(self) -> Result<impl Access> {
72 let datadir_path = self.config.datadir.ok_or_else(|| {
73 Error::new(ErrorKind::ConfigInvalid, "datadir is required but not set")
74 .with_context("service", Scheme::Redb)
75 })?;
76
77 let table_name = self.config.table.ok_or_else(|| {
78 Error::new(ErrorKind::ConfigInvalid, "table is required but not set")
79 .with_context("service", Scheme::Redb)
80 })?;
81
82 let db = redb::Database::create(&datadir_path).map_err(parse_database_error)?;
83
84 let db = Arc::new(db);
85
86 Ok(RedbBackend::new(Adapter {
87 datadir: datadir_path,
88 table: table_name,
89 db,
90 })
91 .with_root(self.config.root.as_deref().unwrap_or_default()))
92 }
93}
94
95pub type RedbBackend = kv::Backend<Adapter>;
97
98#[derive(Clone)]
99pub struct Adapter {
100 datadir: String,
101 table: String,
102 db: Arc<redb::Database>,
103}
104
105impl Debug for Adapter {
106 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
107 let mut ds = f.debug_struct("Adapter");
108 ds.field("path", &self.datadir);
109 ds.finish()
110 }
111}
112
113impl kv::Adapter for Adapter {
114 type Scanner = ();
115
116 fn info(&self) -> kv::Info {
117 kv::Info::new(
118 Scheme::Redb,
119 &self.datadir,
120 Capability {
121 read: true,
122 write: true,
123 blocking: true,
124 shared: false,
125 ..Default::default()
126 },
127 )
128 }
129
130 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
131 let cloned_self = self.clone();
132 let cloned_path = path.to_string();
133
134 task::spawn_blocking(move || cloned_self.blocking_get(cloned_path.as_str()))
135 .await
136 .map_err(new_task_join_error)
137 .and_then(|inner_result| inner_result)
138 }
139
140 fn blocking_get(&self, path: &str) -> Result<Option<Buffer>> {
141 let read_txn = self.db.begin_read().map_err(parse_transaction_error)?;
142
143 let table_define: redb::TableDefinition<&str, &[u8]> =
144 redb::TableDefinition::new(&self.table);
145
146 let table = read_txn
147 .open_table(table_define)
148 .map_err(parse_table_error)?;
149
150 let result = match table.get(path) {
151 Ok(Some(v)) => Ok(Some(v.value().to_vec())),
152 Ok(None) => Ok(None),
153 Err(e) => Err(parse_storage_error(e)),
154 }?;
155 Ok(result.map(Buffer::from))
156 }
157
158 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
159 let cloned_self = self.clone();
160 let cloned_path = path.to_string();
161
162 task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), value))
163 .await
164 .map_err(new_task_join_error)
165 .and_then(|inner_result| inner_result)
166 }
167
168 fn blocking_set(&self, path: &str, value: Buffer) -> Result<()> {
169 let write_txn = self.db.begin_write().map_err(parse_transaction_error)?;
170
171 let table_define: redb::TableDefinition<&str, &[u8]> =
172 redb::TableDefinition::new(&self.table);
173
174 {
175 let mut table = write_txn
176 .open_table(table_define)
177 .map_err(parse_table_error)?;
178
179 table
180 .insert(path, &*value.to_vec())
181 .map_err(parse_storage_error)?;
182 }
183
184 write_txn.commit().map_err(parse_commit_error)?;
185 Ok(())
186 }
187
188 async fn delete(&self, path: &str) -> Result<()> {
189 let cloned_self = self.clone();
190 let cloned_path = path.to_string();
191
192 task::spawn_blocking(move || cloned_self.blocking_delete(cloned_path.as_str()))
193 .await
194 .map_err(new_task_join_error)
195 .and_then(|inner_result| inner_result)
196 }
197
198 fn blocking_delete(&self, path: &str) -> Result<()> {
199 let write_txn = self.db.begin_write().map_err(parse_transaction_error)?;
200
201 let table_define: redb::TableDefinition<&str, &[u8]> =
202 redb::TableDefinition::new(&self.table);
203
204 {
205 let mut table = write_txn
206 .open_table(table_define)
207 .map_err(parse_table_error)?;
208
209 table.remove(path).map_err(parse_storage_error)?;
210 }
211
212 write_txn.commit().map_err(parse_commit_error)?;
213 Ok(())
214 }
215}
216
217fn parse_transaction_error(e: redb::TransactionError) -> Error {
218 Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
219}
220
221fn parse_table_error(e: redb::TableError) -> Error {
222 match e {
223 redb::TableError::TableDoesNotExist(_) => {
224 Error::new(ErrorKind::NotFound, "error from redb").set_source(e)
225 }
226 _ => Error::new(ErrorKind::Unexpected, "error from redb").set_source(e),
227 }
228}
229
230fn parse_storage_error(e: redb::StorageError) -> Error {
231 Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
232}
233
234fn parse_database_error(e: redb::DatabaseError) -> Error {
235 Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
236}
237
238fn parse_commit_error(e: redb::CommitError) -> Error {
239 Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
240}