opendal/services/rocksdb/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use rocksdb::DB;
23use tokio::task;
24
25use crate::raw::adapters::kv;
26use crate::raw::*;
27use crate::services::RocksdbConfig;
28use crate::Result;
29use crate::*;
30
31impl Configurator for RocksdbConfig {
32 type Builder = RocksdbBuilder;
33 fn into_builder(self) -> Self::Builder {
34 RocksdbBuilder { config: self }
35 }
36}
37
38#[doc = include_str!("docs.md")]
40#[derive(Clone, Default)]
41pub struct RocksdbBuilder {
42 config: RocksdbConfig,
43}
44
45impl RocksdbBuilder {
46 pub fn datadir(mut self, path: &str) -> Self {
48 self.config.datadir = Some(path.into());
49 self
50 }
51
52 pub fn root(mut self, root: &str) -> Self {
56 self.config.root = if root.is_empty() {
57 None
58 } else {
59 Some(root.to_string())
60 };
61
62 self
63 }
64}
65
66impl Builder for RocksdbBuilder {
67 const SCHEME: Scheme = Scheme::Rocksdb;
68 type Config = RocksdbConfig;
69
70 fn build(self) -> Result<impl Access> {
71 let path = self.config.datadir.ok_or_else(|| {
72 Error::new(ErrorKind::ConfigInvalid, "datadir is required but not set")
73 .with_context("service", Scheme::Rocksdb)
74 })?;
75 let db = DB::open_default(&path).map_err(|e| {
76 Error::new(ErrorKind::ConfigInvalid, "open default transaction db")
77 .with_context("service", Scheme::Rocksdb)
78 .with_context("datadir", path)
79 .set_source(e)
80 })?;
81
82 let root = normalize_root(
83 self.config
84 .root
85 .clone()
86 .unwrap_or_else(|| "/".to_string())
87 .as_str(),
88 );
89
90 Ok(RocksdbBackend::new(Adapter { db: Arc::new(db) }).with_normalized_root(root))
91 }
92}
93
94pub type RocksdbBackend = kv::Backend<Adapter>;
96
97#[derive(Clone)]
98pub struct Adapter {
99 db: Arc<DB>,
100}
101
102impl Debug for Adapter {
103 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
104 let mut ds = f.debug_struct("Adapter");
105 ds.field("path", &self.db.path());
106 ds.finish()
107 }
108}
109
110impl kv::Adapter for Adapter {
111 type Scanner = kv::Scanner;
112
113 fn info(&self) -> kv::Info {
114 kv::Info::new(
115 Scheme::Rocksdb,
116 &self.db.path().to_string_lossy(),
117 Capability {
118 read: true,
119 write: true,
120 list: true,
121 blocking: true,
122 shared: false,
123 ..Default::default()
124 },
125 )
126 }
127
128 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
129 let cloned_self = self.clone();
130 let cloned_path = path.to_string();
131
132 task::spawn_blocking(move || cloned_self.blocking_get(cloned_path.as_str()))
133 .await
134 .map_err(new_task_join_error)?
135 }
136
137 fn blocking_get(&self, path: &str) -> Result<Option<Buffer>> {
138 let result = self.db.get(path).map_err(parse_rocksdb_error)?;
139 Ok(result.map(Buffer::from))
140 }
141
142 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
143 let cloned_self = self.clone();
144 let cloned_path = path.to_string();
145
146 task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), value))
147 .await
148 .map_err(new_task_join_error)?
149 }
150
151 fn blocking_set(&self, path: &str, value: Buffer) -> Result<()> {
152 self.db
153 .put(path, value.to_vec())
154 .map_err(parse_rocksdb_error)
155 }
156
157 async fn delete(&self, path: &str) -> Result<()> {
158 let cloned_self = self.clone();
159 let cloned_path = path.to_string();
160
161 task::spawn_blocking(move || cloned_self.blocking_delete(cloned_path.as_str()))
162 .await
163 .map_err(new_task_join_error)?
164 }
165
166 fn blocking_delete(&self, path: &str) -> Result<()> {
167 self.db.delete(path).map_err(parse_rocksdb_error)
168 }
169
170 async fn scan(&self, path: &str) -> Result<Self::Scanner> {
171 let cloned_self = self.clone();
172 let cloned_path = path.to_string();
173
174 let res = task::spawn_blocking(move || cloned_self.blocking_scan(cloned_path.as_str()))
175 .await
176 .map_err(new_task_join_error)??;
177
178 Ok(Box::new(kv::ScanStdIter::new(res.into_iter().map(Ok))))
179 }
180
181 fn blocking_scan(&self, path: &str) -> Result<Vec<String>> {
183 let it = self.db.prefix_iterator(path).map(|r| r.map(|(k, _)| k));
184 let mut res = Vec::default();
185
186 for key in it {
187 let key = key.map_err(parse_rocksdb_error)?;
188 let key = String::from_utf8_lossy(&key);
189 if !key.starts_with(path) {
190 break;
191 }
192 res.push(key.to_string());
193 }
194
195 Ok(res)
196 }
197}
198
199fn parse_rocksdb_error(e: rocksdb::Error) -> Error {
200 Error::new(ErrorKind::Unexpected, "got rocksdb error").set_source(e)
201}