opendal/services/nebula_graph/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19
20#[cfg(feature = "tests")]
21use std::time::Duration;
22use std::vec;
23
24use base64::engine::general_purpose::STANDARD as BASE64;
25use base64::engine::Engine as _;
26use bb8::{PooledConnection, RunError};
27use rust_nebula::{
28    graph::GraphQuery, HostAddress, SingleConnSessionConf, SingleConnSessionManager,
29};
30use snowflaked::sync::Generator;
31use tokio::sync::OnceCell;
32
33use crate::raw::adapters::kv;
34use crate::raw::*;
35use crate::services::NebulaGraphConfig;
36use crate::*;
37
/// Process-wide snowflake id generator (instance 0); `set` draws the vertex id of every
/// inserted vertex from it.
static GENERATOR: Generator = Generator::new(0);
39
40impl Configurator for NebulaGraphConfig {
41    type Builder = NebulaGraphBuilder;
42    fn into_builder(self) -> Self::Builder {
43        NebulaGraphBuilder { config: self }
44    }
45}
46
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct NebulaGraphBuilder {
    /// Configuration filled in by the setter methods and consumed by `build`.
    config: NebulaGraphConfig,
}
52
53impl Debug for NebulaGraphBuilder {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        let mut d = f.debug_struct("MysqlBuilder");
56
57        d.field("config", &self.config).finish()
58    }
59}
60
61impl NebulaGraphBuilder {
62    /// Set the host addr of nebulagraph's graphd server
63    pub fn host(&mut self, host: &str) -> &mut Self {
64        if !host.is_empty() {
65            self.config.host = Some(host.to_string());
66        }
67        self
68    }
69
70    /// Set the host port of nebulagraph's graphd server
71    pub fn port(&mut self, port: u16) -> &mut Self {
72        self.config.port = Some(port);
73        self
74    }
75
76    /// Set the username of nebulagraph's graphd server
77    pub fn username(&mut self, username: &str) -> &mut Self {
78        if !username.is_empty() {
79            self.config.username = Some(username.to_string());
80        }
81        self
82    }
83
84    /// Set the password of nebulagraph's graphd server
85    pub fn password(&mut self, password: &str) -> &mut Self {
86        if !password.is_empty() {
87            self.config.password = Some(password.to_string());
88        }
89        self
90    }
91
92    /// Set the space name of nebulagraph's graphd server
93    pub fn space(&mut self, space: &str) -> &mut Self {
94        if !space.is_empty() {
95            self.config.space = Some(space.to_string());
96        }
97        self
98    }
99
100    /// Set the tag name of nebulagraph's graphd server
101    pub fn tag(&mut self, tag: &str) -> &mut Self {
102        if !tag.is_empty() {
103            self.config.tag = Some(tag.to_string());
104        }
105        self
106    }
107
108    /// Set the key field name of the NebulaGraph service to read/write.
109    ///
110    /// Default to `key` if not specified.
111    pub fn key_field(&mut self, key_field: &str) -> &mut Self {
112        if !key_field.is_empty() {
113            self.config.key_field = Some(key_field.to_string());
114        }
115        self
116    }
117
118    /// Set the value field name of the NebulaGraph service to read/write.
119    ///
120    /// Default to `value` if not specified.
121    pub fn value_field(&mut self, value_field: &str) -> &mut Self {
122        if !value_field.is_empty() {
123            self.config.value_field = Some(value_field.to_string());
124        }
125        self
126    }
127
128    /// set the working directory, all operations will be performed under it.
129    ///
130    /// default: "/"
131    pub fn root(&mut self, root: &str) -> &mut Self {
132        if !root.is_empty() {
133            self.config.root = Some(root.to_string());
134        }
135        self
136    }
137}
138
139impl Builder for NebulaGraphBuilder {
140    const SCHEME: Scheme = Scheme::NebulaGraph;
141    type Config = NebulaGraphConfig;
142
143    fn build(self) -> Result<impl Access> {
144        let host = match self.config.host.clone() {
145            Some(v) => v,
146            None => {
147                return Err(Error::new(ErrorKind::ConfigInvalid, "host is empty")
148                    .with_context("service", Scheme::NebulaGraph))
149            }
150        };
151        let port = match self.config.port {
152            Some(v) => v,
153            None => {
154                return Err(Error::new(ErrorKind::ConfigInvalid, "port is empty")
155                    .with_context("service", Scheme::NebulaGraph))
156            }
157        };
158        let username = match self.config.username.clone() {
159            Some(v) => v,
160            None => {
161                return Err(Error::new(ErrorKind::ConfigInvalid, "username is empty")
162                    .with_context("service", Scheme::NebulaGraph))
163            }
164        };
165        let password = match self.config.password.clone() {
166            Some(v) => v,
167            None => "".to_string(),
168        };
169        let space = match self.config.space.clone() {
170            Some(v) => v,
171            None => {
172                return Err(Error::new(ErrorKind::ConfigInvalid, "space is empty")
173                    .with_context("service", Scheme::NebulaGraph))
174            }
175        };
176        let tag = match self.config.tag.clone() {
177            Some(v) => v,
178            None => {
179                return Err(Error::new(ErrorKind::ConfigInvalid, "tag is empty")
180                    .with_context("service", Scheme::NebulaGraph))
181            }
182        };
183        let key_field = match self.config.key_field.clone() {
184            Some(v) => v,
185            None => "key".to_string(),
186        };
187        let value_field = match self.config.value_field.clone() {
188            Some(v) => v,
189            None => "value".to_string(),
190        };
191        let root = normalize_root(
192            self.config
193                .root
194                .clone()
195                .unwrap_or_else(|| "/".to_string())
196                .as_str(),
197        );
198
199        let mut session_config = SingleConnSessionConf::new(
200            vec![HostAddress::new(&host, port)],
201            username,
202            password,
203            Some(space),
204        );
205        // NebulaGraph use fbthrift for communication. fbthrift's max_buffer_size is default 4 KB,
206        // which is too small to store something.
207        // So we could set max_buffer_size to 10 MB so that NebulaGraph can store files with filesize < 1 MB at least.
208        session_config.set_buf_size(1024 * 1024);
209        session_config.set_max_buf_size(64 * 1024 * 1024);
210        session_config.set_max_parse_response_bytes_count(254);
211
212        Ok(NebulaGraphBackend::new(Adapter {
213            session_pool: OnceCell::new(),
214            session_config,
215
216            tag,
217            key_field,
218            value_field,
219        })
220        .with_root(root.as_str()))
221    }
222}
223
/// Backend for NebulaGraph service
///
/// This is the generic [`kv::Backend`] specialised with this module's [`Adapter`].
pub type NebulaGraphBackend = kv::Backend<Adapter>;
226
/// Key-value adapter for NebulaGraph: session settings plus the tag and field names
/// that are interpolated into the generated queries.
#[derive(Clone)]
pub struct Adapter {
    /// Session pool, created on first use by `get_session`.
    session_pool: OnceCell<bb8::Pool<SingleConnSessionManager>>,
    /// Settings used to create the session manager behind the pool.
    session_config: SingleConnSessionConf,

