opendal/services/tikv/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use tikv_client::Config;
22use tikv_client::RawClient;
23use tokio::sync::OnceCell;
24
25use crate::raw::adapters::kv;
26use crate::raw::Access;
27use crate::services::TikvConfig;
28use crate::Builder;
29use crate::Capability;
30use crate::Error;
31use crate::ErrorKind;
32use crate::Scheme;
33use crate::*;
34
35impl Configurator for TikvConfig {
36    type Builder = TikvBuilder;
37    fn into_builder(self) -> Self::Builder {
38        TikvBuilder { config: self }
39    }
40}
41
42/// TiKV backend builder
43#[doc = include_str!("docs.md")]
44#[derive(Clone, Default)]
45pub struct TikvBuilder {
46    config: TikvConfig,
47}
48
49impl Debug for TikvBuilder {
50    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
51        let mut d = f.debug_struct("TikvBuilder");
52
53        d.field("config", &self.config);
54        d.finish_non_exhaustive()
55    }
56}
57
58impl TikvBuilder {
59    /// Set the network address of the TiKV service.
60    pub fn endpoints(mut self, endpoints: Vec<String>) -> Self {
61        if !endpoints.is_empty() {
62            self.config.endpoints = Some(endpoints)
63        }
64        self
65    }
66
67    /// Set the insecure connection to TiKV.
68    pub fn insecure(mut self) -> Self {
69        self.config.insecure = true;
70        self
71    }
72
73    /// Set the certificate authority file path.
74    pub fn ca_path(mut self, ca_path: &str) -> Self {
75        if !ca_path.is_empty() {
76            self.config.ca_path = Some(ca_path.to_string())
77        }
78        self
79    }
80
81    /// Set the certificate file path.
82    pub fn cert_path(mut self, cert_path: &str) -> Self {
83        if !cert_path.is_empty() {
84            self.config.cert_path = Some(cert_path.to_string())
85        }
86        self
87    }
88
89    /// Set the key file path.
90    pub fn key_path(mut self, key_path: &str) -> Self {
91        if !key_path.is_empty() {
92            self.config.key_path = Some(key_path.to_string())
93        }
94        self
95    }
96}
97
98impl Builder for TikvBuilder {
99    const SCHEME: Scheme = Scheme::Tikv;
100    type Config = TikvConfig;
101
102    fn build(self) -> Result<impl Access> {
103        let endpoints = self.config.endpoints.ok_or_else(|| {
104            Error::new(
105                ErrorKind::ConfigInvalid,
106                "endpoints is required but not set",
107            )
108            .with_context("service", Scheme::Tikv)
109        })?;
110
111        if self.config.insecure
112            && (self.config.ca_path.is_some()
113                || self.config.key_path.is_some()
114                || self.config.cert_path.is_some())
115        {
116            return Err(
117                Error::new(ErrorKind::ConfigInvalid, "invalid tls configuration")
118                    .with_context("service", Scheme::Tikv)
119                    .with_context("endpoints", format!("{:?}", endpoints)),
120            )?;
121        }
122
123        Ok(TikvBackend::new(Adapter {
124            client: OnceCell::new(),
125            endpoints,
126            insecure: self.config.insecure,
127            ca_path: self.config.ca_path.clone(),
128            cert_path: self.config.cert_path.clone(),
129            key_path: self.config.key_path.clone(),
130        }))
131    }
132}
133
134/// Backend for TiKV service
135pub type TikvBackend = kv::Backend<Adapter>;
136
137#[derive(Clone)]
138pub struct Adapter {
139    client: OnceCell<RawClient>,
140    endpoints: Vec<String>,
141    insecure: bool,
142    ca_path: Option<String>,
143    cert_path: Option<String>,
144    key_path: Option<String>,
145}
146
147impl Debug for Adapter {
148    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
149        let mut ds = f.debug_struct("Adapter");
150
151        ds.field("endpoints", &self.endpoints);
152        ds.finish()
153    }
154}
155
156impl Adapter {
157    async fn get_connection(&self) -> Result<RawClient> {
158        if let Some(client) = self.client.get() {
159            return Ok(client.clone());
160        }
161        let client = if self.insecure {
162            RawClient::new(self.endpoints.clone())
163                .await
164                .map_err(parse_tikv_config_error)?
165        } else if self.ca_path.is_some() && self.key_path.is_some() && self.cert_path.is_some() {
166            let (ca_path, key_path, cert_path) = (
167                self.ca_path.clone().unwrap(),
168                self.key_path.clone().unwrap(),
169                self.cert_path.clone().unwrap(),
170            );
171            let config = Config::default().with_security(ca_path, cert_path, key_path);
172            RawClient::new_with_config(self.endpoints.clone(), config)
173                .await
174                .map_err(parse_tikv_config_error)?
175        } else {
176            return Err(
177                Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
178                    .with_context("service", Scheme::Tikv)
179                    .with_context("endpoints", format!("{:?}", self.endpoints)),
180            );
181        };
182        self.client.set(client.clone()).ok();
183        Ok(client)
184    }
185}
186
187impl kv::Adapter for Adapter {
188    type Scanner = ();
189
190    fn info(&self) -> kv::Info {
191        kv::Info::new(
192            Scheme::Tikv,
193            "TiKV",
194            Capability {
195                read: true,
196                write: true,
197                blocking: false,
198                shared: true,
199                ..Default::default()
200            },
201        )
202    }
203
204    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
205        let result = self
206            .get_connection()
207            .await?
208            .get(path.to_owned())
209            .await
210            .map_err(parse_tikv_error)?;
211        Ok(result.map(Buffer::from))
212    }
213
214    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
215        self.get_connection()
216            .await?
217            .put(path.to_owned(), value.to_vec())
218            .await
219            .map_err(parse_tikv_error)
220    }
221
222    async fn delete(&self, path: &str) -> Result<()> {
223        self.get_connection()
224            .await?
225            .delete(path.to_owned())
226            .await
227            .map_err(parse_tikv_error)
228    }
229}
230
231fn parse_tikv_error(e: tikv_client::Error) -> Error {
232    Error::new(ErrorKind::Unexpected, "error from tikv").set_source(e)
233}
234
235fn parse_tikv_config_error(e: tikv_client::Error) -> Error {
236    Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
237        .with_context("service", Scheme::Tikv)
238        .set_source(e)
239}