opendal/services/persy/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::str;
21
22use persy;
23
24use crate::raw::adapters::kv;
25use crate::raw::*;
26use crate::services::PersyConfig;
27use crate::Builder;
28use crate::Error;
29use crate::ErrorKind;
30use crate::Scheme;
31use crate::*;
32
33impl Configurator for PersyConfig {
34    type Builder = PersyBuilder;
35    fn into_builder(self) -> Self::Builder {
36        PersyBuilder { config: self }
37    }
38}
39
40/// persy service support.
41#[doc = include_str!("docs.md")]
42#[derive(Default, Debug)]
43pub struct PersyBuilder {
44    config: PersyConfig,
45}
46
47impl PersyBuilder {
48    /// Set the path to the persy data directory. Will create if not exists.
49    pub fn datafile(mut self, path: &str) -> Self {
50        self.config.datafile = Some(path.into());
51        self
52    }
53
54    /// Set the name of the persy segment. Will create if not exists.
55    pub fn segment(mut self, path: &str) -> Self {
56        self.config.segment = Some(path.into());
57        self
58    }
59
60    /// Set the name of the persy index. Will create if not exists.
61    pub fn index(mut self, path: &str) -> Self {
62        self.config.index = Some(path.into());
63        self
64    }
65}
66
67impl Builder for PersyBuilder {
68    type Config = PersyConfig;
69
70    fn build(self) -> Result<impl Access> {
71        let datafile_path = self.config.datafile.ok_or_else(|| {
72            Error::new(ErrorKind::ConfigInvalid, "datafile is required but not set")
73                .with_context("service", Scheme::Persy)
74        })?;
75
76        let segment_name = self.config.segment.ok_or_else(|| {
77            Error::new(ErrorKind::ConfigInvalid, "segment is required but not set")
78                .with_context("service", Scheme::Persy)
79        })?;
80
81        let segment = segment_name.clone();
82
83        let index_name = self.config.index.ok_or_else(|| {
84            Error::new(ErrorKind::ConfigInvalid, "index is required but not set")
85                .with_context("service", Scheme::Persy)
86        })?;
87
88        let index = index_name.clone();
89
90        let persy = persy::OpenOptions::new()
91            .create(true)
92            .prepare_with(move |p| init(p, &segment_name, &index_name))
93            .open(&datafile_path)
94            .map_err(|e| {
95                Error::new(ErrorKind::ConfigInvalid, "open db")
96                    .with_context("service", Scheme::Persy)
97                    .with_context("datafile", datafile_path.clone())
98                    .set_source(e)
99            })?;
100
101        // This function will only be called on database creation
102        fn init(
103            persy: &persy::Persy,
104            segment_name: &str,
105            index_name: &str,
106        ) -> Result<(), Box<dyn std::error::Error>> {
107            let mut tx = persy.begin()?;
108
109            if !tx.exists_segment(segment_name)? {
110                tx.create_segment(segment_name)?;
111            }
112            if !tx.exists_index(index_name)? {
113                tx.create_index::<String, persy::PersyId>(index_name, persy::ValueMode::Replace)?;
114            }
115
116            let prepared = tx.prepare()?;
117            prepared.commit()?;
118
119            Ok(())
120        }
121
122        Ok(PersyBackend::new(Adapter {
123            datafile: datafile_path,
124            segment,
125            index,
126            persy,
127        }))
128    }
129}
130
131/// Backend for persy services.
132pub type PersyBackend = kv::Backend<Adapter>;
133
134#[derive(Clone)]
135pub struct Adapter {
136    datafile: String,
137    segment: String,
138    index: String,
139    persy: persy::Persy,
140}
141
142impl Debug for Adapter {
143    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
144        let mut ds = f.debug_struct("Adapter");
145        ds.field("path", &self.datafile);
146        ds.field("segment", &self.segment);
147        ds.field("index", &self.index);
148        ds.finish()
149    }
150}
151
152impl kv::Adapter for Adapter {
153    type Scanner = ();
154
155    fn info(&self) -> kv::Info {
156        kv::Info::new(
157            Scheme::Persy,
158            &self.datafile,
159            Capability {
160                read: true,
161                write: true,
162                delete: true,
163                shared: false,
164                ..Default::default()
165            },
166        )
167    }
168
169    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
170        let mut read_id = self
171            .persy
172            .get::<String, persy::PersyId>(&self.index, &path.to_string())
173            .map_err(parse_error)?;
174        if let Some(id) = read_id.next() {
175            let value = self.persy.read(&self.segment, &id).map_err(parse_error)?;
176            return Ok(value.map(Buffer::from));
177        }
178
179        Ok(None)
180    }
181
182    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
183        let mut tx = self.persy.begin().map_err(parse_error)?;
184        let id = tx
185            .insert(&self.segment, &value.to_vec())
186            .map_err(parse_error)?;
187
188        tx.put::<String, persy::PersyId>(&self.index, path.to_string(), id)
189            .map_err(parse_error)?;
190        let prepared = tx.prepare().map_err(parse_error)?;
191        prepared.commit().map_err(parse_error)?;
192
193        Ok(())
194    }
195
196    async fn delete(&self, path: &str) -> Result<()> {
197        let mut delete_id = self
198            .persy
199            .get::<String, persy::PersyId>(&self.index, &path.to_string())
200            .map_err(parse_error)?;
201        if let Some(id) = delete_id.next() {
202            // Begin a transaction.
203            let mut tx = self.persy.begin().map_err(parse_error)?;
204            // Delete the record.
205            tx.delete(&self.segment, &id).map_err(parse_error)?;
206            // Remove the index.
207            tx.remove::<String, persy::PersyId>(&self.index, path.to_string(), Some(id))
208                .map_err(parse_error)?;
209            // Commit the tx.
210            let prepared = tx.prepare().map_err(parse_error)?;
211            prepared.commit().map_err(parse_error)?;
212        }
213
214        Ok(())
215    }
216}
217
218fn parse_error<T: Into<persy::PersyError>>(err: persy::PE<T>) -> Error {
219    let err: persy::PersyError = err.persy_error();
220    let kind = match err {
221        persy::PersyError::RecordNotFound(_) => ErrorKind::NotFound,
222        _ => ErrorKind::Unexpected,
223    };
224
225    Error::new(kind, "error from persy").set_source(err)
226}