opendal/services/rocksdb/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use rocksdb::DB;
23
24use crate::raw::adapters::kv;
25use crate::raw::*;
26use crate::services::RocksdbConfig;
27use crate::Result;
28use crate::*;
29
30impl Configurator for RocksdbConfig {
31 type Builder = RocksdbBuilder;
32 fn into_builder(self) -> Self::Builder {
33 RocksdbBuilder { config: self }
34 }
35}
36
/// Builder for the RocksDB service.
///
/// Holds a [`RocksdbConfig`] that is filled in through the builder's setter
/// methods and consumed when the backend is built.
#[doc = include_str!("docs.md")]
#[derive(Clone, Default)]
pub struct RocksdbBuilder {
    /// Configuration collected so far (data directory and optional root).
    config: RocksdbConfig,
}
43
44impl RocksdbBuilder {
45 pub fn datadir(mut self, path: &str) -> Self {
47 self.config.datadir = Some(path.into());
48 self
49 }
50
51 pub fn root(mut self, root: &str) -> Self {
55 self.config.root = if root.is_empty() {
56 None
57 } else {
58 Some(root.to_string())
59 };
60
61 self
62 }
63}
64
65impl Builder for RocksdbBuilder {
66 type Config = RocksdbConfig;
67
68 fn build(self) -> Result<impl Access> {
69 let path = self.config.datadir.ok_or_else(|| {
70 Error::new(ErrorKind::ConfigInvalid, "datadir is required but not set")
71 .with_context("service", Scheme::Rocksdb)
72 })?;
73 let db = DB::open_default(&path).map_err(|e| {
74 Error::new(ErrorKind::ConfigInvalid, "open default transaction db")
75 .with_context("service", Scheme::Rocksdb)
76 .with_context("datadir", path)
77 .set_source(e)
78 })?;
79
80 let root = normalize_root(
81 self.config
82 .root
83 .clone()
84 .unwrap_or_else(|| "/".to_string())
85 .as_str(),
86 );
87
88 Ok(RocksdbBackend::new(Adapter { db: Arc::new(db) }).with_normalized_root(root))
89 }
90}
91
/// Backend for the RocksDB service: the generic key-value backend
/// parameterised with the RocksDB [`Adapter`].
pub type RocksdbBackend = kv::Backend<Adapter>;
94
/// Key-value adapter backed by a RocksDB database handle.
///
/// Cloning the adapter clones the `Arc`, so all clones share the same handle.
#[derive(Clone)]
pub struct Adapter {
    /// Shared handle to the opened database.
    db: Arc<DB>,
}
99
100impl Debug for Adapter {
101 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
102 let mut ds = f.debug_struct("Adapter");
103 ds.field("path", &self.db.path());
104 ds.finish()
105 }
106}
107
108impl kv::Adapter for Adapter {
109 type Scanner = kv::Scanner;
110
111 fn info(&self) -> kv::Info {
112 kv::Info::new(
113 Scheme::Rocksdb,
114 &self.db.path().to_string_lossy(),
115 Capability {
116 read: true,
117 write: true,
118 list: true,
119 shared: false,
120 ..Default::default()
121 },
122 )
123 }
124
125 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
126 let result = self.db.get(path).map_err(parse_rocksdb_error)?;
127 Ok(result.map(Buffer::from))
128 }
129
130 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
131 self.db
132 .put(path, value.to_vec())
133 .map_err(parse_rocksdb_error)
134 }
135
136 async fn delete(&self, path: &str) -> Result<()> {
137 self.db.delete(path).map_err(parse_rocksdb_error)
138 }
139
140 async fn scan(&self, path: &str) -> Result<Self::Scanner> {
141 let it = self.db.prefix_iterator(path).map(|r| r.map(|(k, _)| k));
142 let mut res = Vec::default();
143
144 for key in it {
145 let key = key.map_err(parse_rocksdb_error)?;
146 let key = String::from_utf8_lossy(&key);
147 if !key.starts_with(path) {
148 break;
149 }
150 res.push(key.to_string());
151 }
152
153 Ok(Box::new(kv::ScanStdIter::new(res.into_iter().map(Ok))))
154 }
155}
156
157fn parse_rocksdb_error(e: rocksdb::Error) -> Error {
158 Error::new(ErrorKind::Unexpected, "got rocksdb error").set_source(e)
159}