opendal/services/sqlite/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::pin::Pin;
21use std::str::FromStr;
22use std::task::Context;
23use std::task::Poll;
24
25use futures::stream::BoxStream;
26use futures::Stream;
27use futures::StreamExt;
28use ouroboros::self_referencing;
29use sqlx::sqlite::SqliteConnectOptions;
30use sqlx::SqlitePool;
31use tokio::sync::OnceCell;
32
33use crate::raw::adapters::kv;
34use crate::raw::*;
35use crate::services::SqliteConfig;
36use crate::*;
37
38impl Configurator for SqliteConfig {
39 type Builder = SqliteBuilder;
40 fn into_builder(self) -> Self::Builder {
41 SqliteBuilder { config: self }
42 }
43}
44
45#[doc = include_str!("docs.md")]
46#[derive(Default)]
47pub struct SqliteBuilder {
48 config: SqliteConfig,
49}
50
51impl Debug for SqliteBuilder {
52 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
53 let mut ds = f.debug_struct("SqliteBuilder");
54
55 ds.field("config", &self.config);
56 ds.finish()
57 }
58}
59
60impl SqliteBuilder {
61 pub fn connection_string(mut self, v: &str) -> Self {
75 if !v.is_empty() {
76 self.config.connection_string = Some(v.to_string());
77 }
78 self
79 }
80
81 pub fn root(mut self, root: &str) -> Self {
85 self.config.root = if root.is_empty() {
86 None
87 } else {
88 Some(root.to_string())
89 };
90
91 self
92 }
93
94 pub fn table(mut self, table: &str) -> Self {
96 if !table.is_empty() {
97 self.config.table = Some(table.to_string());
98 }
99 self
100 }
101
102 pub fn key_field(mut self, key_field: &str) -> Self {
106 if !key_field.is_empty() {
107 self.config.key_field = Some(key_field.to_string());
108 }
109 self
110 }
111
112 pub fn value_field(mut self, value_field: &str) -> Self {
116 if !value_field.is_empty() {
117 self.config.value_field = Some(value_field.to_string());
118 }
119 self
120 }
121}
122
123impl Builder for SqliteBuilder {
124 const SCHEME: Scheme = Scheme::Sqlite;
125 type Config = SqliteConfig;
126
127 fn build(self) -> Result<impl Access> {
128 let conn = match self.config.connection_string {
129 Some(v) => v,
130 None => {
131 return Err(Error::new(
132 ErrorKind::ConfigInvalid,
133 "connection_string is required but not set",
134 )
135 .with_context("service", Scheme::Sqlite));
136 }
137 };
138
139 let config = SqliteConnectOptions::from_str(&conn).map_err(|err| {
140 Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
141 .with_context("service", Scheme::Sqlite)
142 .set_source(err)
143 })?;
144
145 let table = match self.config.table {
146 Some(v) => v,
147 None => {
148 return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
149 .with_context("service", Scheme::Sqlite));
150 }
151 };
152
153 let key_field = self.config.key_field.unwrap_or_else(|| "key".to_string());
154
155 let value_field = self
156 .config
157 .value_field
158 .unwrap_or_else(|| "value".to_string());
159
160 let root = normalize_root(self.config.root.as_deref().unwrap_or("/"));
161
162 Ok(SqliteBackend::new(Adapter {
163 pool: OnceCell::new(),
164 config,
165 table,
166 key_field,
167 value_field,
168 })
169 .with_normalized_root(root))
170 }
171}
172
173pub type SqliteBackend = kv::Backend<Adapter>;
174
175#[derive(Debug, Clone)]
176pub struct Adapter {
177 pool: OnceCell<SqlitePool>,
178 config: SqliteConnectOptions,
179
180 table: String,
181 key_field: String,
182 value_field: String,
183}
184
185impl Adapter {
186 async fn get_client(&self) -> Result<&SqlitePool> {
187 self.pool
188 .get_or_try_init(|| async {
189 let pool = SqlitePool::connect_with(self.config.clone())
190 .await
191 .map_err(parse_sqlite_error)?;
192 Ok(pool)
193 })
194 .await
195 }
196}
197
198#[self_referencing]
199pub struct SqliteScanner {
200 pool: SqlitePool,
201 query: String,
202
203 #[borrows(pool, query)]
204 #[covariant]
205 stream: BoxStream<'this, Result<String>>,
206}
207
208impl Stream for SqliteScanner {
209 type Item = Result<String>;
210
211 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
212 self.with_stream_mut(|s| s.poll_next_unpin(cx))
213 }
214}
215
216unsafe impl Sync for SqliteScanner {}
217
218impl kv::Scan for SqliteScanner {
219 async fn next(&mut self) -> Result<Option<String>> {
220 <Self as StreamExt>::next(self).await.transpose()
221 }
222}
223
224impl kv::Adapter for Adapter {
225 type Scanner = SqliteScanner;
226
227 fn info(&self) -> kv::Info {
228 kv::Info::new(
229 Scheme::Sqlite,
230 &self.table,
231 Capability {
232 read: true,
233 write: true,
234 delete: true,
235 list: true,
236 shared: false,
237 ..Default::default()
238 },
239 )
240 }
241
242 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
243 let pool = self.get_client().await?;
244
245 let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
246 "SELECT `{}` FROM `{}` WHERE `{}` = $1 LIMIT 1",
247 self.value_field, self.table, self.key_field
248 ))
249 .bind(path)
250 .fetch_optional(pool)
251 .await
252 .map_err(parse_sqlite_error)?;
253
254 Ok(value.map(Buffer::from))
255 }
256
257 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
258 let pool = self.get_client().await?;
259
260 sqlx::query(&format!(
261 "INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)",
262 self.table, self.key_field, self.value_field,
263 ))
264 .bind(path)
265 .bind(value.to_vec())
266 .execute(pool)
267 .await
268 .map_err(parse_sqlite_error)?;
269
270 Ok(())
271 }
272
273 async fn delete(&self, path: &str) -> Result<()> {
274 let pool = self.get_client().await?;
275
276 sqlx::query(&format!(
277 "DELETE FROM `{}` WHERE `{}` = $1",
278 self.table, self.key_field
279 ))
280 .bind(path)
281 .execute(pool)
282 .await
283 .map_err(parse_sqlite_error)?;
284
285 Ok(())
286 }
287
288 async fn scan(&self, path: &str) -> Result<Self::Scanner> {
289 let pool = self.get_client().await?;
290 let stream = SqliteScannerBuilder {
291 pool: pool.clone(),
292 query: format!(
293 "SELECT `{}` FROM `{}` WHERE `{}` LIKE $1",
294 self.key_field, self.table, self.key_field
295 ),
296 stream_builder: |pool, query| {
297 sqlx::query_scalar(query)
298 .bind(format!("{path}%"))
299 .fetch(pool)
300 .map(|v| v.map_err(parse_sqlite_error))
301 .boxed()
302 },
303 }
304 .build();
305
306 Ok(stream)
307 }
308}
309
310fn parse_sqlite_error(err: sqlx::Error) -> Error {
311 let is_temporary = matches!(
312 &err,
313 sqlx::Error::Database(db_err) if db_err.code().is_some_and(|c| c == "5" || c == "6")
314 );
315
316 let message = if is_temporary {
317 "database is locked or busy"
318 } else {
319 "unhandled error from sqlite"
320 };
321
322 let mut error = Error::new(ErrorKind::Unexpected, message).set_source(err);
323 if is_temporary {
324 error = error.set_temporary();
325 }
326 error
327}