opendal/services/rocksdb/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use rocksdb::DB;
23use tokio::task;
24
25use crate::raw::adapters::kv;
26use crate::raw::*;
27use crate::services::RocksdbConfig;
28use crate::Result;
29use crate::*;
30
31impl Configurator for RocksdbConfig {
32    type Builder = RocksdbBuilder;
33    fn into_builder(self) -> Self::Builder {
34        RocksdbBuilder { config: self }
35    }
36}
37
/// RocksDB service support.
#[doc = include_str!("docs.md")]
#[derive(Clone, Default)]
pub struct RocksdbBuilder {
    /// Configuration filled in by the builder methods and consumed by `build`.
    config: RocksdbConfig,
}
44
45impl RocksdbBuilder {
46    /// Set the path to the rocksdb data directory. Will create if not exists.
47    pub fn datadir(mut self, path: &str) -> Self {
48        self.config.datadir = Some(path.into());
49        self
50    }
51
52    /// set the working directory, all operations will be performed under it.
53    ///
54    /// default: "/"
55    pub fn root(mut self, root: &str) -> Self {
56        self.config.root = if root.is_empty() {
57            None
58        } else {
59            Some(root.to_string())
60        };
61
62        self
63    }
64}
65
66impl Builder for RocksdbBuilder {
67    const SCHEME: Scheme = Scheme::Rocksdb;
68    type Config = RocksdbConfig;
69
70    fn build(self) -> Result<impl Access> {
71        let path = self.config.datadir.ok_or_else(|| {
72            Error::new(ErrorKind::ConfigInvalid, "datadir is required but not set")
73                .with_context("service", Scheme::Rocksdb)
74        })?;
75        let db = DB::open_default(&path).map_err(|e| {
76            Error::new(ErrorKind::ConfigInvalid, "open default transaction db")
77                .with_context("service", Scheme::Rocksdb)
78                .with_context("datadir", path)
79                .set_source(e)
80        })?;
81
82        let root = normalize_root(
83            self.config
84                .root
85                .clone()
86                .unwrap_or_else(|| "/".to_string())
87                .as_str(),
88        );
89
90        Ok(RocksdbBackend::new(Adapter { db: Arc::new(db) }).with_normalized_root(root))
91    }
92}
93
/// Backend for rocksdb services.
///
/// This is the generic `kv::Backend` specialized with the rocksdb [`Adapter`].
pub type RocksdbBackend = kv::Backend<Adapter>;
96
/// Key-value adapter that holds a shared handle to an opened rocksdb database.
#[derive(Clone)]
pub struct Adapter {
    /// Reference-counted database handle; cloning the adapter clones the
    /// `Arc`, not the database itself.
    db: Arc<DB>,
}
101
102impl Debug for Adapter {
103    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
104        let mut ds = f.debug_struct("Adapter");
105        ds.field("path", &self.db.path());
106        ds.finish()
107    }
108}
109
110impl kv::Adapter for Adapter {
111    type Scanner = kv::Scanner;
112
113    fn info(&self) -> kv::Info {
114        kv::Info::new(
115            Scheme::Rocksdb,
116            &self.db.path().to_string_lossy(),
117            Capability {
118                read: true,
119                write: true,
120                list: true,
121                blocking: true,
122                shared: false,
123                ..Default::default()
124            },
125        )
126    }
127
128    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
129        let cloned_self = self.clone();
130        let cloned_path = path.to_string();
131
132        task::spawn_blocking(move || cloned_self.blocking_get(cloned_path.as_str()))
133            .await
134            .map_err(new_task_join_error)?
135    }
136
137    fn blocking_get(&self, path: &str) -> Result<Option<Buffer>> {
138        let result = self.db.get(path).map_err(parse_rocksdb_error)?;
139        Ok(result.map(Buffer::from))
140    }
141
142    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
143        let cloned_self = self.clone();
144        let cloned_path = path.to_string();
145
146        task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), value))
147            .await
148            .map_err(new_task_join_error)?
149    }
150
151    fn blocking_set(&self, path: &str, value: Buffer) -> Result<()> {
152        self.db
153            .put(path, value.to_vec())
154            .map_err(parse_rocksdb_error)
155    }
156
157    async fn delete(&self, path: &str) -> Result<()> {
158        let cloned_self = self.clone();
159        let cloned_path = path.to_string();
160
161        task::spawn_blocking(move || cloned_self.blocking_delete(cloned_path.as_str()))
162            .await
163            .map_err(new_task_join_error)?
164    }
165
166    fn blocking_delete(&self, path: &str) -> Result<()> {
167        self.db.delete(path).map_err(parse_rocksdb_error)
168    }
169
170    async fn scan(&self, path: &str) -> Result<Self::Scanner> {
171        let cloned_self = self.clone();
172        let cloned_path = path.to_string();
173
174        let res = task::spawn_blocking(move || cloned_self.blocking_scan(cloned_path.as_str()))
175            .await
176            .map_err(new_task_join_error)??;
177
178        Ok(Box::new(kv::ScanStdIter::new(res.into_iter().map(Ok))))
179    }
180
181    /// TODO: we only need key here.
182    fn blocking_scan(&self, path: &str) -> Result<Vec<String>> {
183        let it = self.db.prefix_iterator(path).map(|r| r.map(|(k, _)| k));
184        let mut res = Vec::default();
185
186        for key in it {
187            let key = key.map_err(parse_rocksdb_error)?;
188            let key = String::from_utf8_lossy(&key);
189            if !key.starts_with(path) {
190                break;
191            }
192            res.push(key.to_string());
193        }
194
195        Ok(res)
196    }
197}
198
199fn parse_rocksdb_error(e: rocksdb::Error) -> Error {
200    Error::new(ErrorKind::Unexpected, "got rocksdb error").set_source(e)
201}