opendal/services/tikv/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use tikv_client::Config;
22use tikv_client::RawClient;
23use tokio::sync::OnceCell;
24
25use crate::raw::adapters::kv;
26use crate::raw::Access;
27use crate::services::TikvConfig;
28use crate::Builder;
29use crate::Capability;
30use crate::Error;
31use crate::ErrorKind;
32use crate::Scheme;
33use crate::*;
34
35impl Configurator for TikvConfig {
36 type Builder = TikvBuilder;
37 fn into_builder(self) -> Self::Builder {
38 TikvBuilder { config: self }
39 }
40}
41
42#[doc = include_str!("docs.md")]
44#[derive(Clone, Default)]
45pub struct TikvBuilder {
46 config: TikvConfig,
47}
48
49impl Debug for TikvBuilder {
50 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
51 let mut d = f.debug_struct("TikvBuilder");
52
53 d.field("config", &self.config);
54 d.finish_non_exhaustive()
55 }
56}
57
58impl TikvBuilder {
59 pub fn endpoints(mut self, endpoints: Vec<String>) -> Self {
61 if !endpoints.is_empty() {
62 self.config.endpoints = Some(endpoints)
63 }
64 self
65 }
66
67 pub fn insecure(mut self) -> Self {
69 self.config.insecure = true;
70 self
71 }
72
73 pub fn ca_path(mut self, ca_path: &str) -> Self {
75 if !ca_path.is_empty() {
76 self.config.ca_path = Some(ca_path.to_string())
77 }
78 self
79 }
80
81 pub fn cert_path(mut self, cert_path: &str) -> Self {
83 if !cert_path.is_empty() {
84 self.config.cert_path = Some(cert_path.to_string())
85 }
86 self
87 }
88
89 pub fn key_path(mut self, key_path: &str) -> Self {
91 if !key_path.is_empty() {
92 self.config.key_path = Some(key_path.to_string())
93 }
94 self
95 }
96}
97
98impl Builder for TikvBuilder {
99 type Config = TikvConfig;
100
101 fn build(self) -> Result<impl Access> {
102 let endpoints = self.config.endpoints.ok_or_else(|| {
103 Error::new(
104 ErrorKind::ConfigInvalid,
105 "endpoints is required but not set",
106 )
107 .with_context("service", Scheme::Tikv)
108 })?;
109
110 if self.config.insecure
111 && (self.config.ca_path.is_some()
112 || self.config.key_path.is_some()
113 || self.config.cert_path.is_some())
114 {
115 return Err(
116 Error::new(ErrorKind::ConfigInvalid, "invalid tls configuration")
117 .with_context("service", Scheme::Tikv)
118 .with_context("endpoints", format!("{endpoints:?}")),
119 )?;
120 }
121
122 Ok(TikvBackend::new(Adapter {
123 client: OnceCell::new(),
124 endpoints,
125 insecure: self.config.insecure,
126 ca_path: self.config.ca_path.clone(),
127 cert_path: self.config.cert_path.clone(),
128 key_path: self.config.key_path.clone(),
129 }))
130 }
131}
132
133pub type TikvBackend = kv::Backend<Adapter>;
135
136#[derive(Clone)]
137pub struct Adapter {
138 client: OnceCell<RawClient>,
139 endpoints: Vec<String>,
140 insecure: bool,
141 ca_path: Option<String>,
142 cert_path: Option<String>,
143 key_path: Option<String>,
144}
145
146impl Debug for Adapter {
147 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
148 let mut ds = f.debug_struct("Adapter");
149
150 ds.field("endpoints", &self.endpoints);
151 ds.finish()
152 }
153}
154
155impl Adapter {
156 async fn get_connection(&self) -> Result<RawClient> {
157 if let Some(client) = self.client.get() {
158 return Ok(client.clone());
159 }
160 let client = if self.insecure {
161 RawClient::new(self.endpoints.clone())
162 .await
163 .map_err(parse_tikv_config_error)?
164 } else if self.ca_path.is_some() && self.key_path.is_some() && self.cert_path.is_some() {
165 let (ca_path, key_path, cert_path) = (
166 self.ca_path.clone().unwrap(),
167 self.key_path.clone().unwrap(),
168 self.cert_path.clone().unwrap(),
169 );
170 let config = Config::default().with_security(ca_path, cert_path, key_path);
171 RawClient::new_with_config(self.endpoints.clone(), config)
172 .await
173 .map_err(parse_tikv_config_error)?
174 } else {
175 return Err(
176 Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
177 .with_context("service", Scheme::Tikv)
178 .with_context("endpoints", format!("{:?}", self.endpoints)),
179 );
180 };
181 self.client.set(client.clone()).ok();
182 Ok(client)
183 }
184}
185
186impl kv::Adapter for Adapter {
187 type Scanner = ();
188
189 fn info(&self) -> kv::Info {
190 kv::Info::new(
191 Scheme::Tikv,
192 "TiKV",
193 Capability {
194 read: true,
195 write: true,
196 shared: true,
197 ..Default::default()
198 },
199 )
200 }
201
202 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
203 let result = self
204 .get_connection()
205 .await?
206 .get(path.to_owned())
207 .await
208 .map_err(parse_tikv_error)?;
209 Ok(result.map(Buffer::from))
210 }
211
212 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
213 self.get_connection()
214 .await?
215 .put(path.to_owned(), value.to_vec())
216 .await
217 .map_err(parse_tikv_error)
218 }
219
220 async fn delete(&self, path: &str) -> Result<()> {
221 self.get_connection()
222 .await?
223 .delete(path.to_owned())
224 .await
225 .map_err(parse_tikv_error)
226 }
227}
228
229fn parse_tikv_error(e: tikv_client::Error) -> Error {
230 Error::new(ErrorKind::Unexpected, "error from tikv").set_source(e)
231}
232
233fn parse_tikv_config_error(e: tikv_client::Error) -> Error {
234 Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
235 .with_context("service", Scheme::Tikv)
236 .set_source(e)
237}