opendal/services/persy/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::str;
21
22use persy;
23
24use crate::raw::adapters::kv;
25use crate::raw::*;
26use crate::services::PersyConfig;
27use crate::Builder;
28use crate::Error;
29use crate::ErrorKind;
30use crate::Scheme;
31use crate::*;
32
33impl Configurator for PersyConfig {
34    type Builder = PersyBuilder;
35    fn into_builder(self) -> Self::Builder {
36        PersyBuilder { config: self }
37    }
38}
39
40/// persy service support.
41#[doc = include_str!("docs.md")]
42#[derive(Default, Debug)]
43pub struct PersyBuilder {
44    config: PersyConfig,
45}
46
47impl PersyBuilder {
48    /// Set the path to the persy data directory. Will create if not exists.
49    pub fn datafile(mut self, path: &str) -> Self {
50        self.config.datafile = Some(path.into());
51        self
52    }
53
54    /// Set the name of the persy segment. Will create if not exists.
55    pub fn segment(mut self, path: &str) -> Self {
56        self.config.segment = Some(path.into());
57        self
58    }
59
60    /// Set the name of the persy index. Will create if not exists.
61    pub fn index(mut self, path: &str) -> Self {
62        self.config.index = Some(path.into());
63        self
64    }
65}
66
67impl Builder for PersyBuilder {
68    const SCHEME: Scheme = Scheme::Persy;
69    type Config = PersyConfig;
70
71    fn build(self) -> Result<impl Access> {
72        let datafile_path = self.config.datafile.ok_or_else(|| {
73            Error::new(ErrorKind::ConfigInvalid, "datafile is required but not set")
74                .with_context("service", Scheme::Persy)
75        })?;
76
77        let segment_name = self.config.segment.ok_or_else(|| {
78            Error::new(ErrorKind::ConfigInvalid, "segment is required but not set")
79                .with_context("service", Scheme::Persy)
80        })?;
81
82        let segment = segment_name.clone();
83
84        let index_name = self.config.index.ok_or_else(|| {
85            Error::new(ErrorKind::ConfigInvalid, "index is required but not set")
86                .with_context("service", Scheme::Persy)
87        })?;
88
89        let index = index_name.clone();
90
91        let persy = persy::OpenOptions::new()
92            .create(true)
93            .prepare_with(move |p| init(p, &segment_name, &index_name))
94            .open(&datafile_path)
95            .map_err(|e| {
96                Error::new(ErrorKind::ConfigInvalid, "open db")
97                    .with_context("service", Scheme::Persy)
98                    .with_context("datafile", datafile_path.clone())
99                    .set_source(e)
100            })?;
101
102        // This function will only be called on database creation
103        fn init(
104            persy: &persy::Persy,
105            segment_name: &str,
106            index_name: &str,
107        ) -> Result<(), Box<dyn std::error::Error>> {
108            let mut tx = persy.begin()?;
109
110            if !tx.exists_segment(segment_name)? {
111                tx.create_segment(segment_name)?;
112            }
113            if !tx.exists_index(index_name)? {
114                tx.create_index::<String, persy::PersyId>(index_name, persy::ValueMode::Replace)?;
115            }
116
117            let prepared = tx.prepare()?;
118            prepared.commit()?;
119
120            Ok(())
121        }
122
123        Ok(PersyBackend::new(Adapter {
124            datafile: datafile_path,
125            segment,
126            index,
127            persy,
128        }))
129    }
130}
131
132/// Backend for persy services.
133pub type PersyBackend = kv::Backend<Adapter>;
134
135#[derive(Clone)]
136pub struct Adapter {
137    datafile: String,
138    segment: String,
139    index: String,
140    persy: persy::Persy,
141}
142
143impl Debug for Adapter {
144    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
145        let mut ds = f.debug_struct("Adapter");
146        ds.field("path", &self.datafile);
147        ds.field("segment", &self.segment);
148        ds.field("index", &self.index);
149        ds.finish()
150    }
151}
152
153impl kv::Adapter for Adapter {
154    type Scanner = ();
155
156    fn info(&self) -> kv::Info {
157        kv::Info::new(
158            Scheme::Persy,
159            &self.datafile,
160            Capability {
161                read: true,
162                write: true,
163                delete: true,
164                shared: false,
165                ..Default::default()
166            },
167        )
168    }
169
170    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
171        let mut read_id = self
172            .persy
173            .get::<String, persy::PersyId>(&self.index, &path.to_string())
174            .map_err(parse_error)?;
175        if let Some(id) = read_id.next() {
176            let value = self.persy.read(&self.segment, &id).map_err(parse_error)?;
177            return Ok(value.map(Buffer::from));
178        }
179
180        Ok(None)
181    }
182
183    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
184        let mut tx = self.persy.begin().map_err(parse_error)?;
185        let id = tx
186            .insert(&self.segment, &value.to_vec())
187            .map_err(parse_error)?;
188
189        tx.put::<String, persy::PersyId>(&self.index, path.to_string(), id)
190            .map_err(parse_error)?;
191        let prepared = tx.prepare().map_err(parse_error)?;
192        prepared.commit().map_err(parse_error)?;
193
194        Ok(())
195    }
196
197    async fn delete(&self, path: &str) -> Result<()> {
198        let mut delete_id = self
199            .persy
200            .get::<String, persy::PersyId>(&self.index, &path.to_string())
201            .map_err(parse_error)?;
202        if let Some(id) = delete_id.next() {
203            // Begin a transaction.
204            let mut tx = self.persy.begin().map_err(parse_error)?;
205            // Delete the record.
206            tx.delete(&self.segment, &id).map_err(parse_error)?;
207            // Remove the index.
208            tx.remove::<String, persy::PersyId>(&self.index, path.to_string(), Some(id))
209                .map_err(parse_error)?;
210            // Commit the tx.
211            let prepared = tx.prepare().map_err(parse_error)?;
212            prepared.commit().map_err(parse_error)?;
213        }
214
215        Ok(())
216    }
217}
218
219fn parse_error<T: Into<persy::PersyError>>(err: persy::PE<T>) -> Error {
220    let err: persy::PersyError = err.persy_error();
221    let kind = match err {
222        persy::PersyError::RecordNotFound(_) => ErrorKind::NotFound,
223        _ => ErrorKind::Unexpected,
224    };
225
226    Error::new(kind, "error from persy").set_source(err)
227}