opendal/services/rocksdb/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use rocksdb::DB;
23
24use crate::raw::adapters::kv;
25use crate::raw::*;
26use crate::services::RocksdbConfig;
27use crate::Result;
28use crate::*;
29
30impl Configurator for RocksdbConfig {
31    type Builder = RocksdbBuilder;
32    fn into_builder(self) -> Self::Builder {
33        RocksdbBuilder { config: self }
34    }
35}
36
37/// RocksDB service support.
38#[doc = include_str!("docs.md")]
39#[derive(Clone, Default)]
40pub struct RocksdbBuilder {
41    config: RocksdbConfig,
42}
43
44impl RocksdbBuilder {
45    /// Set the path to the rocksdb data directory. Will create if not exists.
46    pub fn datadir(mut self, path: &str) -> Self {
47        self.config.datadir = Some(path.into());
48        self
49    }
50
51    /// set the working directory, all operations will be performed under it.
52    ///
53    /// default: "/"
54    pub fn root(mut self, root: &str) -> Self {
55        self.config.root = if root.is_empty() {
56            None
57        } else {
58            Some(root.to_string())
59        };
60
61        self
62    }
63}
64
65impl Builder for RocksdbBuilder {
66    const SCHEME: Scheme = Scheme::Rocksdb;
67    type Config = RocksdbConfig;
68
69    fn build(self) -> Result<impl Access> {
70        let path = self.config.datadir.ok_or_else(|| {
71            Error::new(ErrorKind::ConfigInvalid, "datadir is required but not set")
72                .with_context("service", Scheme::Rocksdb)
73        })?;
74        let db = DB::open_default(&path).map_err(|e| {
75            Error::new(ErrorKind::ConfigInvalid, "open default transaction db")
76                .with_context("service", Scheme::Rocksdb)
77                .with_context("datadir", path)
78                .set_source(e)
79        })?;
80
81        let root = normalize_root(
82            self.config
83                .root
84                .clone()
85                .unwrap_or_else(|| "/".to_string())
86                .as_str(),
87        );
88
89        Ok(RocksdbBackend::new(Adapter { db: Arc::new(db) }).with_normalized_root(root))
90    }
91}
92
93/// Backend for rocksdb services.
94pub type RocksdbBackend = kv::Backend<Adapter>;
95
96#[derive(Clone)]
97pub struct Adapter {
98    db: Arc<DB>,
99}
100
101impl Debug for Adapter {
102    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
103        let mut ds = f.debug_struct("Adapter");
104        ds.field("path", &self.db.path());
105        ds.finish()
106    }
107}
108
109impl kv::Adapter for Adapter {
110    type Scanner = kv::Scanner;
111
112    fn info(&self) -> kv::Info {
113        kv::Info::new(
114            Scheme::Rocksdb,
115            &self.db.path().to_string_lossy(),
116            Capability {
117                read: true,
118                write: true,
119                list: true,
120                shared: false,
121                ..Default::default()
122            },
123        )
124    }
125
126    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
127        let result = self.db.get(path).map_err(parse_rocksdb_error)?;
128        Ok(result.map(Buffer::from))
129    }
130
131    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
132        self.db
133            .put(path, value.to_vec())
134            .map_err(parse_rocksdb_error)
135    }
136
137    async fn delete(&self, path: &str) -> Result<()> {
138        self.db.delete(path).map_err(parse_rocksdb_error)
139    }
140
141    async fn scan(&self, path: &str) -> Result<Self::Scanner> {
142        let it = self.db.prefix_iterator(path).map(|r| r.map(|(k, _)| k));
143        let mut res = Vec::default();
144
145        for key in it {
146            let key = key.map_err(parse_rocksdb_error)?;
147            let key = String::from_utf8_lossy(&key);
148            if !key.starts_with(path) {
149                break;
150            }
151            res.push(key.to_string());
152        }
153
154        Ok(Box::new(kv::ScanStdIter::new(res.into_iter().map(Ok))))
155    }
156}
157
158fn parse_rocksdb_error(e: rocksdb::Error) -> Error {
159    Error::new(ErrorKind::Unexpected, "got rocksdb error").set_source(e)
160}