opendal/services/rocksdb/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use rocksdb::DB;
23
24use crate::raw::adapters::kv;
25use crate::raw::*;
26use crate::services::RocksdbConfig;
27use crate::Result;
28use crate::*;
29
30impl Configurator for RocksdbConfig {
31 type Builder = RocksdbBuilder;
32 fn into_builder(self) -> Self::Builder {
33 RocksdbBuilder { config: self }
34 }
35}
36
37#[doc = include_str!("docs.md")]
39#[derive(Clone, Default)]
40pub struct RocksdbBuilder {
41 config: RocksdbConfig,
42}
43
44impl RocksdbBuilder {
45 pub fn datadir(mut self, path: &str) -> Self {
47 self.config.datadir = Some(path.into());
48 self
49 }
50
51 pub fn root(mut self, root: &str) -> Self {
55 self.config.root = if root.is_empty() {
56 None
57 } else {
58 Some(root.to_string())
59 };
60
61 self
62 }
63}
64
65impl Builder for RocksdbBuilder {
66 const SCHEME: Scheme = Scheme::Rocksdb;
67 type Config = RocksdbConfig;
68
69 fn build(self) -> Result<impl Access> {
70 let path = self.config.datadir.ok_or_else(|| {
71 Error::new(ErrorKind::ConfigInvalid, "datadir is required but not set")
72 .with_context("service", Scheme::Rocksdb)
73 })?;
74 let db = DB::open_default(&path).map_err(|e| {
75 Error::new(ErrorKind::ConfigInvalid, "open default transaction db")
76 .with_context("service", Scheme::Rocksdb)
77 .with_context("datadir", path)
78 .set_source(e)
79 })?;
80
81 let root = normalize_root(
82 self.config
83 .root
84 .clone()
85 .unwrap_or_else(|| "/".to_string())
86 .as_str(),
87 );
88
89 Ok(RocksdbBackend::new(Adapter { db: Arc::new(db) }).with_normalized_root(root))
90 }
91}
92
93pub type RocksdbBackend = kv::Backend<Adapter>;
95
96#[derive(Clone)]
97pub struct Adapter {
98 db: Arc<DB>,
99}
100
101impl Debug for Adapter {
102 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
103 let mut ds = f.debug_struct("Adapter");
104 ds.field("path", &self.db.path());
105 ds.finish()
106 }
107}
108
109impl kv::Adapter for Adapter {
110 type Scanner = kv::Scanner;
111
112 fn info(&self) -> kv::Info {
113 kv::Info::new(
114 Scheme::Rocksdb,
115 &self.db.path().to_string_lossy(),
116 Capability {
117 read: true,
118 write: true,
119 list: true,
120 shared: false,
121 ..Default::default()
122 },
123 )
124 }
125
126 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
127 let result = self.db.get(path).map_err(parse_rocksdb_error)?;
128 Ok(result.map(Buffer::from))
129 }
130
131 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
132 self.db
133 .put(path, value.to_vec())
134 .map_err(parse_rocksdb_error)
135 }
136
137 async fn delete(&self, path: &str) -> Result<()> {
138 self.db.delete(path).map_err(parse_rocksdb_error)
139 }
140
141 async fn scan(&self, path: &str) -> Result<Self::Scanner> {
142 let it = self.db.prefix_iterator(path).map(|r| r.map(|(k, _)| k));
143 let mut res = Vec::default();
144
145 for key in it {
146 let key = key.map_err(parse_rocksdb_error)?;
147 let key = String::from_utf8_lossy(&key);
148 if !key.starts_with(path) {
149 break;
150 }
151 res.push(key.to_string());
152 }
153
154 Ok(Box::new(kv::ScanStdIter::new(res.into_iter().map(Ok))))
155 }
156}
157
158fn parse_rocksdb_error(e: rocksdb::Error) -> Error {
159 Error::new(ErrorKind::Unexpected, "got rocksdb error").set_source(e)
160}