opendal_core/services/tikv/
core.rs1use std::fmt::Debug;
19
20use mea::once::OnceCell;
21use tikv_client::Config;
22use tikv_client::RawClient;
23
24use super::TIKV_SCHEME;
25use crate::*;
26
27#[derive(Clone)]
29pub struct TikvCore {
30 pub client: OnceCell<RawClient>,
31 pub endpoints: Vec<String>,
32 pub insecure: bool,
33 pub ca_path: Option<String>,
34 pub cert_path: Option<String>,
35 pub key_path: Option<String>,
36}
37
38impl Debug for TikvCore {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct("TikvCore")
41 .field("endpoints", &self.endpoints)
42 .field("insecure", &self.insecure)
43 .finish()
44 }
45}
46
47impl TikvCore {
48 async fn get_connection(&self) -> Result<&RawClient> {
49 self.client
50 .get_or_try_init(|| async {
51 if self.insecure {
52 return RawClient::new(self.endpoints.clone())
53 .await
54 .map_err(parse_tikv_config_error);
55 }
56
57 if let Some(ca_path) = self.ca_path.as_ref()
58 && let Some(key_path) = self.key_path.as_ref()
59 && let Some(cert_path) = self.cert_path.as_ref()
60 {
61 let config = Config::default().with_security(ca_path, cert_path, key_path);
62 return RawClient::new_with_config(self.endpoints.clone(), config)
63 .await
64 .map_err(parse_tikv_config_error);
65 }
66
67 Err(
68 Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
69 .with_context("service", TIKV_SCHEME)
70 .with_context("endpoints", format!("{:?}", self.endpoints)),
71 )
72 })
73 .await
74 }
75
76 pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
77 let result = self
78 .get_connection()
79 .await?
80 .get(path.to_owned())
81 .await
82 .map_err(parse_tikv_error)?;
83 Ok(result.map(Buffer::from))
84 }
85
86 pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
87 self.get_connection()
88 .await?
89 .put(path.to_owned(), value.to_vec())
90 .await
91 .map_err(parse_tikv_error)
92 }
93
94 pub async fn delete(&self, path: &str) -> Result<()> {
95 self.get_connection()
96 .await?
97 .delete(path.to_owned())
98 .await
99 .map_err(parse_tikv_error)
100 }
101}
102
103fn parse_tikv_error(e: tikv_client::Error) -> Error {
104 Error::new(ErrorKind::Unexpected, "error from tikv").set_source(e)
105}
106
107fn parse_tikv_config_error(e: tikv_client::Error) -> Error {
108 Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
109 .with_context("service", TIKV_SCHEME)
110 .set_source(e)
111}