opendal_core/services/sqlite/
core.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use mea::once::OnceCell;
19use sqlx::SqlitePool;
20use sqlx::sqlite::SqliteConnectOptions;
21use std::fmt::Debug;
22
23use crate::services::sqlite::backend::parse_sqlite_error;
24use crate::*;
25
26#[derive(Debug, Clone)]
27pub struct SqliteCore {
28    pub pool: OnceCell<SqlitePool>,
29    pub config: SqliteConnectOptions,
30
31    pub table: String,
32    pub key_field: String,
33    pub value_field: String,
34}
35
36impl SqliteCore {
37    pub async fn get_client(&self) -> Result<&SqlitePool> {
38        self.pool
39            .get_or_try_init(|| async {
40                SqlitePool::connect_with(self.config.clone())
41                    .await
42                    .map_err(parse_sqlite_error)
43            })
44            .await
45    }
46
47    pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
48        let pool = self.get_client().await?;
49
50        let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
51            "SELECT `{}` FROM `{}` WHERE `{}` = $1 LIMIT 1",
52            self.value_field, self.table, self.key_field
53        ))
54        .bind(path)
55        .fetch_optional(pool)
56        .await
57        .map_err(parse_sqlite_error)?;
58
59        Ok(value.map(Buffer::from))
60    }
61
62    pub async fn get_range(
63        &self,
64        path: &str,
65        start: isize,
66        limit: isize,
67    ) -> Result<Option<Buffer>> {
68        let pool = self.get_client().await?;
69        let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
70            "SELECT SUBSTR(`{}`, {}, {}) FROM `{}` WHERE `{}` = $1 LIMIT 1",
71            self.value_field,
72            start + 1,
73            limit,
74            self.table,
75            self.key_field
76        ))
77        .bind(path)
78        .fetch_optional(pool)
79        .await
80        .map_err(parse_sqlite_error)?;
81
82        Ok(value.map(Buffer::from))
83    }
84
85    pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
86        let pool = self.get_client().await?;
87
88        sqlx::query(&format!(
89            "INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)",
90            self.table, self.key_field, self.value_field,
91        ))
92        .bind(path)
93        .bind(value.to_vec())
94        .execute(pool)
95        .await
96        .map_err(parse_sqlite_error)?;
97
98        Ok(())
99    }
100
101    pub async fn delete(&self, path: &str) -> Result<()> {
102        let pool = self.get_client().await?;
103
104        sqlx::query(&format!(
105            "DELETE FROM `{}` WHERE `{}` = $1",
106            self.table, self.key_field
107        ))
108        .bind(path)
109        .execute(pool)
110        .await
111        .map_err(parse_sqlite_error)?;
112
113        Ok(())
114    }
115}