    /// Tag name used in the generated queries.
    tag: String,
    /// Name of the property that holds the key (the path).
    key_field: String,
    /// Name of the property that holds the base64-encoded value.
    value_field: String,
}
236
237impl Debug for Adapter {
238    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239        f.debug_struct("Adapter")
240            .field("session_config", &self.session_config)
241            .field("tag", &self.tag)
242            .field("key_field", &self.key_field)
243            .field("value_field", &self.value_field)
244            .finish()
245    }
246}
247
248impl Adapter {
249    async fn get_session(&self) -> Result<PooledConnection<SingleConnSessionManager>> {
250        let session_pool = self
251            .session_pool
252            .get_or_try_init(|| async {
253                bb8::Pool::builder()
254                    .max_size(64)
255                    .build(SingleConnSessionManager::new(self.session_config.clone()))
256                    .await
257            })
258            .await
259            .map_err(|err| Error::new(ErrorKind::Unexpected, format!("{}", err)).set_temporary())?;
260
261        session_pool.get().await.map_err(|err| match err {
262            RunError::User(err) => {
263                Error::new(ErrorKind::Unexpected, format!("{}", err)).set_temporary()
264            }
265            RunError::TimedOut => {
266                Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
267            }
268        })
269    }
270}
271
272impl kv::Adapter for Adapter {
273    type Scanner = kv::ScanStdIter<vec::IntoIter<Result<String>>>;
274
275    fn info(&self) -> kv::Info {
276        kv::Info::new(
277            Scheme::NebulaGraph,
278            &self.session_config.space.clone().unwrap(),
279            Capability {
280                read: true,
281                write: true,
282                write_total_max_size: Some(1024 * 1024),
283                write_can_empty: true,
284                delete: true,
285                list: true,
286                shared: true,
287                ..Default::default()
288            },
289        )
290    }
291
292    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
293        let path = path.replace("'", "\\'").replace('"', "\\\"");
294        let query = format!(
295            "LOOKUP ON {} WHERE {}.{} == '{}' YIELD properties(vertex).{} AS {};",
296            self.tag, self.tag, self.key_field, path, self.value_field, self.value_field
297        );
298        let mut sess = self.get_session().await?;
299        let result = sess
300            .query(&query)
301            .await
302            .map_err(parse_nebulagraph_session_error)?;
303        if result.is_empty() {
304            Ok(None)
305        } else {
306            let row = result
307                .get_row_values_by_index(0)
308                .map_err(parse_nebulagraph_dataset_error)?;
309            let value = row
310                .get_value_by_col_name(&self.value_field)
311                .map_err(parse_nebulagraph_dataset_error)?;
312            let base64_str = value.as_string().map_err(parse_nebulagraph_dataset_error)?;
313            let value_str = BASE64.decode(base64_str).map_err(|err| {
314                Error::new(ErrorKind::Unexpected, "unhandled error from nebulagraph")
315                    .set_source(err)
316            })?;
317            let buf = Buffer::from(value_str);
318            Ok(Some(buf))
319        }
320    }
321
322    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
323        #[cfg(feature = "tests")]
324        let path_copy = path;
325
326        self.delete(path).await?;
327        let path = path.replace("'", "\\'").replace('"', "\\\"");
328        let file = value.to_vec();
329        let file = BASE64.encode(&file);
330        let snowflake_id: u64 = GENERATOR.generate();
331        let query = format!(
332            "INSERT VERTEX {} VALUES {}:('{}', '{}');",
333            self.tag, snowflake_id, path, file
334        );
335        let mut sess = self.get_session().await?;
336        sess.execute(&query)
337            .await
338            .map_err(parse_nebulagraph_session_error)?;
339
340        // To pass tests, we should confirm NebulaGraph has inserted data successfully
341        #[cfg(feature = "tests")]
342        loop {
343            let v = self.get(path_copy).await.unwrap();
344            if v.is_none() {
345                std::thread::sleep(Duration::from_millis(1000));
346            } else {
347                break;
348            }
349        }
350        Ok(())
351    }
352
353    async fn delete(&self, path: &str) -> Result<()> {
354        let path = path.replace("'", "\\'").replace('"', "\\\"");
355        let query = format!(
356            "LOOKUP ON {} WHERE {}.{} == '{}' YIELD id(vertex) AS id | DELETE VERTEX $-.id;",
357            self.tag, self.tag, self.key_field, path
358        );
359        let mut sess = self.get_session().await?;
360        sess.execute(&query)
361            .await
362            .map_err(parse_nebulagraph_session_error)?;
363        Ok(())
364    }
365
366    async fn scan(&self, path: &str) -> Result<Self::Scanner> {
367        let path = path.replace("'", "\\'").replace('"', "\\\"");
368        let query = format!(
369            "LOOKUP ON {} WHERE {}.{} STARTS WITH '{}' YIELD properties(vertex).{} AS {};",
370            self.tag, self.tag, self.key_field, path, self.key_field, self.key_field
371        );
372
373        let mut sess = self.get_session().await?;
374        let result = sess
375            .query(&query)
376            .await
377            .map_err(parse_nebulagraph_session_error)?;
378        let mut res_vec = vec![];
379        for row_i in 0..result.get_row_size() {
380            let row = result
381                .get_row_values_by_index(row_i)
382                .map_err(parse_nebulagraph_dataset_error)?;
383            let value = row
384                .get_value_by_col_name(&self.key_field)
385                .map_err(parse_nebulagraph_dataset_error)?;
386            let sub_path = value.as_string().map_err(parse_nebulagraph_dataset_error)?;
387
388            res_vec.push(Ok(sub_path));
389        }
390        Ok(kv::ScanStdIter::new(res_vec.into_iter()))
391    }
392}
393
394fn parse_nebulagraph_session_error(err: rust_nebula::SingleConnSessionError) -> Error {
395    Error::new(ErrorKind::Unexpected, "unhandled error from nebulagraph").set_source(err)
396}
397
398fn parse_nebulagraph_dataset_error(err: rust_nebula::DataSetError) -> Error {
399    Error::new(ErrorKind::Unexpected, "unhandled error from nebulagraph").set_source(err)
400}