opendal/services/nebula_graph/
backend.rs1use std::fmt::Debug;
19
20#[cfg(feature = "tests")]
21use std::time::Duration;
22use std::vec;
23
24use base64::engine::general_purpose::STANDARD as BASE64;
25use base64::engine::Engine as _;
26use bb8::{PooledConnection, RunError};
27use rust_nebula::{
28 graph::GraphQuery, HostAddress, SingleConnSessionConf, SingleConnSessionManager,
29};
30use snowflaked::sync::Generator;
31use tokio::sync::OnceCell;
32
33use crate::raw::adapters::kv;
34use crate::raw::*;
35use crate::services::NebulaGraphConfig;
36use crate::*;
37
38static GENERATOR: Generator = Generator::new(0);
39
40impl Configurator for NebulaGraphConfig {
41 type Builder = NebulaGraphBuilder;
42 fn into_builder(self) -> Self::Builder {
43 NebulaGraphBuilder { config: self }
44 }
45}
46
47#[doc = include_str!("docs.md")]
48#[derive(Default)]
49pub struct NebulaGraphBuilder {
50 config: NebulaGraphConfig,
51}
52
53impl Debug for NebulaGraphBuilder {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 let mut d = f.debug_struct("MysqlBuilder");
56
57 d.field("config", &self.config).finish()
58 }
59}
60
61impl NebulaGraphBuilder {
62 pub fn host(&mut self, host: &str) -> &mut Self {
64 if !host.is_empty() {
65 self.config.host = Some(host.to_string());
66 }
67 self
68 }
69
70 pub fn port(&mut self, port: u16) -> &mut Self {
72 self.config.port = Some(port);
73 self
74 }
75
76 pub fn username(&mut self, username: &str) -> &mut Self {
78 if !username.is_empty() {
79 self.config.username = Some(username.to_string());
80 }
81 self
82 }
83
84 pub fn password(&mut self, password: &str) -> &mut Self {
86 if !password.is_empty() {
87 self.config.password = Some(password.to_string());
88 }
89 self
90 }
91
92 pub fn space(&mut self, space: &str) -> &mut Self {
94 if !space.is_empty() {
95 self.config.space = Some(space.to_string());
96 }
97 self
98 }
99
100 pub fn tag(&mut self, tag: &str) -> &mut Self {
102 if !tag.is_empty() {
103 self.config.tag = Some(tag.to_string());
104 }
105 self
106 }
107
108 pub fn key_field(&mut self, key_field: &str) -> &mut Self {
112 if !key_field.is_empty() {
113 self.config.key_field = Some(key_field.to_string());
114 }
115 self
116 }
117
118 pub fn value_field(&mut self, value_field: &str) -> &mut Self {
122 if !value_field.is_empty() {
123 self.config.value_field = Some(value_field.to_string());
124 }
125 self
126 }
127
128 pub fn root(&mut self, root: &str) -> &mut Self {
132 if !root.is_empty() {
133 self.config.root = Some(root.to_string());
134 }
135 self
136 }
137}
138
139impl Builder for NebulaGraphBuilder {
140 const SCHEME: Scheme = Scheme::NebulaGraph;
141 type Config = NebulaGraphConfig;
142
143 fn build(self) -> Result<impl Access> {
144 let host = match self.config.host.clone() {
145 Some(v) => v,
146 None => {
147 return Err(Error::new(ErrorKind::ConfigInvalid, "host is empty")
148 .with_context("service", Scheme::NebulaGraph))
149 }
150 };
151 let port = match self.config.port {
152 Some(v) => v,
153 None => {
154 return Err(Error::new(ErrorKind::ConfigInvalid, "port is empty")
155 .with_context("service", Scheme::NebulaGraph))
156 }
157 };
158 let username = match self.config.username.clone() {
159 Some(v) => v,
160 None => {
161 return Err(Error::new(ErrorKind::ConfigInvalid, "username is empty")
162 .with_context("service", Scheme::NebulaGraph))
163 }
164 };
165 let password = match self.config.password.clone() {
166 Some(v) => v,
167 None => "".to_string(),
168 };
169 let space = match self.config.space.clone() {
170 Some(v) => v,
171 None => {
172 return Err(Error::new(ErrorKind::ConfigInvalid, "space is empty")
173 .with_context("service", Scheme::NebulaGraph))
174 }
175 };
176 let tag = match self.config.tag.clone() {
177 Some(v) => v,
178 None => {
179 return Err(Error::new(ErrorKind::ConfigInvalid, "tag is empty")
180 .with_context("service", Scheme::NebulaGraph))
181 }
182 };
183 let key_field = match self.config.key_field.clone() {
184 Some(v) => v,
185 None => "key".to_string(),
186 };
187 let value_field = match self.config.value_field.clone() {
188 Some(v) => v,
189 None => "value".to_string(),
190 };
191 let root = normalize_root(
192 self.config
193 .root
194 .clone()
195 .unwrap_or_else(|| "/".to_string())
196 .as_str(),
197 );
198
199 let mut session_config = SingleConnSessionConf::new(
200 vec![HostAddress::new(&host, port)],
201 username,
202 password,
203 Some(space),
204 );
205 session_config.set_buf_size(1024 * 1024);
209 session_config.set_max_buf_size(64 * 1024 * 1024);
210 session_config.set_max_parse_response_bytes_count(254);
211
212 Ok(NebulaGraphBackend::new(Adapter {
213 session_pool: OnceCell::new(),
214 session_config,
215
216 tag,
217 key_field,
218 value_field,
219 })
220 .with_root(root.as_str()))
221 }
222}
223
224pub type NebulaGraphBackend = kv::Backend<Adapter>;
226
227#[derive(Clone)]
228pub struct Adapter {
229 session_pool: OnceCell<bb8::Pool<SingleConnSessionManager>>,
230 session_config: SingleConnSessionConf,
231
232 tag: String,
233 key_field: String,
234 value_field: String,
235}
236
237impl Debug for Adapter {
238 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239 f.debug_struct("Adapter")
240 .field("session_config", &self.session_config)
241 .field("tag", &self.tag)
242 .field("key_field", &self.key_field)
243 .field("value_field", &self.value_field)
244 .finish()
245 }
246}
247
248impl Adapter {
249 async fn get_session(&self) -> Result<PooledConnection<SingleConnSessionManager>> {
250 let session_pool = self
251 .session_pool
252 .get_or_try_init(|| async {
253 bb8::Pool::builder()
254 .max_size(64)
255 .build(SingleConnSessionManager::new(self.session_config.clone()))
256 .await
257 })
258 .await
259 .map_err(|err| Error::new(ErrorKind::Unexpected, format!("{}", err)).set_temporary())?;
260
261 session_pool.get().await.map_err(|err| match err {
262 RunError::User(err) => {
263 Error::new(ErrorKind::Unexpected, format!("{}", err)).set_temporary()
264 }
265 RunError::TimedOut => {
266 Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
267 }
268 })
269 }
270}
271
272impl kv::Adapter for Adapter {
273 type Scanner = kv::ScanStdIter<vec::IntoIter<Result<String>>>;
274
275 fn info(&self) -> kv::Info {
276 kv::Info::new(
277 Scheme::NebulaGraph,
278 &self.session_config.space.clone().unwrap(),
279 Capability {
280 read: true,
281 write: true,
282 write_total_max_size: Some(1024 * 1024),
283 write_can_empty: true,
284 delete: true,
285 list: true,
286 shared: true,
287 ..Default::default()
288 },
289 )
290 }
291
292 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
293 let path = path.replace("'", "\\'").replace('"', "\\\"");
294 let query = format!(
295 "LOOKUP ON {} WHERE {}.{} == '{}' YIELD properties(vertex).{} AS {};",
296 self.tag, self.tag, self.key_field, path, self.value_field, self.value_field
297 );
298 let mut sess = self.get_session().await?;
299 let result = sess
300 .query(&query)
301 .await
302 .map_err(parse_nebulagraph_session_error)?;
303 if result.is_empty() {
304 Ok(None)
305 } else {
306 let row = result
307 .get_row_values_by_index(0)
308 .map_err(parse_nebulagraph_dataset_error)?;
309 let value = row
310 .get_value_by_col_name(&self.value_field)
311 .map_err(parse_nebulagraph_dataset_error)?;
312 let base64_str = value.as_string().map_err(parse_nebulagraph_dataset_error)?;
313 let value_str = BASE64.decode(base64_str).map_err(|err| {
314 Error::new(ErrorKind::Unexpected, "unhandled error from nebulagraph")
315 .set_source(err)
316 })?;
317 let buf = Buffer::from(value_str);
318 Ok(Some(buf))
319 }
320 }
321
322 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
323 #[cfg(feature = "tests")]
324 let path_copy = path;
325
326 self.delete(path).await?;
327 let path = path.replace("'", "\\'").replace('"', "\\\"");
328 let file = value.to_vec();
329 let file = BASE64.encode(&file);
330 let snowflake_id: u64 = GENERATOR.generate();
331 let query = format!(
332 "INSERT VERTEX {} VALUES {}:('{}', '{}');",
333 self.tag, snowflake_id, path, file
334 );
335 let mut sess = self.get_session().await?;
336 sess.execute(&query)
337 .await
338 .map_err(parse_nebulagraph_session_error)?;
339
340 #[cfg(feature = "tests")]
342 loop {
343 let v = self.get(path_copy).await.unwrap();
344 if v.is_none() {
345 std::thread::sleep(Duration::from_millis(1000));
346 } else {
347 break;
348 }
349 }
350 Ok(())
351 }
352
353 async fn delete(&self, path: &str) -> Result<()> {
354 let path = path.replace("'", "\\'").replace('"', "\\\"");
355 let query = format!(
356 "LOOKUP ON {} WHERE {}.{} == '{}' YIELD id(vertex) AS id | DELETE VERTEX $-.id;",
357 self.tag, self.tag, self.key_field, path
358 );
359 let mut sess = self.get_session().await?;
360 sess.execute(&query)
361 .await
362 .map_err(parse_nebulagraph_session_error)?;
363 Ok(())
364 }
365
366 async fn scan(&self, path: &str) -> Result<Self::Scanner> {
367 let path = path.replace("'", "\\'").replace('"', "\\\"");
368 let query = format!(
369 "LOOKUP ON {} WHERE {}.{} STARTS WITH '{}' YIELD properties(vertex).{} AS {};",
370 self.tag, self.tag, self.key_field, path, self.key_field, self.key_field
371 );
372
373 let mut sess = self.get_session().await?;
374 let result = sess
375 .query(&query)
376 .await
377 .map_err(parse_nebulagraph_session_error)?;
378 let mut res_vec = vec![];
379 for row_i in 0..result.get_row_size() {
380 let row = result
381 .get_row_values_by_index(row_i)
382 .map_err(parse_nebulagraph_dataset_error)?;
383 let value = row
384 .get_value_by_col_name(&self.key_field)
385 .map_err(parse_nebulagraph_dataset_error)?;
386 let sub_path = value.as_string().map_err(parse_nebulagraph_dataset_error)?;
387
388 res_vec.push(Ok(sub_path));
389 }
390 Ok(kv::ScanStdIter::new(res_vec.into_iter()))
391 }
392}
393
394fn parse_nebulagraph_session_error(err: rust_nebula::SingleConnSessionError) -> Error {
395 Error::new(ErrorKind::Unexpected, "unhandled error from nebulagraph").set_source(err)
396}
397
398fn parse_nebulagraph_dataset_error(err: rust_nebula::DataSetError) -> Error {
399 Error::new(ErrorKind::Unexpected, "unhandled error from nebulagraph").set_source(err)
400}