opendal_core/services/persy/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19
20use crate::*;
21
/// Shared state used by the persy service to read, write, and delete values.
///
/// Values are stored as records in `segment`; `index` maps a path (`String`)
/// to the `persy::PersyId` of the record holding that path's value.
#[derive(Clone)]
pub struct PersyCore {
    /// Reported as `path` in the `Debug` output.
    // NOTE(review): presumably the filesystem path of the persy data file — confirm against the builder.
    pub datafile: String,
    /// Name of the persy segment that records are inserted into, read from, and deleted from.
    pub segment: String,
    /// Name of the persy index keyed by path, with record ids as values.
    pub index: String,
    /// Handle used to run index lookups, record reads, and to begin transactions.
    pub persy: persy::Persy,
}
29
30impl Debug for PersyCore {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        f.debug_struct("PersyCore")
33            .field("path", &self.datafile)
34            .field("segment", &self.segment)
35            .field("index", &self.index)
36            .finish_non_exhaustive()
37    }
38}
39
40impl PersyCore {
41    pub fn get(&self, path: &str) -> Result<Option<Buffer>> {
42        let mut read_id = self
43            .persy
44            .get::<String, persy::PersyId>(&self.index, &path.to_string())
45            .map_err(parse_error)?;
46        if let Some(id) = read_id.next() {
47            let value = self.persy.read(&self.segment, &id).map_err(parse_error)?;
48            return Ok(value.map(Buffer::from));
49        }
50
51        Ok(None)
52    }
53
54    pub fn set(&self, path: &str, value: Buffer) -> Result<()> {
55        let mut tx = self.persy.begin().map_err(parse_error)?;
56        let id = tx
57            .insert(&self.segment, &value.to_vec())
58            .map_err(parse_error)?;
59
60        tx.put::<String, persy::PersyId>(&self.index, path.to_string(), id)
61            .map_err(parse_error)?;
62        let prepared = tx.prepare().map_err(parse_error)?;
63        prepared.commit().map_err(parse_error)?;
64
65        Ok(())
66    }
67
68    pub fn delete(&self, path: &str) -> Result<()> {
69        let mut delete_id = self
70            .persy
71            .get::<String, persy::PersyId>(&self.index, &path.to_string())
72            .map_err(parse_error)?;
73        if let Some(id) = delete_id.next() {
74            // Begin a transaction.
75            let mut tx = self.persy.begin().map_err(parse_error)?;
76            // Delete the record.
77            tx.delete(&self.segment, &id).map_err(parse_error)?;
78            // Remove the index.
79            tx.remove::<String, persy::PersyId>(&self.index, path.to_string(), Some(id))
80                .map_err(parse_error)?;
81            // Commit the tx.
82            let prepared = tx.prepare().map_err(parse_error)?;
83            prepared.commit().map_err(parse_error)?;
84        }
85
86        Ok(())
87    }
88}
89
90fn parse_error<T: Into<persy::PersyError>>(err: persy::PE<T>) -> Error {
91    let err: persy::PersyError = err.persy_error();
92    let kind = match err {
93        persy::PersyError::RecordNotFound(_) => ErrorKind::NotFound,
94        _ => ErrorKind::Unexpected,
95    };
96
97    Error::new(kind, "error from persy").set_source(err)
98}