opendal_core/services/sqlite/
core.rs1use mea::once::OnceCell;
19use sqlx::SqlitePool;
20use sqlx::sqlite::SqliteConnectOptions;
21use std::fmt::Debug;
22
23use crate::services::sqlite::backend::parse_sqlite_error;
24use crate::*;
25
26#[derive(Debug, Clone)]
27pub struct SqliteCore {
28 pub pool: OnceCell<SqlitePool>,
29 pub config: SqliteConnectOptions,
30
31 pub table: String,
32 pub key_field: String,
33 pub value_field: String,
34}
35
36impl SqliteCore {
37 pub async fn get_client(&self) -> Result<&SqlitePool> {
38 self.pool
39 .get_or_try_init(|| async {
40 SqlitePool::connect_with(self.config.clone())
41 .await
42 .map_err(parse_sqlite_error)
43 })
44 .await
45 }
46
47 pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
48 let pool = self.get_client().await?;
49
50 let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
51 "SELECT `{}` FROM `{}` WHERE `{}` = $1 LIMIT 1",
52 self.value_field, self.table, self.key_field
53 ))
54 .bind(path)
55 .fetch_optional(pool)
56 .await
57 .map_err(parse_sqlite_error)?;
58
59 Ok(value.map(Buffer::from))
60 }
61
62 pub async fn get_range(
63 &self,
64 path: &str,
65 start: isize,
66 limit: isize,
67 ) -> Result<Option<Buffer>> {
68 let pool = self.get_client().await?;
69 let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
70 "SELECT SUBSTR(`{}`, {}, {}) FROM `{}` WHERE `{}` = $1 LIMIT 1",
71 self.value_field,
72 start + 1,
73 limit,
74 self.table,
75 self.key_field
76 ))
77 .bind(path)
78 .fetch_optional(pool)
79 .await
80 .map_err(parse_sqlite_error)?;
81
82 Ok(value.map(Buffer::from))
83 }
84
85 pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
86 let pool = self.get_client().await?;
87
88 sqlx::query(&format!(
89 "INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)",
90 self.table, self.key_field, self.value_field,
91 ))
92 .bind(path)
93 .bind(value.to_vec())
94 .execute(pool)
95 .await
96 .map_err(parse_sqlite_error)?;
97
98 Ok(())
99 }
100
101 pub async fn delete(&self, path: &str) -> Result<()> {
102 let pool = self.get_client().await?;
103
104 sqlx::query(&format!(
105 "DELETE FROM `{}` WHERE `{}` = $1",
106 self.table, self.key_field
107 ))
108 .bind(path)
109 .execute(pool)
110 .await
111 .map_err(parse_sqlite_error)?;
112
113 Ok(())
114 }
115}