opendal/services/surrealdb/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use surrealdb::engine::any::Any;
23use surrealdb::opt::auth::Database;
24use surrealdb::Surreal;
25use tokio::sync::OnceCell;
26
27use crate::raw::adapters::kv;
28use crate::raw::normalize_root;
29use crate::raw::Access;
30use crate::services::SurrealdbConfig;
31use crate::*;
32
33impl Configurator for SurrealdbConfig {
34 type Builder = SurrealdbBuilder;
35 fn into_builder(self) -> Self::Builder {
36 SurrealdbBuilder { config: self }
37 }
38}
39
40#[doc = include_str!("docs.md")]
41#[derive(Default)]
42pub struct SurrealdbBuilder {
43 config: SurrealdbConfig,
44}
45
46impl Debug for SurrealdbBuilder {
47 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("SurrealdbBuilder")
49 .field("config", &self.config)
50 .finish()
51 }
52}
53
54impl SurrealdbBuilder {
55 pub fn connection_string(mut self, connection_string: &str) -> Self {
66 if !connection_string.is_empty() {
67 self.config.connection_string = Some(connection_string.to_string());
68 }
69 self
70 }
71
72 pub fn root(mut self, root: &str) -> Self {
76 self.config.root = if root.is_empty() {
77 None
78 } else {
79 Some(root.to_string())
80 };
81
82 self
83 }
84
85 pub fn table(mut self, table: &str) -> Self {
87 if !table.is_empty() {
88 self.config.table = Some(table.to_string());
89 }
90 self
91 }
92
93 pub fn username(mut self, username: &str) -> Self {
95 if !username.is_empty() {
96 self.config.username = Some(username.to_string());
97 }
98 self
99 }
100
101 pub fn password(mut self, password: &str) -> Self {
103 if !password.is_empty() {
104 self.config.password = Some(password.to_string());
105 }
106 self
107 }
108
109 pub fn namespace(mut self, namespace: &str) -> Self {
111 if !namespace.is_empty() {
112 self.config.namespace = Some(namespace.to_string());
113 }
114 self
115 }
116
117 pub fn database(mut self, database: &str) -> Self {
119 if !database.is_empty() {
120 self.config.database = Some(database.to_string());
121 }
122 self
123 }
124
125 pub fn key_field(mut self, key_field: &str) -> Self {
129 if !key_field.is_empty() {
130 self.config.key_field = Some(key_field.to_string());
131 }
132 self
133 }
134
135 pub fn value_field(mut self, value_field: &str) -> Self {
139 if !value_field.is_empty() {
140 self.config.value_field = Some(value_field.to_string());
141 }
142 self
143 }
144}
145
146impl Builder for SurrealdbBuilder {
147 type Config = SurrealdbConfig;
148
149 fn build(self) -> Result<impl Access> {
150 let connection_string = match self.config.connection_string.clone() {
151 Some(v) => v,
152 None => {
153 return Err(
154 Error::new(ErrorKind::ConfigInvalid, "connection_string is empty")
155 .with_context("service", Scheme::Surrealdb),
156 )
157 }
158 };
159
160 let namespace = match self.config.namespace.clone() {
161 Some(v) => v,
162 None => {
163 return Err(Error::new(ErrorKind::ConfigInvalid, "namespace is empty")
164 .with_context("service", Scheme::Surrealdb))
165 }
166 };
167 let database = match self.config.database.clone() {
168 Some(v) => v,
169 None => {
170 return Err(Error::new(ErrorKind::ConfigInvalid, "database is empty")
171 .with_context("service", Scheme::Surrealdb))
172 }
173 };
174 let table = match self.config.table.clone() {
175 Some(v) => v,
176 None => {
177 return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
178 .with_context("service", Scheme::Surrealdb))
179 }
180 };
181
182 let username = self.config.username.clone().unwrap_or_default();
183 let password = self.config.password.clone().unwrap_or_default();
184 let key_field = self
185 .config
186 .key_field
187 .clone()
188 .unwrap_or_else(|| "key".to_string());
189 let value_field = self
190 .config
191 .value_field
192 .clone()
193 .unwrap_or_else(|| "value".to_string());
194 let root = normalize_root(
195 self.config
196 .root
197 .clone()
198 .unwrap_or_else(|| "/".to_string())
199 .as_str(),
200 );
201
202 Ok(SurrealdbBackend::new(Adapter {
203 db: OnceCell::new(),
204 connection_string,
205 username,
206 password,
207 namespace,
208 database,
209 table,
210 key_field,
211 value_field,
212 })
213 .with_normalized_root(root))
214 }
215}
216
217pub type SurrealdbBackend = kv::Backend<Adapter>;
219
220#[derive(Clone)]
221pub struct Adapter {
222 db: OnceCell<Arc<Surreal<Any>>>,
223 connection_string: String,
224
225 username: String,
226 password: String,
227 namespace: String,
228 database: String,
229
230 table: String,
231 key_field: String,
232 value_field: String,
233}
234
235impl Debug for Adapter {
236 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
237 f.debug_struct("Adapter")
238 .field("connection_string", &self.connection_string)
239 .field("username", &self.username)
240 .field("password", &"<redacted>")
241 .field("namespace", &self.namespace)
242 .field("database", &self.database)
243 .field("table", &self.table)
244 .field("key_field", &self.key_field)
245 .field("value_field", &self.value_field)
246 .finish()
247 }
248}
249
250impl Adapter {
251 async fn get_connection(&self) -> crate::Result<&Surreal<Any>> {
252 self.db
253 .get_or_try_init(|| async {
254 let namespace = self.namespace.as_str();
255 let database = self.database.as_str();
256
257 let db: Surreal<Any> = Surreal::init();
258 db.connect(self.connection_string.clone())
259 .await
260 .map_err(parse_surrealdb_error)?;
261
262 if !self.username.is_empty() && !self.password.is_empty() {
263 db.signin(Database {
264 namespace,
265 database,
266 username: self.username.as_str(),
267 password: self.password.as_str(),
268 })
269 .await
270 .map_err(parse_surrealdb_error)?;
271 }
272 db.use_ns(namespace)
273 .use_db(database)
274 .await
275 .map_err(parse_surrealdb_error)?;
276
277 Ok(Arc::new(db))
278 })
279 .await
280 .map(|v| v.as_ref())
281 }
282}
283
284impl kv::Adapter for Adapter {
285 type Scanner = ();
286
287 fn info(&self) -> kv::Info {
288 kv::Info::new(
289 Scheme::Surrealdb,
290 &self.table,
291 Capability {
292 read: true,
293 write: true,
294 shared: true,
295 ..Default::default()
296 },
297 )
298 }
299
300 async fn get(&self, path: &str) -> crate::Result<Option<Buffer>> {
301 let query: String = if self.key_field == "id" {
302 "SELECT type::field($value_field) FROM type::thing($table, $path)".to_string()
303 } else {
304 format!("SELECT type::field($value_field) FROM type::table($table) WHERE {} = $path LIMIT 1", self.key_field)
305 };
306
307 let mut result = self
308 .get_connection()
309 .await?
310 .query(query)
311 .bind(("namespace", "opendal"))
312 .bind(("path", path.to_string()))
313 .bind(("table", self.table.to_string()))
314 .bind(("value_field", self.value_field.to_string()))
315 .await
316 .map_err(parse_surrealdb_error)?;
317
318 let value: Option<Vec<u8>> = result
319 .take((0, self.value_field.as_str()))
320 .map_err(parse_surrealdb_error)?;
321
322 Ok(value.map(Buffer::from))
323 }
324
325 async fn set(&self, path: &str, value: Buffer) -> crate::Result<()> {
326 let query = format!(
327 "INSERT INTO {} ({}, {}) \
328 VALUES ($path, $value) \
329 ON DUPLICATE KEY UPDATE {} = $value",
330 self.table, self.key_field, self.value_field, self.value_field
331 );
332 self.get_connection()
333 .await?
334 .query(query)
335 .bind(("path", path.to_string()))
336 .bind(("value", value.to_vec()))
337 .await
338 .map_err(parse_surrealdb_error)?;
339 Ok(())
340 }
341
342 async fn delete(&self, path: &str) -> crate::Result<()> {
343 let query: String = if self.key_field == "id" {
344 "DELETE FROM type::thing($table, $path)".to_string()
345 } else {
346 format!(
347 "DELETE FROM type::table($table) WHERE {} = $path",
348 self.key_field
349 )
350 };
351
352 self.get_connection()
353 .await?
354 .query(query.as_str())
355 .bind(("path", path.to_string()))
356 .bind(("table", self.table.to_string()))
357 .await
358 .map_err(parse_surrealdb_error)?;
359 Ok(())
360 }
361}
362
363fn parse_surrealdb_error(err: surrealdb::Error) -> Error {
364 Error::new(ErrorKind::Unexpected, "unhandled error from surrealdb").set_source(err)
365}