opendal/services/tikv/
backend.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use tokio::sync::OnceCell;
22
23use super::TIKV_SCHEME;
24use super::config::TikvConfig;
25use super::core::*;
26use super::deleter::TikvDeleter;
27use super::writer::TikvWriter;
28use crate::raw::*;
29use crate::*;
30
31/// TiKV backend builder
32#[doc = include_str!("docs.md")]
33#[derive(Debug, Default)]
34pub struct TikvBuilder {
35    pub(super) config: TikvConfig,
36}
37
38impl TikvBuilder {
39    /// Set the network address of the TiKV service.
40    pub fn endpoints(mut self, endpoints: Vec<String>) -> Self {
41        if !endpoints.is_empty() {
42            self.config.endpoints = Some(endpoints)
43        }
44        self
45    }
46
47    /// Set the insecure connection to TiKV.
48    pub fn insecure(mut self) -> Self {
49        self.config.insecure = true;
50        self
51    }
52
53    /// Set the certificate authority file path.
54    pub fn ca_path(mut self, ca_path: &str) -> Self {
55        if !ca_path.is_empty() {
56            self.config.ca_path = Some(ca_path.to_string())
57        }
58        self
59    }
60
61    /// Set the certificate file path.
62    pub fn cert_path(mut self, cert_path: &str) -> Self {
63        if !cert_path.is_empty() {
64            self.config.cert_path = Some(cert_path.to_string())
65        }
66        self
67    }
68
69    /// Set the key file path.
70    pub fn key_path(mut self, key_path: &str) -> Self {
71        if !key_path.is_empty() {
72            self.config.key_path = Some(key_path.to_string())
73        }
74        self
75    }
76}
77
78impl Builder for TikvBuilder {
79    type Config = TikvConfig;
80
81    fn build(self) -> Result<impl Access> {
82        let endpoints = self.config.endpoints.ok_or_else(|| {
83            Error::new(
84                ErrorKind::ConfigInvalid,
85                "endpoints is required but not set",
86            )
87            .with_context("service", TIKV_SCHEME)
88        })?;
89
90        if self.config.insecure
91            && (self.config.ca_path.is_some()
92                || self.config.key_path.is_some()
93                || self.config.cert_path.is_some())
94        {
95            return Err(
96                Error::new(ErrorKind::ConfigInvalid, "invalid tls configuration")
97                    .with_context("service", TIKV_SCHEME)
98                    .with_context("endpoints", format!("{endpoints:?}")),
99            )?;
100        }
101
102        Ok(TikvBackend::new(TikvCore {
103            client: OnceCell::new(),
104            endpoints,
105            insecure: self.config.insecure,
106            ca_path: self.config.ca_path.clone(),
107            cert_path: self.config.cert_path.clone(),
108            key_path: self.config.key_path.clone(),
109        }))
110    }
111}
112
113/// Backend for TiKV service
114#[derive(Clone, Debug)]
115pub struct TikvBackend {
116    core: Arc<TikvCore>,
117    root: String,
118    info: Arc<AccessorInfo>,
119}
120
121impl TikvBackend {
122    fn new(core: TikvCore) -> Self {
123        let info = AccessorInfo::default();
124        info.set_scheme(TIKV_SCHEME);
125        info.set_name("TiKV");
126        info.set_root("/");
127        info.set_native_capability(Capability {
128            read: true,
129            stat: true,
130            write: true,
131            write_can_empty: true,
132            delete: true,
133            shared: true,
134            ..Default::default()
135        });
136
137        Self {
138            core: Arc::new(core),
139            root: "/".to_string(),
140            info: Arc::new(info),
141        }
142    }
143}
144
145impl Access for TikvBackend {
146    type Reader = Buffer;
147    type Writer = TikvWriter;
148    type Lister = ();
149    type Deleter = oio::OneShotDeleter<TikvDeleter>;
150
151    fn info(&self) -> Arc<AccessorInfo> {
152        self.info.clone()
153    }
154
155    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
156        let p = build_abs_path(&self.root, path);
157
158        if p == build_abs_path(&self.root, "") {
159            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
160        } else {
161            let bs = self.core.get(&p).await?;
162            match bs {
163                Some(bs) => Ok(RpStat::new(
164                    Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
165                )),
166                None => Err(Error::new(ErrorKind::NotFound, "kv not found in tikv")),
167            }
168        }
169    }
170
171    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
172        let p = build_abs_path(&self.root, path);
173        let bs = match self.core.get(&p).await? {
174            Some(bs) => bs,
175            None => return Err(Error::new(ErrorKind::NotFound, "kv not found in tikv")),
176        };
177        Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
178    }
179
180    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
181        let p = build_abs_path(&self.root, path);
182        Ok((RpWrite::new(), TikvWriter::new(self.core.clone(), p)))
183    }
184
185    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
186        Ok((
187            RpDelete::default(),
188            oio::OneShotDeleter::new(TikvDeleter::new(self.core.clone(), self.root.clone())),
189        ))
190    }
191}