opendal_core/services/redb/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use crate::*;
22
23#[derive(Clone)]
24pub struct RedbCore {
25    pub db: Arc<redb::Database>,
26    pub datadir: Option<String>,
27    pub table: String,
28}
29
30impl Debug for RedbCore {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        f.debug_struct("RedbCore")
33            .field("path", &self.datadir)
34            .field("table", &self.table)
35            .finish_non_exhaustive()
36    }
37}
38
39impl RedbCore {
40    pub fn get(&self, path: &str) -> Result<Option<Buffer>> {
41        let read_txn = self.db.begin_read().map_err(parse_transaction_error)?;
42
43        let table_define: redb::TableDefinition<&str, &[u8]> =
44            redb::TableDefinition::new(&self.table);
45
46        let table = read_txn
47            .open_table(table_define)
48            .map_err(parse_table_error)?;
49
50        let result = match table.get(path) {
51            Ok(Some(v)) => Ok(Some(v.value().to_vec())),
52            Ok(None) => Ok(None),
53            Err(e) => Err(parse_storage_error(e)),
54        }?;
55        Ok(result.map(Buffer::from))
56    }
57
58    pub fn set(&self, path: &str, value: Buffer) -> Result<()> {
59        let write_txn = self.db.begin_write().map_err(parse_transaction_error)?;
60
61        let table_define: redb::TableDefinition<&str, &[u8]> =
62            redb::TableDefinition::new(&self.table);
63
64        {
65            let mut table = write_txn
66                .open_table(table_define)
67                .map_err(parse_table_error)?;
68
69            table
70                .insert(path, &*value.to_vec())
71                .map_err(parse_storage_error)?;
72        }
73
74        write_txn.commit().map_err(parse_commit_error)?;
75        Ok(())
76    }
77
78    pub fn delete(&self, path: &str) -> Result<()> {
79        let write_txn = self.db.begin_write().map_err(parse_transaction_error)?;
80
81        let table_define: redb::TableDefinition<&str, &[u8]> =
82            redb::TableDefinition::new(&self.table);
83
84        {
85            let mut table = write_txn
86                .open_table(table_define)
87                .map_err(parse_table_error)?;
88
89            table.remove(path).map_err(parse_storage_error)?;
90        }
91
92        write_txn.commit().map_err(parse_commit_error)?;
93        Ok(())
94    }
95}
96
97fn parse_transaction_error(e: redb::TransactionError) -> Error {
98    Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
99}
100
101fn parse_table_error(e: redb::TableError) -> Error {
102    match e {
103        redb::TableError::TableDoesNotExist(_) => {
104            Error::new(ErrorKind::NotFound, "error from redb").set_source(e)
105        }
106        _ => Error::new(ErrorKind::Unexpected, "error from redb").set_source(e),
107    }
108}
109
110fn parse_storage_error(e: redb::StorageError) -> Error {
111    Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
112}
113
114fn parse_commit_error(e: redb::CommitError) -> Error {
115    Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
116}
117
118pub fn parse_database_error(e: redb::DatabaseError) -> Error {
119    Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
120}
121
122/// Check if a table exists, otherwise create it.
123pub fn create_table(db: &redb::Database, table: &str) -> Result<()> {
124    // Only one `WriteTransaction` is permitted at same time,
125    // applying new one will block until it available.
126    //
127    // So we first try checking table existence via `ReadTransaction`.
128    {
129        let read_txn = db.begin_read().map_err(parse_transaction_error)?;
130
131        let table_define: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new(table);
132
133        match read_txn.open_table(table_define) {
134            Ok(_) => return Ok(()),
135            Err(redb::TableError::TableDoesNotExist(_)) => (),
136            Err(e) => return Err(parse_table_error(e)),
137        }
138    }
139
140    {
141        let write_txn = db.begin_write().map_err(parse_transaction_error)?;
142
143        let table_define: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new(table);
144
145        write_txn
146            .open_table(table_define)
147            .map_err(parse_table_error)?;
148        write_txn.commit().map_err(parse_commit_error)?;
149    }
150
151    Ok(())
152}