opendal/services/etcd/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::vec;
21
22use bb8::PooledConnection;
23use bb8::RunError;
24use etcd_client::Certificate;
25use etcd_client::Client;
26use etcd_client::ConnectOptions;
27use etcd_client::Error as EtcdError;
28use etcd_client::GetOptions;
29use etcd_client::Identity;
30use etcd_client::TlsOptions;
31use tokio::sync::OnceCell;
32
33use crate::raw::adapters::kv;
34use crate::raw::*;
35use crate::services::EtcdConfig;
36use crate::*;
37
38const DEFAULT_ETCD_ENDPOINTS: &str = "http://127.0.0.1:2379";
39
40impl Configurator for EtcdConfig {
41 type Builder = EtcdBuilder;
42 fn into_builder(self) -> Self::Builder {
43 EtcdBuilder { config: self }
44 }
45}
46
47#[doc = include_str!("docs.md")]
49#[derive(Clone, Default)]
50pub struct EtcdBuilder {
51 config: EtcdConfig,
52}
53
54impl Debug for EtcdBuilder {
55 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
56 let mut ds = f.debug_struct("Builder");
57
58 ds.field("config", &self.config);
59 ds.finish()
60 }
61}
62
63impl EtcdBuilder {
64 pub fn endpoints(mut self, endpoints: &str) -> Self {
68 if !endpoints.is_empty() {
69 self.config.endpoints = Some(endpoints.to_owned());
70 }
71 self
72 }
73
74 pub fn username(mut self, username: &str) -> Self {
78 if !username.is_empty() {
79 self.config.username = Some(username.to_owned());
80 }
81 self
82 }
83
84 pub fn password(mut self, password: &str) -> Self {
88 if !password.is_empty() {
89 self.config.password = Some(password.to_owned());
90 }
91 self
92 }
93
94 pub fn root(mut self, root: &str) -> Self {
98 self.config.root = if root.is_empty() {
99 None
100 } else {
101 Some(root.to_string())
102 };
103
104 self
105 }
106
107 pub fn ca_path(mut self, ca_path: &str) -> Self {
111 if !ca_path.is_empty() {
112 self.config.ca_path = Some(ca_path.to_string())
113 }
114 self
115 }
116
117 pub fn cert_path(mut self, cert_path: &str) -> Self {
121 if !cert_path.is_empty() {
122 self.config.cert_path = Some(cert_path.to_string())
123 }
124 self
125 }
126
127 pub fn key_path(mut self, key_path: &str) -> Self {
131 if !key_path.is_empty() {
132 self.config.key_path = Some(key_path.to_string())
133 }
134 self
135 }
136}
137
138impl Builder for EtcdBuilder {
139 const SCHEME: Scheme = Scheme::Etcd;
140 type Config = EtcdConfig;
141
142 fn build(self) -> Result<impl Access> {
143 let endpoints = self
144 .config
145 .endpoints
146 .clone()
147 .unwrap_or_else(|| DEFAULT_ETCD_ENDPOINTS.to_string());
148
149 let endpoints: Vec<String> = endpoints.split(',').map(|s| s.to_string()).collect();
150
151 let mut options = ConnectOptions::new();
152
153 if self.config.ca_path.is_some()
154 && self.config.cert_path.is_some()
155 && self.config.key_path.is_some()
156 {
157 let ca = self.load_pem(self.config.ca_path.clone().unwrap().as_str())?;
158 let key = self.load_pem(self.config.key_path.clone().unwrap().as_str())?;
159 let cert = self.load_pem(self.config.cert_path.clone().unwrap().as_str())?;
160
161 let tls_options = TlsOptions::default()
162 .ca_certificate(Certificate::from_pem(ca))
163 .identity(Identity::from_pem(cert, key));
164 options = options.with_tls(tls_options);
165 }
166
167 if let Some(username) = self.config.username.clone() {
168 options = options.with_user(
169 username,
170 self.config.password.clone().unwrap_or("".to_string()),
171 );
172 }
173
174 let root = normalize_root(
175 self.config
176 .root
177 .clone()
178 .unwrap_or_else(|| "/".to_string())
179 .as_str(),
180 );
181
182 let client = OnceCell::new();
183 Ok(EtcdBackend::new(Adapter {
184 endpoints,
185 client,
186 options,
187 })
188 .with_normalized_root(root))
189 }
190}
191
192impl EtcdBuilder {
193 fn load_pem(&self, path: &str) -> Result<String> {
194 std::fs::read_to_string(path)
195 .map_err(|err| Error::new(ErrorKind::Unexpected, "invalid file path").set_source(err))
196 }
197}
198
199pub type EtcdBackend = kv::Backend<Adapter>;
201
202#[derive(Clone)]
203pub struct Manager {
204 endpoints: Vec<String>,
205 options: ConnectOptions,
206}
207
208impl bb8::ManageConnection for Manager {
209 type Connection = Client;
210 type Error = Error;
211
212 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
213 let conn = Client::connect(self.endpoints.clone(), Some(self.options.clone()))
214 .await
215 .map_err(format_etcd_error)?;
216
217 Ok(conn)
218 }
219
220 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
221 let _ = conn.status().await.map_err(format_etcd_error)?;
222 Ok(())
223 }
224
225 fn has_broken(&self, _: &mut Self::Connection) -> bool {
227 false
228 }
229}
230
231#[derive(Clone)]
232pub struct Adapter {
233 endpoints: Vec<String>,
234 options: ConnectOptions,
235 client: OnceCell<bb8::Pool<Manager>>,
236}
237
238impl Debug for Adapter {
240 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
241 let mut ds = f.debug_struct("Adapter");
242
243 ds.field("endpoints", &self.endpoints.join(","));
244 ds.field("options", &self.options.clone());
245 ds.finish()
246 }
247}
248
249impl Adapter {
250 async fn conn(&self) -> Result<PooledConnection<'static, Manager>> {
251 let client = self
252 .client
253 .get_or_try_init(|| async {
254 bb8::Pool::builder()
255 .max_size(64)
256 .build(Manager {
257 endpoints: self.endpoints.clone(),
258 options: self.options.clone(),
259 })
260 .await
261 })
262 .await?;
263
264 client.get_owned().await.map_err(|err| match err {
265 RunError::User(err) => err,
266 RunError::TimedOut => {
267 Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
268 }
269 })
270 }
271}
272
273impl kv::Adapter for Adapter {
274 type Scanner = kv::ScanStdIter<vec::IntoIter<Result<String>>>;
275
276 fn info(&self) -> kv::Info {
277 kv::Info::new(
278 Scheme::Etcd,
279 &self.endpoints.join(","),
280 Capability {
281 read: true,
282 write: true,
283 list: true,
284 shared: true,
285
286 ..Default::default()
287 },
288 )
289 }
290
291 async fn get(&self, key: &str) -> Result<Option<Buffer>> {
292 let mut client = self.conn().await?;
293 let resp = client.get(key, None).await.map_err(format_etcd_error)?;
294 if let Some(kv) = resp.kvs().first() {
295 Ok(Some(Buffer::from(kv.value().to_vec())))
296 } else {
297 Ok(None)
298 }
299 }
300
301 async fn set(&self, key: &str, value: Buffer) -> Result<()> {
302 let mut client = self.conn().await?;
303 let _ = client
304 .put(key, value.to_vec(), None)
305 .await
306 .map_err(format_etcd_error)?;
307 Ok(())
308 }
309
310 async fn delete(&self, key: &str) -> Result<()> {
311 let mut client = self.conn().await?;
312 let _ = client.delete(key, None).await.map_err(format_etcd_error)?;
313 Ok(())
314 }
315
316 async fn scan(&self, path: &str) -> Result<Self::Scanner> {
317 let mut client = self.conn().await?;
318 let get_options = Some(GetOptions::new().with_prefix().with_keys_only());
319 let resp = client
320 .get(path, get_options)
321 .await
322 .map_err(format_etcd_error)?;
323 let mut res = Vec::default();
324 for kv in resp.kvs() {
325 let v = kv.key_str().map(String::from).map_err(|err| {
326 Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string")
327 .set_source(err)
328 })?;
329 res.push(Ok(v));
330 }
331
332 Ok(kv::ScanStdIter::new(res.into_iter()))
333 }
334}
335
336pub fn format_etcd_error(e: EtcdError) -> Error {
337 Error::new(ErrorKind::Unexpected, e.to_string().as_str())
338 .set_source(e)
339 .set_temporary()
340}