opendal/services/sqlite/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::raw::oio;
19use crate::raw::*;
20use crate::services::SqliteConfig;
21use crate::services::sqlite::core::SqliteCore;
22use crate::services::sqlite::delete::SqliteDeleter;
23use crate::services::sqlite::writer::SqliteWriter;
24use crate::*;
25use sqlx::sqlite::SqliteConnectOptions;
26use std::fmt::Debug;
27use std::fmt::Formatter;
28use std::str::FromStr;
29use tokio::sync::OnceCell;
30
31#[doc = include_str!("docs.md")]
32#[derive(Default)]
33pub struct SqliteBuilder {
34    pub(super) config: SqliteConfig,
35}
36
37impl Debug for SqliteBuilder {
38    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
39        let mut ds = f.debug_struct("SqliteBuilder");
40
41        ds.field("config", &self.config);
42        ds.finish()
43    }
44}
45
46impl SqliteBuilder {
47    /// Set the connection_string of the sqlite service.
48    ///
49    /// This connection string is used to connect to the sqlite service. There are url based formats:
50    ///
51    /// ## Url
52    ///
53    /// This format resembles the url format of the sqlite client:
54    ///
55    /// - `sqlite::memory:`
56    /// - `sqlite:data.db`
57    /// - `sqlite://data.db`
58    ///
59    /// For more information, please visit <https://docs.rs/sqlx/latest/sqlx/sqlite/struct.SqliteConnectOptions.html>.
60    pub fn connection_string(mut self, v: &str) -> Self {
61        if !v.is_empty() {
62            self.config.connection_string = Some(v.to_string());
63        }
64        self
65    }
66
67    /// set the working directory, all operations will be performed under it.
68    ///
69    /// default: "/"
70    pub fn root(mut self, root: &str) -> Self {
71        self.config.root = if root.is_empty() {
72            None
73        } else {
74            Some(root.to_string())
75        };
76
77        self
78    }
79
80    /// Set the table name of the sqlite service to read/write.
81    pub fn table(mut self, table: &str) -> Self {
82        if !table.is_empty() {
83            self.config.table = Some(table.to_string());
84        }
85        self
86    }
87
88    /// Set the key field name of the sqlite service to read/write.
89    ///
90    /// Default to `key` if not specified.
91    pub fn key_field(mut self, key_field: &str) -> Self {
92        if !key_field.is_empty() {
93            self.config.key_field = Some(key_field.to_string());
94        }
95        self
96    }
97
98    /// Set the value field name of the sqlite service to read/write.
99    ///
100    /// Default to `value` if not specified.
101    pub fn value_field(mut self, value_field: &str) -> Self {
102        if !value_field.is_empty() {
103            self.config.value_field = Some(value_field.to_string());
104        }
105        self
106    }
107}
108
109impl Builder for SqliteBuilder {
110    type Config = SqliteConfig;
111
112    fn build(self) -> Result<impl Access> {
113        let conn = match self.config.connection_string {
114            Some(v) => v,
115            None => {
116                return Err(Error::new(
117                    ErrorKind::ConfigInvalid,
118                    "connection_string is required but not set",
119                )
120                .with_context("service", Scheme::Sqlite));
121            }
122        };
123
124        let config = SqliteConnectOptions::from_str(&conn).map_err(|err| {
125            Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
126                .with_context("service", Scheme::Sqlite)
127                .set_source(err)
128        })?;
129
130        let table = match self.config.table {
131            Some(v) => v,
132            None => {
133                return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
134                    .with_context("service", Scheme::Sqlite));
135            }
136        };
137
138        let key_field = self.config.key_field.unwrap_or_else(|| "key".to_string());
139
140        let value_field = self
141            .config
142            .value_field
143            .unwrap_or_else(|| "value".to_string());
144
145        let root = normalize_root(self.config.root.as_deref().unwrap_or("/"));
146
147        Ok(SqliteAccessor::new(SqliteCore {
148            pool: OnceCell::new(),
149            config,
150            table,
151            key_field,
152            value_field,
153        })
154        .with_normalized_root(root))
155    }
156}
157
158pub fn parse_sqlite_error(err: sqlx::Error) -> Error {
159    let is_temporary = matches!(
160        &err,
161        sqlx::Error::Database(db_err) if db_err.code().is_some_and(|c| c == "5" || c == "6")
162    );
163
164    let message = if is_temporary {
165        "database is locked or busy"
166    } else {
167        "unhandled error from sqlite"
168    };
169
170    let mut error = Error::new(ErrorKind::Unexpected, message).set_source(err);
171    if is_temporary {
172        error = error.set_temporary();
173    }
174    error
175}
176
177/// SqliteAccessor implements Access trait directly
178#[derive(Debug, Clone)]
179pub struct SqliteAccessor {
180    core: std::sync::Arc<SqliteCore>,
181    root: String,
182    info: std::sync::Arc<AccessorInfo>,
183}
184
185impl SqliteAccessor {
186    fn new(core: SqliteCore) -> Self {
187        let info = AccessorInfo::default();
188        info.set_scheme(Scheme::Sqlite.into());
189        info.set_name(&core.table);
190        info.set_root("/");
191        info.set_native_capability(Capability {
192            read: true,
193            write: true,
194            create_dir: true,
195            delete: true,
196            stat: true,
197            write_can_empty: true,
198            list: false,
199            shared: false,
200            ..Default::default()
201        });
202
203        Self {
204            core: std::sync::Arc::new(core),
205            root: "/".to_string(),
206            info: std::sync::Arc::new(info),
207        }
208    }
209
210    fn with_normalized_root(mut self, root: String) -> Self {
211        self.info.set_root(&root);
212        self.root = root;
213        self
214    }
215}
216
217impl Access for SqliteAccessor {
218    type Reader = Buffer;
219    type Writer = SqliteWriter;
220    type Lister = ();
221    type Deleter = oio::OneShotDeleter<SqliteDeleter>;
222
223    fn info(&self) -> std::sync::Arc<AccessorInfo> {
224        self.info.clone()
225    }
226
227    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
228        let p = build_abs_path(&self.root, path);
229
230        if p == build_abs_path(&self.root, "") {
231            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
232        } else {
233            let bs = self.core.get(&p).await?;
234            match bs {
235                Some(bs) => Ok(RpStat::new(
236                    Metadata::new(EntryMode::from_path(&p)).with_content_length(bs.len() as u64),
237                )),
238                None => {
239                    // Check if this might be a directory by looking for keys with this prefix
240                    let dir_path = if p.ends_with('/') {
241                        p.clone()
242                    } else {
243                        format!("{}/", p)
244                    };
245                    let count: i64 = sqlx::query_scalar(&format!(
246                        "SELECT COUNT(*) FROM `{}` WHERE `{}` LIKE $1 LIMIT 1",
247                        self.core.table, self.core.key_field
248                    ))
249                    .bind(format!("{}%", dir_path))
250                    .fetch_one(self.core.get_client().await?)
251                    .await
252                    .map_err(crate::services::sqlite::backend::parse_sqlite_error)?;
253
254                    if count > 0 {
255                        // Directory exists (has children)
256                        Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
257                    } else {
258                        Err(Error::new(ErrorKind::NotFound, "key not found in sqlite"))
259                    }
260                }
261            }
262        }
263    }
264
265    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
266        let p = build_abs_path(&self.root, path);
267
268        let range = args.range();
269        let buffer = if range.is_full() {
270            // Full read - use GET
271            match self.core.get(&p).await? {
272                Some(bs) => bs,
273                None => return Err(Error::new(ErrorKind::NotFound, "key not found in sqlite")),
274            }
275        } else {
276            // Range read - use GETRANGE
277            let start = range.offset() as isize;
278            let limit = match range.size() {
279                Some(size) => size as isize,
280                None => -1, // Sqlite uses -1 for end of string
281            };
282
283            match self.core.get_range(&p, start, limit).await? {
284                Some(bs) => bs,
285                None => return Err(Error::new(ErrorKind::NotFound, "key not found in sqlite")),
286            }
287        };
288
289        Ok((RpRead::new(), buffer))
290    }
291
292    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
293        let p = build_abs_path(&self.root, path);
294        Ok((RpWrite::new(), SqliteWriter::new(self.core.clone(), &p)))
295    }
296
297    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
298        Ok((
299            RpDelete::default(),
300            oio::OneShotDeleter::new(SqliteDeleter::new(self.core.clone(), self.root.clone())),
301        ))
302    }
303
304    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
305        let p = build_abs_path(&self.root, path);
306
307        // Ensure path ends with '/' for directory marker
308        let dir_path = if p.ends_with('/') {
309            p
310        } else {
311            format!("{}/", p)
312        };
313
314        // Store directory marker with empty content
315        self.core.set(&dir_path, Buffer::new()).await?;
316
317        Ok(RpCreateDir::default())
318    }
319}
320
321#[cfg(test)]
322mod test {
323    use super::*;
324    use sqlx::SqlitePool;
325
326    async fn build_client() -> OnceCell<SqlitePool> {
327        let config = SqliteConnectOptions::from_str("sqlite::memory:").unwrap();
328        let pool = SqlitePool::connect_with(config).await.unwrap();
329        OnceCell::new_with(Some(pool))
330    }
331
332    #[tokio::test]
333    async fn test_sqlite_accessor_creation() {
334        let core = SqliteCore {
335            pool: build_client().await,
336            config: Default::default(),
337            table: "test".to_string(),
338            key_field: "key".to_string(),
339            value_field: "value".to_string(),
340        };
341
342        let accessor = SqliteAccessor::new(core);
343
344        // Verify basic properties
345        assert_eq!(accessor.root, "/");
346        assert_eq!(
347            accessor.info.scheme(),
348            <Scheme as Into<&'static str>>::into(Scheme::Sqlite)
349        );
350        assert!(accessor.info.native_capability().read);
351        assert!(accessor.info.native_capability().write);
352        assert!(accessor.info.native_capability().delete);
353        assert!(accessor.info.native_capability().stat);
354    }
355
356    #[tokio::test]
357    async fn test_sqlite_accessor_with_root() {
358        let core = SqliteCore {
359            pool: build_client().await,
360            config: Default::default(),
361            table: "test".to_string(),
362            key_field: "key".to_string(),
363            value_field: "value".to_string(),
364        };
365
366        let accessor = SqliteAccessor::new(core).with_normalized_root("/test/".to_string());
367
368        assert_eq!(accessor.root, "/test/");
369        assert_eq!(accessor.info.root(), "/test/".into());
370    }
371}