opendal/services/surrealdb/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use surrealdb::engine::any::Any;
23use surrealdb::opt::auth::Database;
24use surrealdb::Surreal;
25use tokio::sync::OnceCell;
26
27use crate::raw::adapters::kv;
28use crate::raw::normalize_root;
29use crate::raw::Access;
30use crate::services::SurrealdbConfig;
31use crate::*;
32
33impl Configurator for SurrealdbConfig {
34    type Builder = SurrealdbBuilder;
35    fn into_builder(self) -> Self::Builder {
36        SurrealdbBuilder { config: self }
37    }
38}
39
40#[doc = include_str!("docs.md")]
41#[derive(Default)]
42pub struct SurrealdbBuilder {
43    config: SurrealdbConfig,
44}
45
46impl Debug for SurrealdbBuilder {
47    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("SurrealdbBuilder")
49            .field("config", &self.config)
50            .finish()
51    }
52}
53
54impl SurrealdbBuilder {
55    /// Set the connection_string of the surrealdb service.
56    ///
57    /// This connection string is used to connect to the surrealdb service. There are url based formats:
58    ///
59    /// ## Url
60    ///
61    /// - `ws://ip:port`
62    /// - `wss://ip:port`
63    /// - `http://ip:port`
64    /// - `https://ip:port`
65    pub fn connection_string(mut self, connection_string: &str) -> Self {
66        if !connection_string.is_empty() {
67            self.config.connection_string = Some(connection_string.to_string());
68        }
69        self
70    }
71
72    /// set the working directory, all operations will be performed under it.
73    ///
74    /// default: "/"
75    pub fn root(mut self, root: &str) -> Self {
76        self.config.root = if root.is_empty() {
77            None
78        } else {
79            Some(root.to_string())
80        };
81
82        self
83    }
84
85    /// Set the table name of the surrealdb service for read/write.
86    pub fn table(mut self, table: &str) -> Self {
87        if !table.is_empty() {
88            self.config.table = Some(table.to_string());
89        }
90        self
91    }
92
93    /// Set the username of the surrealdb service for signin.
94    pub fn username(mut self, username: &str) -> Self {
95        if !username.is_empty() {
96            self.config.username = Some(username.to_string());
97        }
98        self
99    }
100
101    /// Set the password of the surrealdb service for signin.
102    pub fn password(mut self, password: &str) -> Self {
103        if !password.is_empty() {
104            self.config.password = Some(password.to_string());
105        }
106        self
107    }
108
109    /// Set the namespace of the surrealdb service for read/write.
110    pub fn namespace(mut self, namespace: &str) -> Self {
111        if !namespace.is_empty() {
112            self.config.namespace = Some(namespace.to_string());
113        }
114        self
115    }
116
117    /// Set the database of the surrealdb service for read/write.
118    pub fn database(mut self, database: &str) -> Self {
119        if !database.is_empty() {
120            self.config.database = Some(database.to_string());
121        }
122        self
123    }
124
125    /// Set the key field name of the surrealdb service for read/write.
126    ///
127    /// Default to `key` if not specified.
128    pub fn key_field(mut self, key_field: &str) -> Self {
129        if !key_field.is_empty() {
130            self.config.key_field = Some(key_field.to_string());
131        }
132        self
133    }
134
135    /// Set the value field name of the surrealdb service for read/write.
136    ///
137    /// Default to `value` if not specified.
138    pub fn value_field(mut self, value_field: &str) -> Self {
139        if !value_field.is_empty() {
140            self.config.value_field = Some(value_field.to_string());
141        }
142        self
143    }
144}
145
146impl Builder for SurrealdbBuilder {
147    const SCHEME: Scheme = Scheme::Surrealdb;
148    type Config = SurrealdbConfig;
149
150    fn build(self) -> Result<impl Access> {
151        let connection_string = match self.config.connection_string.clone() {
152            Some(v) => v,
153            None => {
154                return Err(
155                    Error::new(ErrorKind::ConfigInvalid, "connection_string is empty")
156                        .with_context("service", Scheme::Surrealdb),
157                )
158            }
159        };
160
161        let namespace = match self.config.namespace.clone() {
162            Some(v) => v,
163            None => {
164                return Err(Error::new(ErrorKind::ConfigInvalid, "namespace is empty")
165                    .with_context("service", Scheme::Surrealdb))
166            }
167        };
168        let database = match self.config.database.clone() {
169            Some(v) => v,
170            None => {
171                return Err(Error::new(ErrorKind::ConfigInvalid, "database is empty")
172                    .with_context("service", Scheme::Surrealdb))
173            }
174        };
175        let table = match self.config.table.clone() {
176            Some(v) => v,
177            None => {
178                return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
179                    .with_context("service", Scheme::Surrealdb))
180            }
181        };
182
183        let username = self.config.username.clone().unwrap_or_default();
184        let password = self.config.password.clone().unwrap_or_default();
185        let key_field = self
186            .config
187            .key_field
188            .clone()
189            .unwrap_or_else(|| "key".to_string());
190        let value_field = self
191            .config
192            .value_field
193            .clone()
194            .unwrap_or_else(|| "value".to_string());
195        let root = normalize_root(
196            self.config
197                .root
198                .clone()
199                .unwrap_or_else(|| "/".to_string())
200                .as_str(),
201        );
202
203        Ok(SurrealdbBackend::new(Adapter {
204            db: OnceCell::new(),
205            connection_string,
206            username,
207            password,
208            namespace,
209            database,
210            table,
211            key_field,
212            value_field,
213        })
214        .with_normalized_root(root))
215    }
216}
217
218/// Backend for Surrealdb service
219pub type SurrealdbBackend = kv::Backend<Adapter>;
220
221#[derive(Clone)]
222pub struct Adapter {
223    db: OnceCell<Arc<Surreal<Any>>>,
224    connection_string: String,
225
226    username: String,
227    password: String,
228    namespace: String,
229    database: String,
230
231    table: String,
232    key_field: String,
233    value_field: String,
234}
235
236impl Debug for Adapter {
237    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
238        f.debug_struct("Adapter")
239            .field("connection_string", &self.connection_string)
240            .field("username", &self.username)
241            .field("password", &"<redacted>")
242            .field("namespace", &self.namespace)
243            .field("database", &self.database)
244            .field("table", &self.table)
245            .field("key_field", &self.key_field)
246            .field("value_field", &self.value_field)
247            .finish()
248    }
249}
250
251impl Adapter {
252    async fn get_connection(&self) -> crate::Result<&Surreal<Any>> {
253        self.db
254            .get_or_try_init(|| async {
255                let namespace = self.namespace.as_str();
256                let database = self.database.as_str();
257
258                let db: Surreal<Any> = Surreal::init();
259                db.connect(self.connection_string.clone())
260                    .await
261                    .map_err(parse_surrealdb_error)?;
262
263                if !self.username.is_empty() && !self.password.is_empty() {
264                    db.signin(Database {
265                        namespace,
266                        database,
267                        username: self.username.as_str(),
268                        password: self.password.as_str(),
269                    })
270                    .await
271                    .map_err(parse_surrealdb_error)?;
272                }
273                db.use_ns(namespace)
274                    .use_db(database)
275                    .await
276                    .map_err(parse_surrealdb_error)?;
277
278                Ok(Arc::new(db))
279            })
280            .await
281            .map(|v| v.as_ref())
282    }
283}
284
285impl kv::Adapter for Adapter {
286    type Scanner = ();
287
288    fn info(&self) -> kv::Info {
289        kv::Info::new(
290            Scheme::Surrealdb,
291            &self.table,
292            Capability {
293                read: true,
294                write: true,
295                shared: true,
296                ..Default::default()
297            },
298        )
299    }
300
301    async fn get(&self, path: &str) -> crate::Result<Option<Buffer>> {
302        let query: String = if self.key_field == "id" {
303            "SELECT type::field($value_field) FROM type::thing($table, $path)".to_string()
304        } else {
305            format!("SELECT type::field($value_field) FROM type::table($table) WHERE {} = $path LIMIT 1", self.key_field)
306        };
307
308        let mut result = self
309            .get_connection()
310            .await?
311            .query(query)
312            .bind(("namespace", "opendal"))
313            .bind(("path", path.to_string()))
314            .bind(("table", self.table.to_string()))
315            .bind(("value_field", self.value_field.to_string()))
316            .await
317            .map_err(parse_surrealdb_error)?;
318
319        let value: Option<Vec<u8>> = result
320            .take((0, self.value_field.as_str()))
321            .map_err(parse_surrealdb_error)?;
322
323        Ok(value.map(Buffer::from))
324    }
325
326    async fn set(&self, path: &str, value: Buffer) -> crate::Result<()> {
327        let query = format!(
328            "INSERT INTO {} ({}, {}) \
329            VALUES ($path, $value) \
330            ON DUPLICATE KEY UPDATE {} = $value",
331            self.table, self.key_field, self.value_field, self.value_field
332        );
333        self.get_connection()
334            .await?
335            .query(query)
336            .bind(("path", path.to_string()))
337            .bind(("value", value.to_vec()))
338            .await
339            .map_err(parse_surrealdb_error)?;
340        Ok(())
341    }
342
343    async fn delete(&self, path: &str) -> crate::Result<()> {
344        let query: String = if self.key_field == "id" {
345            "DELETE FROM type::thing($table, $path)".to_string()
346        } else {
347            format!(
348                "DELETE FROM type::table($table) WHERE {} = $path",
349                self.key_field
350            )
351        };
352
353        self.get_connection()
354            .await?
355            .query(query.as_str())
356            .bind(("path", path.to_string()))
357            .bind(("table", self.table.to_string()))
358            .await
359            .map_err(parse_surrealdb_error)?;
360        Ok(())
361    }
362}
363
364fn parse_surrealdb_error(err: surrealdb::Error) -> Error {
365    Error::new(ErrorKind::Unexpected, "unhandled error from surrealdb").set_source(err)
366}