opendal/services/surrealdb/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use surrealdb::engine::any::Any;
23use surrealdb::opt::auth::Database;
24use surrealdb::Surreal;
25use tokio::sync::OnceCell;
26
27use crate::raw::adapters::kv;
28use crate::raw::normalize_root;
29use crate::raw::Access;
30use crate::services::SurrealdbConfig;
31use crate::*;
32
33impl Configurator for SurrealdbConfig {
34    type Builder = SurrealdbBuilder;
35    fn into_builder(self) -> Self::Builder {
36        SurrealdbBuilder { config: self }
37    }
38}
39
// The public documentation of this builder is pulled in from `docs.md`.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct SurrealdbBuilder {
    // Settings collected by the builder's setter methods and consumed by
    // `build`.
    config: SurrealdbConfig,
}
45
46impl Debug for SurrealdbBuilder {
47    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("SurrealdbBuilder")
49            .field("config", &self.config)
50            .finish()
51    }
52}
53
54impl SurrealdbBuilder {
55    /// Set the connection_string of the surrealdb service.
56    ///
57    /// This connection string is used to connect to the surrealdb service. There are url based formats:
58    ///
59    /// ## Url
60    ///
61    /// - `ws://ip:port`
62    /// - `wss://ip:port`
63    /// - `http://ip:port`
64    /// - `https://ip:port`
65    pub fn connection_string(mut self, connection_string: &str) -> Self {
66        if !connection_string.is_empty() {
67            self.config.connection_string = Some(connection_string.to_string());
68        }
69        self
70    }
71
72    /// set the working directory, all operations will be performed under it.
73    ///
74    /// default: "/"
75    pub fn root(mut self, root: &str) -> Self {
76        self.config.root = if root.is_empty() {
77            None
78        } else {
79            Some(root.to_string())
80        };
81
82        self
83    }
84
85    /// Set the table name of the surrealdb service for read/write.
86    pub fn table(mut self, table: &str) -> Self {
87        if !table.is_empty() {
88            self.config.table = Some(table.to_string());
89        }
90        self
91    }
92
93    /// Set the username of the surrealdb service for signin.
94    pub fn username(mut self, username: &str) -> Self {
95        if !username.is_empty() {
96            self.config.username = Some(username.to_string());
97        }
98        self
99    }
100
101    /// Set the password of the surrealdb service for signin.
102    pub fn password(mut self, password: &str) -> Self {
103        if !password.is_empty() {
104            self.config.password = Some(password.to_string());
105        }
106        self
107    }
108
109    /// Set the namespace of the surrealdb service for read/write.
110    pub fn namespace(mut self, namespace: &str) -> Self {
111        if !namespace.is_empty() {
112            self.config.namespace = Some(namespace.to_string());
113        }
114        self
115    }
116
117    /// Set the database of the surrealdb service for read/write.
118    pub fn database(mut self, database: &str) -> Self {
119        if !database.is_empty() {
120            self.config.database = Some(database.to_string());
121        }
122        self
123    }
124
125    /// Set the key field name of the surrealdb service for read/write.
126    ///
127    /// Default to `key` if not specified.
128    pub fn key_field(mut self, key_field: &str) -> Self {
129        if !key_field.is_empty() {
130            self.config.key_field = Some(key_field.to_string());
131        }
132        self
133    }
134
135    /// Set the value field name of the surrealdb service for read/write.
136    ///
137    /// Default to `value` if not specified.
138    pub fn value_field(mut self, value_field: &str) -> Self {
139        if !value_field.is_empty() {
140            self.config.value_field = Some(value_field.to_string());
141        }
142        self
143    }
144}
145
146impl Builder for SurrealdbBuilder {
147    type Config = SurrealdbConfig;
148
149    fn build(self) -> Result<impl Access> {
150        let connection_string = match self.config.connection_string.clone() {
151            Some(v) => v,
152            None => {
153                return Err(
154                    Error::new(ErrorKind::ConfigInvalid, "connection_string is empty")
155                        .with_context("service", Scheme::Surrealdb),
156                )
157            }
158        };
159
160        let namespace = match self.config.namespace.clone() {
161            Some(v) => v,
162            None => {
163                return Err(Error::new(ErrorKind::ConfigInvalid, "namespace is empty")
164                    .with_context("service", Scheme::Surrealdb))
165            }
166        };
167        let database = match self.config.database.clone() {
168            Some(v) => v,
169            None => {
170                return Err(Error::new(ErrorKind::ConfigInvalid, "database is empty")
171                    .with_context("service", Scheme::Surrealdb))
172            }
173        };
174        let table = match self.config.table.clone() {
175            Some(v) => v,
176            None => {
177                return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
178                    .with_context("service", Scheme::Surrealdb))
179            }
180        };
181
182        let username = self.config.username.clone().unwrap_or_default();
183        let password = self.config.password.clone().unwrap_or_default();
184        let key_field = self
185            .config
186            .key_field
187            .clone()
188            .unwrap_or_else(|| "key".to_string());
189        let value_field = self
190            .config
191            .value_field
192            .clone()
193            .unwrap_or_else(|| "value".to_string());
194        let root = normalize_root(
195            self.config
196                .root
197                .clone()
198                .unwrap_or_else(|| "/".to_string())
199                .as_str(),
200        );
201
202        Ok(SurrealdbBackend::new(Adapter {
203            db: OnceCell::new(),
204            connection_string,
205            username,
206            password,
207            namespace,
208            database,
209            table,
210            key_field,
211            value_field,
212        })
213        .with_normalized_root(root))
214    }
215}
216
/// Backend for the Surrealdb service: the generic key-value backend
/// (`kv::Backend`) specialised with this module's [`Adapter`].
pub type SurrealdbBackend = kv::Backend<Adapter>;
219
/// Key-value adapter holding the settings and the lazily created client used
/// to talk to a surrealdb service.
#[derive(Clone)]
pub struct Adapter {
    // Client handle; starts empty and is filled by `get_connection` the first
    // time a connection is needed.
    db: OnceCell<Arc<Surreal<Any>>>,
    // Endpoint handed to the client's `connect` call.
    connection_string: String,

