opendal/services/tikv/
backend.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use tokio::sync::OnceCell;
22
23use super::TIKV_SCHEME;
24use super::config::TikvConfig;
25use super::core::*;
26use super::deleter::TikvDeleter;
27use super::writer::TikvWriter;
28use crate::raw::*;
29use crate::*;
30
31#[doc = include_str!("docs.md")]
33#[derive(Debug, Default)]
34pub struct TikvBuilder {
35 pub(super) config: TikvConfig,
36}
37
38impl TikvBuilder {
39 pub fn endpoints(mut self, endpoints: Vec<String>) -> Self {
41 if !endpoints.is_empty() {
42 self.config.endpoints = Some(endpoints)
43 }
44 self
45 }
46
47 pub fn insecure(mut self) -> Self {
49 self.config.insecure = true;
50 self
51 }
52
53 pub fn ca_path(mut self, ca_path: &str) -> Self {
55 if !ca_path.is_empty() {
56 self.config.ca_path = Some(ca_path.to_string())
57 }
58 self
59 }
60
61 pub fn cert_path(mut self, cert_path: &str) -> Self {
63 if !cert_path.is_empty() {
64 self.config.cert_path = Some(cert_path.to_string())
65 }
66 self
67 }
68
69 pub fn key_path(mut self, key_path: &str) -> Self {
71 if !key_path.is_empty() {
72 self.config.key_path = Some(key_path.to_string())
73 }
74 self
75 }
76}
77
78impl Builder for TikvBuilder {
79 type Config = TikvConfig;
80
81 fn build(self) -> Result<impl Access> {
82 let endpoints = self.config.endpoints.ok_or_else(|| {
83 Error::new(
84 ErrorKind::ConfigInvalid,
85 "endpoints is required but not set",
86 )
87 .with_context("service", TIKV_SCHEME)
88 })?;
89
90 if self.config.insecure
91 && (self.config.ca_path.is_some()
92 || self.config.key_path.is_some()
93 || self.config.cert_path.is_some())
94 {
95 return Err(
96 Error::new(ErrorKind::ConfigInvalid, "invalid tls configuration")
97 .with_context("service", TIKV_SCHEME)
98 .with_context("endpoints", format!("{endpoints:?}")),
99 )?;
100 }
101
102 Ok(TikvBackend::new(TikvCore {
103 client: OnceCell::new(),
104 endpoints,
105 insecure: self.config.insecure,
106 ca_path: self.config.ca_path.clone(),
107 cert_path: self.config.cert_path.clone(),
108 key_path: self.config.key_path.clone(),
109 }))
110 }
111}
112
113#[derive(Clone, Debug)]
115pub struct TikvBackend {
116 core: Arc<TikvCore>,
117 root: String,
118 info: Arc<AccessorInfo>,
119}
120
121impl TikvBackend {
122 fn new(core: TikvCore) -> Self {
123 let info = AccessorInfo::default();
124 info.set_scheme(TIKV_SCHEME);
125 info.set_name("TiKV");
126 info.set_root("/");
127 info.set_native_capability(Capability {
128 read: true,
129 stat: true,
130 write: true,
131 write_can_empty: true,
132 delete: true,
133 shared: true,
134 ..Default::default()
135 });
136
137 Self {
138 core: Arc::new(core),
139 root: "/".to_string(),
140 info: Arc::new(info),
141 }
142 }
143}
144
145impl Access for TikvBackend {
146 type Reader = Buffer;
147 type Writer = TikvWriter;
148 type Lister = ();
149 type Deleter = oio::OneShotDeleter<TikvDeleter>;
150
151 fn info(&self) -> Arc<AccessorInfo> {
152 self.info.clone()
153 }
154
155 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
156 let p = build_abs_path(&self.root, path);
157
158 if p == build_abs_path(&self.root, "") {
159 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
160 } else {
161 let bs = self.core.get(&p).await?;
162 match bs {
163 Some(bs) => Ok(RpStat::new(
164 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
165 )),
166 None => Err(Error::new(ErrorKind::NotFound, "kv not found in tikv")),
167 }
168 }
169 }
170
171 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
172 let p = build_abs_path(&self.root, path);
173 let bs = match self.core.get(&p).await? {
174 Some(bs) => bs,
175 None => return Err(Error::new(ErrorKind::NotFound, "kv not found in tikv")),
176 };
177 Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
178 }
179
180 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
181 let p = build_abs_path(&self.root, path);
182 Ok((RpWrite::new(), TikvWriter::new(self.core.clone(), p)))
183 }
184
185 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
186 Ok((
187 RpDelete::default(),
188 oio::OneShotDeleter::new(TikvDeleter::new(self.core.clone(), self.root.clone())),
189 ))
190 }
191}