opendal/services/tikv/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use tikv_client::Config;
22use tikv_client::RawClient;
23use tokio::sync::OnceCell;
24
25use crate::raw::adapters::kv;
26use crate::raw::Access;
27use crate::services::TikvConfig;
28use crate::Builder;
29use crate::Capability;
30use crate::Error;
31use crate::ErrorKind;
32use crate::Scheme;
33use crate::*;
34
35impl Configurator for TikvConfig {
36    type Builder = TikvBuilder;
37    fn into_builder(self) -> Self::Builder {
38        TikvBuilder { config: self }
39    }
40}
41
42/// TiKV backend builder
43#[doc = include_str!("docs.md")]
44#[derive(Clone, Default)]
45pub struct TikvBuilder {
46    config: TikvConfig,
47}
48
49impl Debug for TikvBuilder {
50    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
51        let mut d = f.debug_struct("TikvBuilder");
52
53        d.field("config", &self.config);
54        d.finish_non_exhaustive()
55    }
56}
57
58impl TikvBuilder {
59    /// Set the network address of the TiKV service.
60    pub fn endpoints(mut self, endpoints: Vec<String>) -> Self {
61        if !endpoints.is_empty() {
62            self.config.endpoints = Some(endpoints)
63        }
64        self
65    }
66
67    /// Set the insecure connection to TiKV.
68    pub fn insecure(mut self) -> Self {
69        self.config.insecure = true;
70        self
71    }
72
73    /// Set the certificate authority file path.
74    pub fn ca_path(mut self, ca_path: &str) -> Self {
75        if !ca_path.is_empty() {
76            self.config.ca_path = Some(ca_path.to_string())
77        }
78        self
79    }
80
81    /// Set the certificate file path.
82    pub fn cert_path(mut self, cert_path: &str) -> Self {
83        if !cert_path.is_empty() {
84            self.config.cert_path = Some(cert_path.to_string())
85        }
86        self
87    }
88
89    /// Set the key file path.
90    pub fn key_path(mut self, key_path: &str) -> Self {
91        if !key_path.is_empty() {
92            self.config.key_path = Some(key_path.to_string())
93        }
94        self
95    }
96}
97
98impl Builder for TikvBuilder {
99    type Config = TikvConfig;
100
101    fn build(self) -> Result<impl Access> {
102        let endpoints = self.config.endpoints.ok_or_else(|| {
103            Error::new(
104                ErrorKind::ConfigInvalid,
105                "endpoints is required but not set",
106            )
107            .with_context("service", Scheme::Tikv)
108        })?;
109
110        if self.config.insecure
111            && (self.config.ca_path.is_some()
112                || self.config.key_path.is_some()
113                || self.config.cert_path.is_some())
114        {
115            return Err(
116                Error::new(ErrorKind::ConfigInvalid, "invalid tls configuration")
117                    .with_context("service", Scheme::Tikv)
118                    .with_context("endpoints", format!("{endpoints:?}")),
119            )?;
120        }
121
122        Ok(TikvBackend::new(Adapter {
123            client: OnceCell::new(),
124            endpoints,
125            insecure: self.config.insecure,
126            ca_path: self.config.ca_path.clone(),
127            cert_path: self.config.cert_path.clone(),
128            key_path: self.config.key_path.clone(),
129        }))
130    }
131}
132
133/// Backend for TiKV service
134pub type TikvBackend = kv::Backend<Adapter>;
135
136#[derive(Clone)]
137pub struct Adapter {
138    client: OnceCell<RawClient>,
139    endpoints: Vec<String>,
140    insecure: bool,
141    ca_path: Option<String>,
142    cert_path: Option<String>,
143    key_path: Option<String>,
144}
145
146impl Debug for Adapter {
147    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
148        let mut ds = f.debug_struct("Adapter");
149
150        ds.field("endpoints", &self.endpoints);
151        ds.finish()
152    }
153}
154
155impl Adapter {
156    async fn get_connection(&self) -> Result<RawClient> {
157        if let Some(client) = self.client.get() {
158            return Ok(client.clone());
159        }
160        let client = if self.insecure {
161            RawClient::new(self.endpoints.clone())
162                .await
163                .map_err(parse_tikv_config_error)?
164        } else if self.ca_path.is_some() && self.key_path.is_some() && self.cert_path.is_some() {
165            let (ca_path, key_path, cert_path) = (
166                self.ca_path.clone().unwrap(),
167                self.key_path.clone().unwrap(),
168                self.cert_path.clone().unwrap(),
169            );
170            let config = Config::default().with_security(ca_path, cert_path, key_path);
171            RawClient::new_with_config(self.endpoints.clone(), config)
172                .await
173                .map_err(parse_tikv_config_error)?
174        } else {
175            return Err(
176                Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
177                    .with_context("service", Scheme::Tikv)
178                    .with_context("endpoints", format!("{:?}", self.endpoints)),
179            );
180        };
181        self.client.set(client.clone()).ok();
182        Ok(client)
183    }
184}
185
186impl kv::Adapter for Adapter {
187    type Scanner = ();
188
189    fn info(&self) -> kv::Info {
190        kv::Info::new(
191            Scheme::Tikv,
192            "TiKV",
193            Capability {
194                read: true,
195                write: true,
196                shared: true,
197                ..Default::default()
198            },
199        )
200    }
201
202    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
203        let result = self
204            .get_connection()
205            .await?
206            .get(path.to_owned())
207            .await
208            .map_err(parse_tikv_error)?;
209        Ok(result.map(Buffer::from))
210    }
211
212    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
213        self.get_connection()
214            .await?
215            .put(path.to_owned(), value.to_vec())
216            .await
217            .map_err(parse_tikv_error)
218    }
219
220    async fn delete(&self, path: &str) -> Result<()> {
221        self.get_connection()
222            .await?
223            .delete(path.to_owned())
224            .await
225            .map_err(parse_tikv_error)
226    }
227}
228
229fn parse_tikv_error(e: tikv_client::Error) -> Error {
230    Error::new(ErrorKind::Unexpected, "error from tikv").set_source(e)
231}
232
233fn parse_tikv_config_error(e: tikv_client::Error) -> Error {
234    Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
235        .with_context("service", Scheme::Tikv)
236        .set_source(e)
237}