opendal/services/sqlite/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::pin::Pin;
21use std::str::FromStr;
22use std::task::Context;
23use std::task::Poll;
24
25use futures::stream::BoxStream;
26use futures::Stream;
27use futures::StreamExt;
28use ouroboros::self_referencing;
29use sqlx::sqlite::SqliteConnectOptions;
30use sqlx::SqlitePool;
31use tokio::sync::OnceCell;
32
33use crate::raw::adapters::kv;
34use crate::raw::*;
35use crate::services::SqliteConfig;
36use crate::*;
37
38impl Configurator for SqliteConfig {
39    type Builder = SqliteBuilder;
40    fn into_builder(self) -> Self::Builder {
41        SqliteBuilder { config: self }
42    }
43}
44
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct SqliteBuilder {
    /// Configuration filled in by the builder's setter methods and consumed by `build`.
    config: SqliteConfig,
}
50
51impl Debug for SqliteBuilder {
52    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
53        let mut ds = f.debug_struct("SqliteBuilder");
54
55        ds.field("config", &self.config);
56        ds.finish()
57    }
58}
59
60impl SqliteBuilder {
61    /// Set the connection_string of the sqlite service.
62    ///
63    /// This connection string is used to connect to the sqlite service. There are url based formats:
64    ///
65    /// ## Url
66    ///
67    /// This format resembles the url format of the sqlite client:
68    ///
69    /// - `sqlite::memory:`
70    /// - `sqlite:data.db`
71    /// - `sqlite://data.db`
72    ///
73    /// For more information, please visit <https://docs.rs/sqlx/latest/sqlx/sqlite/struct.SqliteConnectOptions.html>.
74    pub fn connection_string(mut self, v: &str) -> Self {
75        if !v.is_empty() {
76            self.config.connection_string = Some(v.to_string());
77        }
78        self
79    }
80
81    /// set the working directory, all operations will be performed under it.
82    ///
83    /// default: "/"
84    pub fn root(mut self, root: &str) -> Self {
85        self.config.root = if root.is_empty() {
86            None
87        } else {
88            Some(root.to_string())
89        };
90
91        self
92    }
93
94    /// Set the table name of the sqlite service to read/write.
95    pub fn table(mut self, table: &str) -> Self {
96        if !table.is_empty() {
97            self.config.table = Some(table.to_string());
98        }
99        self
100    }
101
102    /// Set the key field name of the sqlite service to read/write.
103    ///
104    /// Default to `key` if not specified.
105    pub fn key_field(mut self, key_field: &str) -> Self {
106        if !key_field.is_empty() {
107            self.config.key_field = Some(key_field.to_string());
108        }
109        self
110    }
111
112    /// Set the value field name of the sqlite service to read/write.
113    ///
114    /// Default to `value` if not specified.
115    pub fn value_field(mut self, value_field: &str) -> Self {
116        if !value_field.is_empty() {
117            self.config.value_field = Some(value_field.to_string());
118        }
119        self
120    }
121}
122
123impl Builder for SqliteBuilder {
124    const SCHEME: Scheme = Scheme::Sqlite;
125    type Config = SqliteConfig;
126
127    fn build(self) -> Result<impl Access> {
128        let conn = match self.config.connection_string {
129            Some(v) => v,
130            None => {
131                return Err(Error::new(
132                    ErrorKind::ConfigInvalid,
133                    "connection_string is required but not set",
134                )
135                .with_context("service", Scheme::Sqlite));
136            }
137        };
138
139        let config = SqliteConnectOptions::from_str(&conn).map_err(|err| {
140            Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
141                .with_context("service", Scheme::Sqlite)
142                .set_source(err)
143        })?;
144
145        let table = match self.config.table {
146            Some(v) => v,
147            None => {
148                return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
149                    .with_context("service", Scheme::Sqlite));
150            }
151        };
152
153        let key_field = self.config.key_field.unwrap_or_else(|| "key".to_string());
154
155        let value_field = self
156            .config
157            .value_field
158            .unwrap_or_else(|| "value".to_string());
159
160        let root = normalize_root(self.config.root.as_deref().unwrap_or("/"));
161
162        Ok(SqliteBackend::new(Adapter {
163            pool: OnceCell::new(),
164            config,
165            table,
166            key_field,
167            value_field,
168        })
169        .with_normalized_root(root))
170    }
171}
172
/// Backend for the sqlite service: the generic key-value backend driven by the sqlite [`Adapter`].
pub type SqliteBackend = kv::Backend<Adapter>;
174
/// Key-value adapter state: connection options plus the table and column names used to build queries.
#[derive(Debug, Clone)]
pub struct Adapter {
    /// Connection pool, created on first use by `get_client`.
    pool: OnceCell<SqlitePool>,
    /// Connection options parsed from the configured connection string.
    config: SqliteConnectOptions,

