opendal/services/etcd/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::vec;
21
22use bb8::PooledConnection;
23use bb8::RunError;
24use etcd_client::Certificate;
25use etcd_client::Client;
26use etcd_client::ConnectOptions;
27use etcd_client::Error as EtcdError;
28use etcd_client::GetOptions;
29use etcd_client::Identity;
30use etcd_client::TlsOptions;
31use tokio::sync::OnceCell;
32
33use crate::raw::adapters::kv;
34use crate::raw::*;
35use crate::services::EtcdConfig;
36use crate::*;
37
38const DEFAULT_ETCD_ENDPOINTS: &str = "http://127.0.0.1:2379";
39
40impl Configurator for EtcdConfig {
41 type Builder = EtcdBuilder;
42 fn into_builder(self) -> Self::Builder {
43 EtcdBuilder { config: self }
44 }
45}
46
#[doc = include_str!("docs.md")]
#[derive(Clone, Default)]
pub struct EtcdBuilder {
    // Settings filled in by the setter methods and read back by `build`.
    config: EtcdConfig,
}
53
54impl Debug for EtcdBuilder {
55 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
56 let mut ds = f.debug_struct("Builder");
57
58 ds.field("config", &self.config);
59 ds.finish()
60 }
61}
62
63impl EtcdBuilder {
64 pub fn endpoints(mut self, endpoints: &str) -> Self {
68 if !endpoints.is_empty() {
69 self.config.endpoints = Some(endpoints.to_owned());
70 }
71 self
72 }
73
74 pub fn username(mut self, username: &str) -> Self {
78 if !username.is_empty() {
79 self.config.username = Some(username.to_owned());
80 }
81 self
82 }
83
84 pub fn password(mut self, password: &str) -> Self {
88 if !password.is_empty() {
89 self.config.password = Some(password.to_owned());
90 }
91 self
92 }
93
94 pub fn root(mut self, root: &str) -> Self {
98 self.config.root = if root.is_empty() {
99 None
100 } else {
101 Some(root.to_string())
102 };
103
104 self
105 }
106
107 pub fn ca_path(mut self, ca_path: &str) -> Self {
111 if !ca_path.is_empty() {
112 self.config.ca_path = Some(ca_path.to_string())
113 }
114 self
115 }
116
117 pub fn cert_path(mut self, cert_path: &str) -> Self {
121 if !cert_path.is_empty() {
122 self.config.cert_path = Some(cert_path.to_string())
123 }
124 self
125 }
126
127 pub fn key_path(mut self, key_path: &str) -> Self {
131 if !key_path.is_empty() {
132 self.config.key_path = Some(key_path.to_string())
133 }
134 self
135 }
136}
137
138impl Builder for EtcdBuilder {
139 const SCHEME: Scheme = Scheme::Etcd;
140 type Config = EtcdConfig;
141
142 fn build(self) -> Result<impl Access> {
143 let endpoints = self
144 .config
145 .endpoints
146 .clone()
147 .unwrap_or_else(|| DEFAULT_ETCD_ENDPOINTS.to_string());
148
149 let endpoints: Vec<String> = endpoints.split(',').map(|s| s.to_string()).collect();
150
151 let mut options = ConnectOptions::new();
152
153 if self.config.ca_path.is_some()
154 && self.config.cert_path.is_some()
155 && self.config.key_path.is_some()
156 {
157 let ca = self.load_pem(self.config.ca_path.clone().unwrap().as_str())?;
158 let key = self.load_pem(self.config.key_path.clone().unwrap().as_str())?;
159 let cert = self.load_pem(self.config.cert_path.clone().unwrap().as_str())?;
160
161 let tls_options = TlsOptions::default()
162 .ca_certificate(Certificate::from_pem(ca))
163 .identity(Identity::from_pem(cert, key));
164 options = options.with_tls(tls_options);
165 }
166
167 if let Some(username) = self.config.username.clone() {
168 options = options.with_user(
169 username,
170 self.config.password.clone().unwrap_or("".to_string()),
171 );
172 }
173
174 let root = normalize_root(
175 self.config
176 .root
177 .clone()
178 .unwrap_or_else(|| "/".to_string())
179 .as_str(),
180 );
181
182 let client = OnceCell::new();
183 Ok(EtcdBackend::new(Adapter {
184 endpoints,
185 client,
186 options,
187 })
188 .with_normalized_root(root))
189 }
190}
191
192impl EtcdBuilder {
193 fn load_pem(&self, path: &str) -> Result<String> {
194 std::fs::read_to_string(path)
195 .map_err(|err| Error::new(ErrorKind::Unexpected, "invalid file path").set_source(err))
196 }
197}
198
/// The key-value backend type specialised with the etcd [`Adapter`].
pub type EtcdBackend = kv::Backend<Adapter>;
201
/// Connection manager used by the `bb8` pool to open etcd [`Client`]s.
#[derive(Clone)]
pub struct Manager {
    /// Endpoint addresses handed to `Client::connect`.
    endpoints: Vec<String>,
    /// Connect options handed to `Client::connect` for every new connection.
    options: ConnectOptions,
}
207
208#[async_trait::async_trait]
209impl bb8::ManageConnection for Manager {
210 type Connection = Client;
211 type Error = Error;
212
213 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
214 let conn = Client::connect(self.endpoints.clone(), Some(self.options.clone()))
215 .await
216 .map_err(format_etcd_error)?;
217
218 Ok(conn)
219 }
220
221 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
222 let _ = conn.status().await.map_err(format_etcd_error)?;
223 Ok(())
224 }
225
226 fn has_broken(&self, _: &mut Self::Connection) -> bool {
228 false
229 }
230}
231
/// etcd implementation of the key-value adapter.
#[derive(Clone)]
pub struct Adapter {
    /// Endpoint addresses; also joined with `,` for the info name and `Debug` output.
    endpoints: Vec<String>,
    /// Connect options copied into the pool's [`Manager`].
    options: ConnectOptions,
    /// Connection pool, created on the first call to `conn`.
    client: OnceCell<bb8::Pool<Manager>>,
}
238
239impl Debug for Adapter {
241 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
242 let mut ds = f.debug_struct("Adapter");
243
244 ds.field("endpoints", &self.endpoints.join(","));
245 ds.field("options", &self.options.clone());
246 ds.finish()
247 }
248}
249
250impl Adapter {
251 async fn conn(&self) -> Result<PooledConnection<'static, Manager>> {
252 let client = self
253 .client
254 .get_or_try_init(|| async {
255 bb8::Pool::builder()
256 .max_size(64)
257 .build(Manager {
258 endpoints: self.endpoints.clone(),
259 options: self.options.clone(),
260 })
261 .await
262 })
263 .await?;
264
265 client.get_owned().await.map_err(|err| match err {
266 RunError::User(err) => err,
267 RunError::TimedOut => {
268 Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
269 }
270 })
271 }
272}
273
274impl kv::Adapter for Adapter {
275 type Scanner = kv::ScanStdIter<vec::IntoIter<Result<String>>>;
276
277 fn info(&self) -> kv::Info {
278 kv::Info::new(
279 Scheme::Etcd,
280 &self.endpoints.join(","),
281 Capability {
282 read: true,
283 write: true,
284 list: true,
285 shared: true,
286
287 ..Default::default()
288 },
289 )
290 }
291
292 async fn get(&self, key: &str) -> Result<Option<Buffer>> {
293 let mut client = self.conn().await?;
294 let resp = client.get(key, None).await.map_err(format_etcd_error)?;
295 if let Some(kv) = resp.kvs().first() {
296 Ok(Some(Buffer::from(kv.value().to_vec())))
297 } else {
298 Ok(None)
299 }
300 }
301
302 async fn set(&self, key: &str, value: Buffer) -> Result<()> {
303 let mut client = self.conn().await?;
304 let _ = client
305 .put(key, value.to_vec(), None)
306 .await
307 .map_err(format_etcd_error)?;
308 Ok(())
309 }
310
311 async fn delete(&self, key: &str) -> Result<()> {
312 let mut client = self.conn().await?;
313 let _ = client.delete(key, None).await.map_err(format_etcd_error)?;
314 Ok(())
315 }
316
317 async fn scan(&self, path: &str) -> Result<Self::Scanner> {
318 let mut client = self.conn().await?;
319 let get_options = Some(GetOptions::new().with_prefix().with_keys_only());
320 let resp = client
321 .get(path, get_options)
322 .await
323 .map_err(format_etcd_error)?;
324 let mut res = Vec::default();
325 for kv in resp.kvs() {
326 let v = kv.key_str().map(String::from).map_err(|err| {
327 Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string")
328 .set_source(err)
329 })?;
330 res.push(Ok(v));
331 }
332
333 Ok(kv::ScanStdIter::new(res.into_iter()))
334 }
335}
336
337pub fn format_etcd_error(e: EtcdError) -> Error {
338 Error::new(ErrorKind::Unexpected, e.to_string().as_str())
339 .set_source(e)
340 .set_temporary()
341}