opendal/services/rocksdb/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use rocksdb::DB;
23
24use crate::raw::adapters::kv;
25use crate::raw::*;
26use crate::services::RocksdbConfig;
27use crate::Result;
28use crate::*;
29
30impl Configurator for RocksdbConfig {
31    type Builder = RocksdbBuilder;
32    fn into_builder(self) -> Self::Builder {
33        RocksdbBuilder { config: self }
34    }
35}
36
37/// RocksDB service support.
38#[doc = include_str!("docs.md")]
39#[derive(Clone, Default)]
40pub struct RocksdbBuilder {
41    config: RocksdbConfig,
42}
43
44impl RocksdbBuilder {
45    /// Set the path to the rocksdb data directory. Will create if not exists.
46    pub fn datadir(mut self, path: &str) -> Self {
47        self.config.datadir = Some(path.into());
48        self
49    }
50
51    /// set the working directory, all operations will be performed under it.
52    ///
53    /// default: "/"
54    pub fn root(mut self, root: &str) -> Self {
55        self.config.root = if root.is_empty() {
56            None
57        } else {
58            Some(root.to_string())
59        };
60
61        self
62    }
63}
64
65impl Builder for RocksdbBuilder {
66    type Config = RocksdbConfig;
67
68    fn build(self) -> Result<impl Access> {
69        let path = self.config.datadir.ok_or_else(|| {
70            Error::new(ErrorKind::ConfigInvalid, "datadir is required but not set")
71                .with_context("service", Scheme::Rocksdb)
72        })?;
73        let db = DB::open_default(&path).map_err(|e| {
74            Error::new(ErrorKind::ConfigInvalid, "open default transaction db")
75                .with_context("service", Scheme::Rocksdb)
76                .with_context("datadir", path)
77                .set_source(e)
78        })?;
79
80        let root = normalize_root(
81            self.config
82                .root
83                .clone()
84                .unwrap_or_else(|| "/".to_string())
85                .as_str(),
86        );
87
88        Ok(RocksdbBackend::new(Adapter { db: Arc::new(db) }).with_normalized_root(root))
89    }
90}
91
92/// Backend for rocksdb services.
93pub type RocksdbBackend = kv::Backend<Adapter>;
94
95#[derive(Clone)]
96pub struct Adapter {
97    db: Arc<DB>,
98}
99
100impl Debug for Adapter {
101    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
102        let mut ds = f.debug_struct("Adapter");
103        ds.field("path", &self.db.path());
104        ds.finish()
105    }
106}
107
108impl kv::Adapter for Adapter {
109    type Scanner = kv::Scanner;
110
111    fn info(&self) -> kv::Info {
112        kv::Info::new(
113            Scheme::Rocksdb,
114            &self.db.path().to_string_lossy(),
115            Capability {
116                read: true,
117                write: true,
118                list: true,
119                shared: false,
120                ..Default::default()
121            },
122        )
123    }
124
125    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
126        let result = self.db.get(path).map_err(parse_rocksdb_error)?;
127        Ok(result.map(Buffer::from))
128    }
129
130    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
131        self.db
132            .put(path, value.to_vec())
133            .map_err(parse_rocksdb_error)
134    }
135
136    async fn delete(&self, path: &str) -> Result<()> {
137        self.db.delete(path).map_err(parse_rocksdb_error)
138    }
139
140    async fn scan(&self, path: &str) -> Result<Self::Scanner> {
141        let it = self.db.prefix_iterator(path).map(|r| r.map(|(k, _)| k));
142        let mut res = Vec::default();
143
144        for key in it {
145            let key = key.map_err(parse_rocksdb_error)?;
146            let key = String::from_utf8_lossy(&key);
147            if !key.starts_with(path) {
148                break;
149            }
150            res.push(key.to_string());
151        }
152
153        Ok(Box::new(kv::ScanStdIter::new(res.into_iter().map(Ok))))
154    }
155}
156
157fn parse_rocksdb_error(e: rocksdb::Error) -> Error {
158    Error::new(ErrorKind::Unexpected, "got rocksdb error").set_source(e)
159}