opendal_core/services/tikv/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19
20use mea::once::OnceCell;
21use tikv_client::Config;
22use tikv_client::RawClient;
23
24use super::TIKV_SCHEME;
25use crate::*;
26
27/// TikvCore holds the configuration and client for interacting with TiKV.
28#[derive(Clone)]
29pub struct TikvCore {
30    pub client: OnceCell<RawClient>,
31    pub endpoints: Vec<String>,
32    pub insecure: bool,
33    pub ca_path: Option<String>,
34    pub cert_path: Option<String>,
35    pub key_path: Option<String>,
36}
37
38impl Debug for TikvCore {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        f.debug_struct("TikvCore")
41            .field("endpoints", &self.endpoints)
42            .field("insecure", &self.insecure)
43            .finish()
44    }
45}
46
47impl TikvCore {
48    async fn get_connection(&self) -> Result<&RawClient> {
49        self.client
50            .get_or_try_init(|| async {
51                if self.insecure {
52                    return RawClient::new(self.endpoints.clone())
53                        .await
54                        .map_err(parse_tikv_config_error);
55                }
56
57                if let Some(ca_path) = self.ca_path.as_ref()
58                    && let Some(key_path) = self.key_path.as_ref()
59                    && let Some(cert_path) = self.cert_path.as_ref()
60                {
61                    let config = Config::default().with_security(ca_path, cert_path, key_path);
62                    return RawClient::new_with_config(self.endpoints.clone(), config)
63                        .await
64                        .map_err(parse_tikv_config_error);
65                }
66
67                Err(
68                    Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
69                        .with_context("service", TIKV_SCHEME)
70                        .with_context("endpoints", format!("{:?}", self.endpoints)),
71                )
72            })
73            .await
74    }
75
76    pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
77        let result = self
78            .get_connection()
79            .await?
80            .get(path.to_owned())
81            .await
82            .map_err(parse_tikv_error)?;
83        Ok(result.map(Buffer::from))
84    }
85
86    pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
87        self.get_connection()
88            .await?
89            .put(path.to_owned(), value.to_vec())
90            .await
91            .map_err(parse_tikv_error)
92    }
93
94    pub async fn delete(&self, path: &str) -> Result<()> {
95        self.get_connection()
96            .await?
97            .delete(path.to_owned())
98            .await
99            .map_err(parse_tikv_error)
100    }
101}
102
103fn parse_tikv_error(e: tikv_client::Error) -> Error {
104    Error::new(ErrorKind::Unexpected, "error from tikv").set_source(e)
105}
106
107fn parse_tikv_config_error(e: tikv_client::Error) -> Error {
108    Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
109        .with_context("service", TIKV_SCHEME)
110        .set_source(e)
111}