    /// Name of the table that queries read from and write to.
    table: String,
    /// Name of the column holding entry keys.
    key_field: String,
    /// Name of the column holding entry values.
    value_field: String,
}
184
185impl Adapter {
186    async fn get_client(&self) -> Result<&SqlitePool> {
187        self.pool
188            .get_or_try_init(|| async {
189                let pool = SqlitePool::connect_with(self.config.clone())
190                    .await
191                    .map_err(parse_sqlite_error)?;
192                Ok(pool)
193            })
194            .await
195    }
196}
197
// Self-referential scanner: it owns the pool handle and the query text, and the
// `stream` field borrows from both (see the `#[borrows(pool, query)]` attribute).
#[self_referencing]
pub struct SqliteScanner {
    // Pool handle the stream fetches rows through.
    pool: SqlitePool,
    // SQL text of the scan query.
    query: String,

    // Stream of keys (or errors) built from `pool` and `query`.
    #[borrows(pool, query)]
    #[covariant]
    stream: BoxStream<'this, Result<String>>,
}
207
208impl Stream for SqliteScanner {
209    type Item = Result<String>;
210
211    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
212        self.with_stream_mut(|s| s.poll_next_unpin(cx))
213    }
214}
215
// SAFETY — NOTE(review), to be confirmed: `Sync` is asserted by hand here, with no
// justification recorded. Within this file the inner stream is only driven through
// exclusive access (`poll_next` takes `Pin<&mut Self>`, `next` takes `&mut self`),
// which is presumably the invariant relied upon — verify that nothing reachable
// through `&SqliteScanner` can poll or otherwise mutate the stream.
unsafe impl Sync for SqliteScanner {}
217
218impl kv::Scan for SqliteScanner {
219    async fn next(&mut self) -> Result<Option<String>> {
220        <Self as StreamExt>::next(self).await.transpose()
221    }
222}
223
224impl kv::Adapter for Adapter {
225    type Scanner = SqliteScanner;
226
227    fn info(&self) -> kv::Info {
228        kv::Info::new(
229            Scheme::Sqlite,
230            &self.table,
231            Capability {
232                read: true,
233                write: true,
234                delete: true,
235                list: true,
236                shared: false,
237                ..Default::default()
238            },
239        )
240    }
241
242    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
243        let pool = self.get_client().await?;
244
245        let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
246            "SELECT `{}` FROM `{}` WHERE `{}` = $1 LIMIT 1",
247            self.value_field, self.table, self.key_field
248        ))
249        .bind(path)
250        .fetch_optional(pool)
251        .await
252        .map_err(parse_sqlite_error)?;
253
254        Ok(value.map(Buffer::from))
255    }
256
257    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
258        let pool = self.get_client().await?;
259
260        sqlx::query(&format!(
261            "INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)",
262            self.table, self.key_field, self.value_field,
263        ))
264        .bind(path)
265        .bind(value.to_vec())
266        .execute(pool)
267        .await
268        .map_err(parse_sqlite_error)?;
269
270        Ok(())
271    }
272
273    async fn delete(&self, path: &str) -> Result<()> {
274        let pool = self.get_client().await?;
275
276        sqlx::query(&format!(
277            "DELETE FROM `{}` WHERE `{}` = $1",
278            self.table, self.key_field
279        ))
280        .bind(path)
281        .execute(pool)
282        .await
283        .map_err(parse_sqlite_error)?;
284
285        Ok(())
286    }
287
288    async fn scan(&self, path: &str) -> Result<Self::Scanner> {
289        let pool = self.get_client().await?;
290        let stream = SqliteScannerBuilder {
291            pool: pool.clone(),
292            query: format!(
293                "SELECT `{}` FROM `{}` WHERE `{}` LIKE $1",
294                self.key_field, self.table, self.key_field
295            ),
296            stream_builder: |pool, query| {
297                sqlx::query_scalar(query)
298                    .bind(format!("{path}%"))
299                    .fetch(pool)
300                    .map(|v| v.map_err(parse_sqlite_error))
301                    .boxed()
302            },
303        }
304        .build();
305
306        Ok(stream)
307    }
308}
309
310fn parse_sqlite_error(err: sqlx::Error) -> Error {
311    let is_temporary = matches!(
312        &err,
313        sqlx::Error::Database(db_err) if db_err.code().is_some_and(|c| c == "5" || c == "6")
314    );
315
316    let message = if is_temporary {
317        "database is locked or busy"
318    } else {
319        "unhandled error from sqlite"
320    };
321
322    let mut error = Error::new(ErrorKind::Unexpected, message).set_source(err);
323    if is_temporary {
324        error = error.set_temporary();
325    }
326    error
327}