opendal/services/tikv/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use tikv_client::Config;
22use tikv_client::RawClient;
23use tokio::sync::OnceCell;
24
25use crate::raw::adapters::kv;
26use crate::raw::Access;
27use crate::services::TikvConfig;
28use crate::Builder;
29use crate::Capability;
30use crate::Error;
31use crate::ErrorKind;
32use crate::Scheme;
33use crate::*;
34
35impl Configurator for TikvConfig {
36 type Builder = TikvBuilder;
37 fn into_builder(self) -> Self::Builder {
38 TikvBuilder { config: self }
39 }
40}
41
42#[doc = include_str!("docs.md")]
44#[derive(Clone, Default)]
45pub struct TikvBuilder {
46 config: TikvConfig,
47}
48
49impl Debug for TikvBuilder {
50 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
51 let mut d = f.debug_struct("TikvBuilder");
52
53 d.field("config", &self.config);
54 d.finish_non_exhaustive()
55 }
56}
57
58impl TikvBuilder {
59 pub fn endpoints(mut self, endpoints: Vec<String>) -> Self {
61 if !endpoints.is_empty() {
62 self.config.endpoints = Some(endpoints)
63 }
64 self
65 }
66
67 pub fn insecure(mut self) -> Self {
69 self.config.insecure = true;
70 self
71 }
72
73 pub fn ca_path(mut self, ca_path: &str) -> Self {
75 if !ca_path.is_empty() {
76 self.config.ca_path = Some(ca_path.to_string())
77 }
78 self
79 }
80
81 pub fn cert_path(mut self, cert_path: &str) -> Self {
83 if !cert_path.is_empty() {
84 self.config.cert_path = Some(cert_path.to_string())
85 }
86 self
87 }
88
89 pub fn key_path(mut self, key_path: &str) -> Self {
91 if !key_path.is_empty() {
92 self.config.key_path = Some(key_path.to_string())
93 }
94 self
95 }
96}
97
98impl Builder for TikvBuilder {
99 const SCHEME: Scheme = Scheme::Tikv;
100 type Config = TikvConfig;
101
102 fn build(self) -> Result<impl Access> {
103 let endpoints = self.config.endpoints.ok_or_else(|| {
104 Error::new(
105 ErrorKind::ConfigInvalid,
106 "endpoints is required but not set",
107 )
108 .with_context("service", Scheme::Tikv)
109 })?;
110
111 if self.config.insecure
112 && (self.config.ca_path.is_some()
113 || self.config.key_path.is_some()
114 || self.config.cert_path.is_some())
115 {
116 return Err(
117 Error::new(ErrorKind::ConfigInvalid, "invalid tls configuration")
118 .with_context("service", Scheme::Tikv)
119 .with_context("endpoints", format!("{:?}", endpoints)),
120 )?;
121 }
122
123 Ok(TikvBackend::new(Adapter {
124 client: OnceCell::new(),
125 endpoints,
126 insecure: self.config.insecure,
127 ca_path: self.config.ca_path.clone(),
128 cert_path: self.config.cert_path.clone(),
129 key_path: self.config.key_path.clone(),
130 }))
131 }
132}
133
134pub type TikvBackend = kv::Backend<Adapter>;
136
137#[derive(Clone)]
138pub struct Adapter {
139 client: OnceCell<RawClient>,
140 endpoints: Vec<String>,
141 insecure: bool,
142 ca_path: Option<String>,
143 cert_path: Option<String>,
144 key_path: Option<String>,
145}
146
147impl Debug for Adapter {
148 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
149 let mut ds = f.debug_struct("Adapter");
150
151 ds.field("endpoints", &self.endpoints);
152 ds.finish()
153 }
154}
155
156impl Adapter {
157 async fn get_connection(&self) -> Result<RawClient> {
158 if let Some(client) = self.client.get() {
159 return Ok(client.clone());
160 }
161 let client = if self.insecure {
162 RawClient::new(self.endpoints.clone())
163 .await
164 .map_err(parse_tikv_config_error)?
165 } else if self.ca_path.is_some() && self.key_path.is_some() && self.cert_path.is_some() {
166 let (ca_path, key_path, cert_path) = (
167 self.ca_path.clone().unwrap(),
168 self.key_path.clone().unwrap(),
169 self.cert_path.clone().unwrap(),
170 );
171 let config = Config::default().with_security(ca_path, cert_path, key_path);
172 RawClient::new_with_config(self.endpoints.clone(), config)
173 .await
174 .map_err(parse_tikv_config_error)?
175 } else {
176 return Err(
177 Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
178 .with_context("service", Scheme::Tikv)
179 .with_context("endpoints", format!("{:?}", self.endpoints)),
180 );
181 };
182 self.client.set(client.clone()).ok();
183 Ok(client)
184 }
185}
186
187impl kv::Adapter for Adapter {
188 type Scanner = ();
189
190 fn info(&self) -> kv::Info {
191 kv::Info::new(
192 Scheme::Tikv,
193 "TiKV",
194 Capability {
195 read: true,
196 write: true,
197 blocking: false,
198 shared: true,
199 ..Default::default()
200 },
201 )
202 }
203
204 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
205 let result = self
206 .get_connection()
207 .await?
208 .get(path.to_owned())
209 .await
210 .map_err(parse_tikv_error)?;
211 Ok(result.map(Buffer::from))
212 }
213
214 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
215 self.get_connection()
216 .await?
217 .put(path.to_owned(), value.to_vec())
218 .await
219 .map_err(parse_tikv_error)
220 }
221
222 async fn delete(&self, path: &str) -> Result<()> {
223 self.get_connection()
224 .await?
225 .delete(path.to_owned())
226 .await
227 .map_err(parse_tikv_error)
228 }
229}
230
231fn parse_tikv_error(e: tikv_client::Error) -> Error {
232 Error::new(ErrorKind::Unexpected, "error from tikv").set_source(e)
233}
234
235fn parse_tikv_config_error(e: tikv_client::Error) -> Error {
236 Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
237 .with_context("service", Scheme::Tikv)
238 .set_source(e)
239}