    // Credentials for the sign-in step; sign-in is only attempted when both
    // are non-empty.
    username: String,
    password: String,
    // Namespace and database selected on the connection (and used as the
    // sign-in scope).
    namespace: String,
    database: String,

    // Table and field names used when building the queries.
    table: String,
    key_field: String,
    value_field: String,
}
234
235impl Debug for Adapter {
236    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
237        f.debug_struct("Adapter")
238            .field("connection_string", &self.connection_string)
239            .field("username", &self.username)
240            .field("password", &"<redacted>")
241            .field("namespace", &self.namespace)
242            .field("database", &self.database)
243            .field("table", &self.table)
244            .field("key_field", &self.key_field)
245            .field("value_field", &self.value_field)
246            .finish()
247    }
248}
249
250impl Adapter {
251    async fn get_connection(&self) -> crate::Result<&Surreal<Any>> {
252        self.db
253            .get_or_try_init(|| async {
254                let namespace = self.namespace.as_str();
255                let database = self.database.as_str();
256
257                let db: Surreal<Any> = Surreal::init();
258                db.connect(self.connection_string.clone())
259                    .await
260                    .map_err(parse_surrealdb_error)?;
261
262                if !self.username.is_empty() && !self.password.is_empty() {
263                    db.signin(Database {
264                        namespace,
265                        database,
266                        username: self.username.as_str(),
267                        password: self.password.as_str(),
268                    })
269                    .await
270                    .map_err(parse_surrealdb_error)?;
271                }
272                db.use_ns(namespace)
273                    .use_db(database)
274                    .await
275                    .map_err(parse_surrealdb_error)?;
276
277                Ok(Arc::new(db))
278            })
279            .await
280            .map(|v| v.as_ref())
281    }
282}
283
284impl kv::Adapter for Adapter {
285    type Scanner = ();
286
287    fn info(&self) -> kv::Info {
288        kv::Info::new(
289            Scheme::Surrealdb,
290            &self.table,
291            Capability {
292                read: true,
293                write: true,
294                shared: true,
295                ..Default::default()
296            },
297        )
298    }
299
300    async fn get(&self, path: &str) -> crate::Result<Option<Buffer>> {
301        let query: String = if self.key_field == "id" {
302            "SELECT type::field($value_field) FROM type::thing($table, $path)".to_string()
303        } else {
304            format!("SELECT type::field($value_field) FROM type::table($table) WHERE {} = $path LIMIT 1", self.key_field)
305        };
306
307        let mut result = self
308            .get_connection()
309            .await?
310            .query(query)
311            .bind(("namespace", "opendal"))
312            .bind(("path", path.to_string()))
313            .bind(("table", self.table.to_string()))
314            .bind(("value_field", self.value_field.to_string()))
315            .await
316            .map_err(parse_surrealdb_error)?;
317
318        let value: Option<Vec<u8>> = result
319            .take((0, self.value_field.as_str()))
320            .map_err(parse_surrealdb_error)?;
321
322        Ok(value.map(Buffer::from))
323    }
324
325    async fn set(&self, path: &str, value: Buffer) -> crate::Result<()> {
326        let query = format!(
327            "INSERT INTO {} ({}, {}) \
328            VALUES ($path, $value) \
329            ON DUPLICATE KEY UPDATE {} = $value",
330            self.table, self.key_field, self.value_field, self.value_field
331        );
332        self.get_connection()
333            .await?
334            .query(query)
335            .bind(("path", path.to_string()))
336            .bind(("value", value.to_vec()))
337            .await
338            .map_err(parse_surrealdb_error)?;
339        Ok(())
340    }
341
342    async fn delete(&self, path: &str) -> crate::Result<()> {
343        let query: String = if self.key_field == "id" {
344            "DELETE FROM type::thing($table, $path)".to_string()
345        } else {
346            format!(
347                "DELETE FROM type::table($table) WHERE {} = $path",
348                self.key_field
349            )
350        };
351
352        self.get_connection()
353            .await?
354            .query(query.as_str())
355            .bind(("path", path.to_string()))
356            .bind(("table", self.table.to_string()))
357            .await
358            .map_err(parse_surrealdb_error)?;
359        Ok(())
360    }
361}
362
363fn parse_surrealdb_error(err: surrealdb::Error) -> Error {
364    Error::new(ErrorKind::Unexpected, "unhandled error from surrealdb").set_source(err)
365}