opendal/services/persy/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::str;
21
22use persy;
23use tokio::task;
24
25use crate::raw::adapters::kv;
26use crate::raw::*;
27use crate::services::PersyConfig;
28use crate::Builder;
29use crate::Error;
30use crate::ErrorKind;
31use crate::Scheme;
32use crate::*;
33
34impl Configurator for PersyConfig {
35    type Builder = PersyBuilder;
36    fn into_builder(self) -> Self::Builder {
37        PersyBuilder { config: self }
38    }
39}
40
/// Builder for the persy service.
#[doc = include_str!("docs.md")]
#[derive(Default, Debug)]
pub struct PersyBuilder {
    /// Settings collected by the setter methods (`datafile`, `segment`,
    /// `index`) and consumed when the backend is built.
    config: PersyConfig,
}
47
48impl PersyBuilder {
49    /// Set the path to the persy data directory. Will create if not exists.
50    pub fn datafile(mut self, path: &str) -> Self {
51        self.config.datafile = Some(path.into());
52        self
53    }
54
55    /// Set the name of the persy segment. Will create if not exists.
56    pub fn segment(mut self, path: &str) -> Self {
57        self.config.segment = Some(path.into());
58        self
59    }
60
61    /// Set the name of the persy index. Will create if not exists.
62    pub fn index(mut self, path: &str) -> Self {
63        self.config.index = Some(path.into());
64        self
65    }
66}
67
68impl Builder for PersyBuilder {
69    const SCHEME: Scheme = Scheme::Persy;
70    type Config = PersyConfig;
71
72    fn build(self) -> Result<impl Access> {
73        let datafile_path = self.config.datafile.ok_or_else(|| {
74            Error::new(ErrorKind::ConfigInvalid, "datafile is required but not set")
75                .with_context("service", Scheme::Persy)
76        })?;
77
78        let segment_name = self.config.segment.ok_or_else(|| {
79            Error::new(ErrorKind::ConfigInvalid, "segment is required but not set")
80                .with_context("service", Scheme::Persy)
81        })?;
82
83        let segment = segment_name.clone();
84
85        let index_name = self.config.index.ok_or_else(|| {
86            Error::new(ErrorKind::ConfigInvalid, "index is required but not set")
87                .with_context("service", Scheme::Persy)
88        })?;
89
90        let index = index_name.clone();
91
92        let persy = persy::OpenOptions::new()
93            .create(true)
94            .prepare_with(move |p| init(p, &segment_name, &index_name))
95            .open(&datafile_path)
96            .map_err(|e| {
97                Error::new(ErrorKind::ConfigInvalid, "open db")
98                    .with_context("service", Scheme::Persy)
99                    .with_context("datafile", datafile_path.clone())
100                    .set_source(e)
101            })?;
102
103        // This function will only be called on database creation
104        fn init(
105            persy: &persy::Persy,
106            segment_name: &str,
107            index_name: &str,
108        ) -> Result<(), Box<dyn std::error::Error>> {
109            let mut tx = persy.begin()?;
110
111            if !tx.exists_segment(segment_name)? {
112                tx.create_segment(segment_name)?;
113            }
114            if !tx.exists_index(index_name)? {
115                tx.create_index::<String, persy::PersyId>(index_name, persy::ValueMode::Replace)?;
116            }
117
118            let prepared = tx.prepare()?;
119            prepared.commit()?;
120
121            Ok(())
122        }
123
124        Ok(PersyBackend::new(Adapter {
125            datafile: datafile_path,
126            segment,
127            index,
128            persy,
129        }))
130    }
131}
132
/// Backend for persy services: the generic [`kv::Backend`] driven by the
/// persy [`Adapter`].
pub type PersyBackend = kv::Backend<Adapter>;
135
/// Key-value adapter that stores values as persy records and looks them up
/// through a persy index.
#[derive(Clone)]
pub struct Adapter {
    /// Path of the data file this adapter was opened from.
    datafile: String,
    /// Name of the segment that records are inserted into and read from.
    segment: String,
    /// Name of the index mapping `String` keys to `persy::PersyId` values.
    index: String,
    /// Handle to the opened persy storage.
    persy: persy::Persy,
}
143
144impl Debug for Adapter {
145    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
146        let mut ds = f.debug_struct("Adapter");
147        ds.field("path", &self.datafile);
148        ds.field("segment", &self.segment);
149        ds.field("index", &self.index);
150        ds.finish()
151    }
152}
153
154impl kv::Adapter for Adapter {
155    type Scanner = ();
156
157    fn info(&self) -> kv::Info {
158        kv::Info::new(
159            Scheme::Persy,
160            &self.datafile,
161            Capability {
162                read: true,
163                write: true,
164                delete: true,
165                blocking: true,
166                shared: false,
167                ..Default::default()
168            },
169        )
170    }
171
172    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
173        let cloned_self = self.clone();
174        let cloned_path = path.to_string();
175        task::spawn_blocking(move || cloned_self.blocking_get(cloned_path.as_str()))
176            .await
177            .map_err(new_task_join_error)
178            .and_then(|inner_result| inner_result)
179    }
180
181    fn blocking_get(&self, path: &str) -> Result<Option<Buffer>> {
182        let mut read_id = self
183            .persy
184            .get::<String, persy::PersyId>(&self.index, &path.to_string())
185            .map_err(parse_error)?;
186        if let Some(id) = read_id.next() {
187            let value = self.persy.read(&self.segment, &id).map_err(parse_error)?;
188            return Ok(value.map(Buffer::from));
189        }
190
191        Ok(None)
192    }
193
194    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
195        let cloned_path = path.to_string();
196        let cloned_self = self.clone();
197
198        task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), value))
199            .await
200            .map_err(new_task_join_error)
201            .and_then(|inner_result| inner_result)
202    }
203
204    fn blocking_set(&self, path: &str, value: Buffer) -> Result<()> {
205        let mut tx = self.persy.begin().map_err(parse_error)?;
206        let id = tx
207            .insert(&self.segment, &value.to_vec())
208            .map_err(parse_error)?;
209
210        tx.put::<String, persy::PersyId>(&self.index, path.to_string(), id)
211            .map_err(parse_error)?;
212        let prepared = tx.prepare().map_err(parse_error)?;
213        prepared.commit().map_err(parse_error)?;
214
215        Ok(())
216    }
217
218    async fn delete(&self, path: &str) -> Result<()> {
219        let cloned_path = path.to_string();
220        let cloned_self = self.clone();
221
222        task::spawn_blocking(move || cloned_self.blocking_delete(cloned_path.as_str()))
223            .await
224            .map_err(new_task_join_error)
225            .and_then(|inner_result| inner_result)
226    }
227
228    fn blocking_delete(&self, path: &str) -> Result<()> {
229        let mut delete_id = self
230            .persy
231            .get::<String, persy::PersyId>(&self.index, &path.to_string())
232            .map_err(parse_error)?;
233        if let Some(id) = delete_id.next() {
234            // Begin a transaction.
235            let mut tx = self.persy.begin().map_err(parse_error)?;
236            // Delete the record.
237            tx.delete(&self.segment, &id).map_err(parse_error)?;
238            // Remove the index.
239            tx.remove::<String, persy::PersyId>(&self.index, path.to_string(), Some(id))
240                .map_err(parse_error)?;
241            // Commit the tx.
242            let prepared = tx.prepare().map_err(parse_error)?;
243            prepared.commit().map_err(parse_error)?;
244        }
245
246        Ok(())
247    }
248}
249
250fn parse_error<T: Into<persy::PersyError>>(err: persy::PE<T>) -> Error {
251    let err: persy::PersyError = err.persy_error();
252    let kind = match err {
253        persy::PersyError::RecordNotFound(_) => ErrorKind::NotFound,
254        _ => ErrorKind::Unexpected,
255    };
256
257    Error::new(kind, "error from persy").set_source(err)
258}