opendal/services/persy/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::str;
21
22use persy;
23
24use crate::Builder;
25use crate::Error;
26use crate::ErrorKind;
27use crate::Scheme;
28use crate::raw::adapters::kv;
29use crate::raw::*;
30use crate::services::PersyConfig;
31use crate::*;
32
33/// persy service support.
34#[doc = include_str!("docs.md")]
35#[derive(Default, Debug)]
36pub struct PersyBuilder {
37    pub(super) config: PersyConfig,
38}
39
40impl PersyBuilder {
41    /// Set the path to the persy data directory. Will create if not exists.
42    pub fn datafile(mut self, path: &str) -> Self {
43        self.config.datafile = Some(path.into());
44        self
45    }
46
47    /// Set the name of the persy segment. Will create if not exists.
48    pub fn segment(mut self, path: &str) -> Self {
49        self.config.segment = Some(path.into());
50        self
51    }
52
53    /// Set the name of the persy index. Will create if not exists.
54    pub fn index(mut self, path: &str) -> Self {
55        self.config.index = Some(path.into());
56        self
57    }
58}
59
60impl Builder for PersyBuilder {
61    type Config = PersyConfig;
62
63    fn build(self) -> Result<impl Access> {
64        let datafile_path = self.config.datafile.ok_or_else(|| {
65            Error::new(ErrorKind::ConfigInvalid, "datafile is required but not set")
66                .with_context("service", Scheme::Persy)
67        })?;
68
69        let segment_name = self.config.segment.ok_or_else(|| {
70            Error::new(ErrorKind::ConfigInvalid, "segment is required but not set")
71                .with_context("service", Scheme::Persy)
72        })?;
73
74        let segment = segment_name.clone();
75
76        let index_name = self.config.index.ok_or_else(|| {
77            Error::new(ErrorKind::ConfigInvalid, "index is required but not set")
78                .with_context("service", Scheme::Persy)
79        })?;
80
81        let index = index_name.clone();
82
83        let persy = persy::OpenOptions::new()
84            .create(true)
85            .prepare_with(move |p| init(p, &segment_name, &index_name))
86            .open(&datafile_path)
87            .map_err(|e| {
88                Error::new(ErrorKind::ConfigInvalid, "open db")
89                    .with_context("service", Scheme::Persy)
90                    .with_context("datafile", datafile_path.clone())
91                    .set_source(e)
92            })?;
93
94        // This function will only be called on database creation
95        fn init(
96            persy: &persy::Persy,
97            segment_name: &str,
98            index_name: &str,
99        ) -> Result<(), Box<dyn std::error::Error>> {
100            let mut tx = persy.begin()?;
101
102            if !tx.exists_segment(segment_name)? {
103                tx.create_segment(segment_name)?;
104            }
105            if !tx.exists_index(index_name)? {
106                tx.create_index::<String, persy::PersyId>(index_name, persy::ValueMode::Replace)?;
107            }
108
109            let prepared = tx.prepare()?;
110            prepared.commit()?;
111
112            Ok(())
113        }
114
115        Ok(PersyBackend::new(Adapter {
116            datafile: datafile_path,
117            segment,
118            index,
119            persy,
120        }))
121    }
122}
123
124/// Backend for persy services.
125pub type PersyBackend = kv::Backend<Adapter>;
126
127#[derive(Clone)]
128pub struct Adapter {
129    datafile: String,
130    segment: String,
131    index: String,
132    persy: persy::Persy,
133}
134
135impl Debug for Adapter {
136    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
137        let mut ds = f.debug_struct("Adapter");
138        ds.field("path", &self.datafile);
139        ds.field("segment", &self.segment);
140        ds.field("index", &self.index);
141        ds.finish()
142    }
143}
144
145impl kv::Adapter for Adapter {
146    type Scanner = ();
147
148    fn info(&self) -> kv::Info {
149        kv::Info::new(
150            Scheme::Persy,
151            &self.datafile,
152            Capability {
153                read: true,
154                write: true,
155                delete: true,
156                shared: false,
157                ..Default::default()
158            },
159        )
160    }
161
162    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
163        let mut read_id = self
164            .persy
165            .get::<String, persy::PersyId>(&self.index, &path.to_string())
166            .map_err(parse_error)?;
167        if let Some(id) = read_id.next() {
168            let value = self.persy.read(&self.segment, &id).map_err(parse_error)?;
169            return Ok(value.map(Buffer::from));
170        }
171
172        Ok(None)
173    }
174
175    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
176        let mut tx = self.persy.begin().map_err(parse_error)?;
177        let id = tx
178            .insert(&self.segment, &value.to_vec())
179            .map_err(parse_error)?;
180
181        tx.put::<String, persy::PersyId>(&self.index, path.to_string(), id)
182            .map_err(parse_error)?;
183        let prepared = tx.prepare().map_err(parse_error)?;
184        prepared.commit().map_err(parse_error)?;
185
186        Ok(())
187    }
188
189    async fn delete(&self, path: &str) -> Result<()> {
190        let mut delete_id = self
191            .persy
192            .get::<String, persy::PersyId>(&self.index, &path.to_string())
193            .map_err(parse_error)?;
194        if let Some(id) = delete_id.next() {
195            // Begin a transaction.
196            let mut tx = self.persy.begin().map_err(parse_error)?;
197            // Delete the record.
198            tx.delete(&self.segment, &id).map_err(parse_error)?;
199            // Remove the index.
200            tx.remove::<String, persy::PersyId>(&self.index, path.to_string(), Some(id))
201                .map_err(parse_error)?;
202            // Commit the tx.
203            let prepared = tx.prepare().map_err(parse_error)?;
204            prepared.commit().map_err(parse_error)?;
205        }
206
207        Ok(())
208    }
209}
210
211fn parse_error<T: Into<persy::PersyError>>(err: persy::PE<T>) -> Error {
212    let err: persy::PersyError = err.persy_error();
213    let kind = match err {
214        persy::PersyError::RecordNotFound(_) => ErrorKind::NotFound,
215        _ => ErrorKind::Unexpected,
216    };
217
218    Error::new(kind, "error from persy").set_source(err)
219}