opendal/services/surrealdb/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use surrealdb::engine::any::Any;
23use surrealdb::opt::auth::Database;
24use surrealdb::Surreal;
25use tokio::sync::OnceCell;
26
27use crate::raw::adapters::kv;
28use crate::raw::normalize_root;
29use crate::raw::Access;
30use crate::services::SurrealdbConfig;
31use crate::*;
32
33impl Configurator for SurrealdbConfig {
34 type Builder = SurrealdbBuilder;
35 fn into_builder(self) -> Self::Builder {
36 SurrealdbBuilder { config: self }
37 }
38}
39
40#[doc = include_str!("docs.md")]
41#[derive(Default)]
42pub struct SurrealdbBuilder {
43 config: SurrealdbConfig,
44}
45
46impl Debug for SurrealdbBuilder {
47 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("SurrealdbBuilder")
49 .field("config", &self.config)
50 .finish()
51 }
52}
53
54impl SurrealdbBuilder {
55 pub fn connection_string(mut self, connection_string: &str) -> Self {
66 if !connection_string.is_empty() {
67 self.config.connection_string = Some(connection_string.to_string());
68 }
69 self
70 }
71
72 pub fn root(mut self, root: &str) -> Self {
76 self.config.root = if root.is_empty() {
77 None
78 } else {
79 Some(root.to_string())
80 };
81
82 self
83 }
84
85 pub fn table(mut self, table: &str) -> Self {
87 if !table.is_empty() {
88 self.config.table = Some(table.to_string());
89 }
90 self
91 }
92
93 pub fn username(mut self, username: &str) -> Self {
95 if !username.is_empty() {
96 self.config.username = Some(username.to_string());
97 }
98 self
99 }
100
101 pub fn password(mut self, password: &str) -> Self {
103 if !password.is_empty() {
104 self.config.password = Some(password.to_string());
105 }
106 self
107 }
108
109 pub fn namespace(mut self, namespace: &str) -> Self {
111 if !namespace.is_empty() {
112 self.config.namespace = Some(namespace.to_string());
113 }
114 self
115 }
116
117 pub fn database(mut self, database: &str) -> Self {
119 if !database.is_empty() {
120 self.config.database = Some(database.to_string());
121 }
122 self
123 }
124
125 pub fn key_field(mut self, key_field: &str) -> Self {
129 if !key_field.is_empty() {
130 self.config.key_field = Some(key_field.to_string());
131 }
132 self
133 }
134
135 pub fn value_field(mut self, value_field: &str) -> Self {
139 if !value_field.is_empty() {
140 self.config.value_field = Some(value_field.to_string());
141 }
142 self
143 }
144}
145
146impl Builder for SurrealdbBuilder {
147 const SCHEME: Scheme = Scheme::Surrealdb;
148 type Config = SurrealdbConfig;
149
150 fn build(self) -> Result<impl Access> {
151 let connection_string = match self.config.connection_string.clone() {
152 Some(v) => v,
153 None => {
154 return Err(
155 Error::new(ErrorKind::ConfigInvalid, "connection_string is empty")
156 .with_context("service", Scheme::Surrealdb),
157 )
158 }
159 };
160
161 let namespace = match self.config.namespace.clone() {
162 Some(v) => v,
163 None => {
164 return Err(Error::new(ErrorKind::ConfigInvalid, "namespace is empty")
165 .with_context("service", Scheme::Surrealdb))
166 }
167 };
168 let database = match self.config.database.clone() {
169 Some(v) => v,
170 None => {
171 return Err(Error::new(ErrorKind::ConfigInvalid, "database is empty")
172 .with_context("service", Scheme::Surrealdb))
173 }
174 };
175 let table = match self.config.table.clone() {
176 Some(v) => v,
177 None => {
178 return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
179 .with_context("service", Scheme::Surrealdb))
180 }
181 };
182
183 let username = self.config.username.clone().unwrap_or_default();
184 let password = self.config.password.clone().unwrap_or_default();
185 let key_field = self
186 .config
187 .key_field
188 .clone()
189 .unwrap_or_else(|| "key".to_string());
190 let value_field = self
191 .config
192 .value_field
193 .clone()
194 .unwrap_or_else(|| "value".to_string());
195 let root = normalize_root(
196 self.config
197 .root
198 .clone()
199 .unwrap_or_else(|| "/".to_string())
200 .as_str(),
201 );
202
203 Ok(SurrealdbBackend::new(Adapter {
204 db: OnceCell::new(),
205 connection_string,
206 username,
207 password,
208 namespace,
209 database,
210 table,
211 key_field,
212 value_field,
213 })
214 .with_normalized_root(root))
215 }
216}
217
218pub type SurrealdbBackend = kv::Backend<Adapter>;
220
221#[derive(Clone)]
222pub struct Adapter {
223 db: OnceCell<Arc<Surreal<Any>>>,
224 connection_string: String,
225
226 username: String,
227 password: String,
228 namespace: String,
229 database: String,
230
231 table: String,
232 key_field: String,
233 value_field: String,
234}
235
236impl Debug for Adapter {
237 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
238 f.debug_struct("Adapter")
239 .field("connection_string", &self.connection_string)
240 .field("username", &self.username)
241 .field("password", &"<redacted>")
242 .field("namespace", &self.namespace)
243 .field("database", &self.database)
244 .field("table", &self.table)
245 .field("key_field", &self.key_field)
246 .field("value_field", &self.value_field)
247 .finish()
248 }
249}
250
251impl Adapter {
252 async fn get_connection(&self) -> crate::Result<&Surreal<Any>> {
253 self.db
254 .get_or_try_init(|| async {
255 let namespace = self.namespace.as_str();
256 let database = self.database.as_str();
257
258 let db: Surreal<Any> = Surreal::init();
259 db.connect(self.connection_string.clone())
260 .await
261 .map_err(parse_surrealdb_error)?;
262
263 if !self.username.is_empty() && !self.password.is_empty() {
264 db.signin(Database {
265 namespace,
266 database,
267 username: self.username.as_str(),
268 password: self.password.as_str(),
269 })
270 .await
271 .map_err(parse_surrealdb_error)?;
272 }
273 db.use_ns(namespace)
274 .use_db(database)
275 .await
276 .map_err(parse_surrealdb_error)?;
277
278 Ok(Arc::new(db))
279 })
280 .await
281 .map(|v| v.as_ref())
282 }
283}
284
285impl kv::Adapter for Adapter {
286 type Scanner = ();
287
288 fn info(&self) -> kv::Info {
289 kv::Info::new(
290 Scheme::Surrealdb,
291 &self.table,
292 Capability {
293 read: true,
294 write: true,
295 shared: true,
296 ..Default::default()
297 },
298 )
299 }
300
301 async fn get(&self, path: &str) -> crate::Result<Option<Buffer>> {
302 let query: String = if self.key_field == "id" {
303 "SELECT type::field($value_field) FROM type::thing($table, $path)".to_string()
304 } else {
305 format!("SELECT type::field($value_field) FROM type::table($table) WHERE {} = $path LIMIT 1", self.key_field)
306 };
307
308 let mut result = self
309 .get_connection()
310 .await?
311 .query(query)
312 .bind(("namespace", "opendal"))
313 .bind(("path", path.to_string()))
314 .bind(("table", self.table.to_string()))
315 .bind(("value_field", self.value_field.to_string()))
316 .await
317 .map_err(parse_surrealdb_error)?;
318
319 let value: Option<Vec<u8>> = result
320 .take((0, self.value_field.as_str()))
321 .map_err(parse_surrealdb_error)?;
322
323 Ok(value.map(Buffer::from))
324 }
325
326 async fn set(&self, path: &str, value: Buffer) -> crate::Result<()> {
327 let query = format!(
328 "INSERT INTO {} ({}, {}) \
329 VALUES ($path, $value) \
330 ON DUPLICATE KEY UPDATE {} = $value",
331 self.table, self.key_field, self.value_field, self.value_field
332 );
333 self.get_connection()
334 .await?
335 .query(query)
336 .bind(("path", path.to_string()))
337 .bind(("value", value.to_vec()))
338 .await
339 .map_err(parse_surrealdb_error)?;
340 Ok(())
341 }
342
343 async fn delete(&self, path: &str) -> crate::Result<()> {
344 let query: String = if self.key_field == "id" {
345 "DELETE FROM type::thing($table, $path)".to_string()
346 } else {
347 format!(
348 "DELETE FROM type::table($table) WHERE {} = $path",
349 self.key_field
350 )
351 };
352
353 self.get_connection()
354 .await?
355 .query(query.as_str())
356 .bind(("path", path.to_string()))
357 .bind(("table", self.table.to_string()))
358 .await
359 .map_err(parse_surrealdb_error)?;
360 Ok(())
361 }
362}
363
364fn parse_surrealdb_error(err: surrealdb::Error) -> Error {
365 Error::new(ErrorKind::Unexpected, "unhandled error from surrealdb").set_source(err)
366}