opendal/services/etcd/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::vec;
21
22use bb8::PooledConnection;
23use bb8::RunError;
24use etcd_client::Certificate;
25use etcd_client::Client;
26use etcd_client::ConnectOptions;
27use etcd_client::Error as EtcdError;
28use etcd_client::GetOptions;
29use etcd_client::Identity;
30use etcd_client::TlsOptions;
31use tokio::sync::OnceCell;
32
33use crate::raw::adapters::kv;
34use crate::raw::*;
35use crate::services::EtcdConfig;
36use crate::*;
37
/// Endpoint list used by `build` when the configuration does not provide one.
const DEFAULT_ETCD_ENDPOINTS: &str = "http://127.0.0.1:2379";
39
40impl Configurator for EtcdConfig {
41    type Builder = EtcdBuilder;
42    fn into_builder(self) -> Self::Builder {
43        EtcdBuilder { config: self }
44    }
45}
46
/// [Etcd](https://etcd.io/) services support.
#[doc = include_str!("docs.md")]
#[derive(Clone, Default)]
pub struct EtcdBuilder {
    /// Configuration collected so far; the setter methods write into it and
    /// `build` reads from it.
    config: EtcdConfig,
}
53
54impl Debug for EtcdBuilder {
55    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
56        let mut ds = f.debug_struct("Builder");
57
58        ds.field("config", &self.config);
59        ds.finish()
60    }
61}
62
63impl EtcdBuilder {
64    /// set the network address of etcd service.
65    ///
66    /// default: "http://127.0.0.1:2379"
67    pub fn endpoints(mut self, endpoints: &str) -> Self {
68        if !endpoints.is_empty() {
69            self.config.endpoints = Some(endpoints.to_owned());
70        }
71        self
72    }
73
74    /// set the username for etcd
75    ///
76    /// default: no username
77    pub fn username(mut self, username: &str) -> Self {
78        if !username.is_empty() {
79            self.config.username = Some(username.to_owned());
80        }
81        self
82    }
83
84    /// set the password for etcd
85    ///
86    /// default: no password
87    pub fn password(mut self, password: &str) -> Self {
88        if !password.is_empty() {
89            self.config.password = Some(password.to_owned());
90        }
91        self
92    }
93
94    /// set the working directory, all operations will be performed under it.
95    ///
96    /// default: "/"
97    pub fn root(mut self, root: &str) -> Self {
98        self.config.root = if root.is_empty() {
99            None
100        } else {
101            Some(root.to_string())
102        };
103
104        self
105    }
106
107    /// Set the certificate authority file path.
108    ///
109    /// default is None
110    pub fn ca_path(mut self, ca_path: &str) -> Self {
111        if !ca_path.is_empty() {
112            self.config.ca_path = Some(ca_path.to_string())
113        }
114        self
115    }
116
117    /// Set the certificate file path.
118    ///
119    /// default is None
120    pub fn cert_path(mut self, cert_path: &str) -> Self {
121        if !cert_path.is_empty() {
122            self.config.cert_path = Some(cert_path.to_string())
123        }
124        self
125    }
126
127    /// Set the key file path.
128    ///
129    /// default is None
130    pub fn key_path(mut self, key_path: &str) -> Self {
131        if !key_path.is_empty() {
132            self.config.key_path = Some(key_path.to_string())
133        }
134        self
135    }
136}
137
138impl Builder for EtcdBuilder {
139    const SCHEME: Scheme = Scheme::Etcd;
140    type Config = EtcdConfig;
141
142    fn build(self) -> Result<impl Access> {
143        let endpoints = self
144            .config
145            .endpoints
146            .clone()
147            .unwrap_or_else(|| DEFAULT_ETCD_ENDPOINTS.to_string());
148
149        let endpoints: Vec<String> = endpoints.split(',').map(|s| s.to_string()).collect();
150
151        let mut options = ConnectOptions::new();
152
153        if self.config.ca_path.is_some()
154            && self.config.cert_path.is_some()
155            && self.config.key_path.is_some()
156        {
157            let ca = self.load_pem(self.config.ca_path.clone().unwrap().as_str())?;
158            let key = self.load_pem(self.config.key_path.clone().unwrap().as_str())?;
159            let cert = self.load_pem(self.config.cert_path.clone().unwrap().as_str())?;
160
161            let tls_options = TlsOptions::default()
162                .ca_certificate(Certificate::from_pem(ca))
163                .identity(Identity::from_pem(cert, key));
164            options = options.with_tls(tls_options);
165        }
166
167        if let Some(username) = self.config.username.clone() {
168            options = options.with_user(
169                username,
170                self.config.password.clone().unwrap_or("".to_string()),
171            );
172        }
173
174        let root = normalize_root(
175            self.config
176                .root
177                .clone()
178                .unwrap_or_else(|| "/".to_string())
179                .as_str(),
180        );
181
182        let client = OnceCell::new();
183        Ok(EtcdBackend::new(Adapter {
184            endpoints,
185            client,
186            options,
187        })
188        .with_normalized_root(root))
189    }
190}
191
192impl EtcdBuilder {
193    fn load_pem(&self, path: &str) -> Result<String> {
194        std::fs::read_to_string(path)
195            .map_err(|err| Error::new(ErrorKind::Unexpected, "invalid file path").set_source(err))
196    }
197}
198
/// Backend for etcd services.
///
/// This is the generic `kv::Backend` specialised with the etcd [`Adapter`].
pub type EtcdBackend = kv::Backend<Adapter>;
201
/// Connection manager used with the `bb8` pool; holds what is needed to open
/// a new etcd client.
#[derive(Clone)]
pub struct Manager {
    /// Endpoints passed to `Client::connect`.
    endpoints: Vec<String>,
    /// Connection options passed to `Client::connect`.
    options: ConnectOptions,
}
207
208#[async_trait::async_trait]
209impl bb8::ManageConnection for Manager {
210    type Connection = Client;
211    type Error = Error;
212
213    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
214        let conn = Client::connect(self.endpoints.clone(), Some(self.options.clone()))
215            .await
216            .map_err(format_etcd_error)?;
217
218        Ok(conn)
219    }
220
221    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
222        let _ = conn.status().await.map_err(format_etcd_error)?;
223        Ok(())
224    }
225
226    /// Always allow reuse conn.
227    fn has_broken(&self, _: &mut Self::Connection) -> bool {
228        false
229    }
230}
231
/// State shared by the etcd kv adapter: where to connect, how to connect, and
/// the connection pool once it has been created.
#[derive(Clone)]
pub struct Adapter {
    /// Endpoints of the etcd service; joined with `,` to form the name reported by `info`.
    endpoints: Vec<String>,
    /// Connection options copied into the `Manager` when the pool is built.
    options: ConnectOptions,
    /// Connection pool, initialised on the first call to `conn`.
    client: OnceCell<bb8::Pool<Manager>>,
}
238
239// implement `Debug` manually, or password may be leaked.
240impl Debug for Adapter {
241    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
242        let mut ds = f.debug_struct("Adapter");
243
244        ds.field("endpoints", &self.endpoints.join(","));
245        ds.field("options", &self.options.clone());
246        ds.finish()
247    }
248}
249
250impl Adapter {
251    async fn conn(&self) -> Result<PooledConnection<'static, Manager>> {
252        let client = self
253            .client
254            .get_or_try_init(|| async {
255                bb8::Pool::builder()
256                    .max_size(64)
257                    .build(Manager {
258                        endpoints: self.endpoints.clone(),
259                        options: self.options.clone(),
260                    })
261                    .await
262            })
263            .await?;
264
265        client.get_owned().await.map_err(|err| match err {
266            RunError::User(err) => err,
267            RunError::TimedOut => {
268                Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
269            }
270        })
271    }
272}
273
274impl kv::Adapter for Adapter {
275    type Scanner = kv::ScanStdIter<vec::IntoIter<Result<String>>>;
276
277    fn info(&self) -> kv::Info {
278        kv::Info::new(
279            Scheme::Etcd,
280            &self.endpoints.join(","),
281            Capability {
282                read: true,
283                write: true,
284                list: true,
285                shared: true,
286
287                ..Default::default()
288            },
289        )
290    }
291
292    async fn get(&self, key: &str) -> Result<Option<Buffer>> {
293        let mut client = self.conn().await?;
294        let resp = client.get(key, None).await.map_err(format_etcd_error)?;
295        if let Some(kv) = resp.kvs().first() {
296            Ok(Some(Buffer::from(kv.value().to_vec())))
297        } else {
298            Ok(None)
299        }
300    }
301
302    async fn set(&self, key: &str, value: Buffer) -> Result<()> {
303        let mut client = self.conn().await?;
304        let _ = client
305            .put(key, value.to_vec(), None)
306            .await
307            .map_err(format_etcd_error)?;
308        Ok(())
309    }
310
311    async fn delete(&self, key: &str) -> Result<()> {
312        let mut client = self.conn().await?;
313        let _ = client.delete(key, None).await.map_err(format_etcd_error)?;
314        Ok(())
315    }
316
317    async fn scan(&self, path: &str) -> Result<Self::Scanner> {
318        let mut client = self.conn().await?;
319        let get_options = Some(GetOptions::new().with_prefix().with_keys_only());
320        let resp = client
321            .get(path, get_options)
322            .await
323            .map_err(format_etcd_error)?;
324        let mut res = Vec::default();
325        for kv in resp.kvs() {
326            let v = kv.key_str().map(String::from).map_err(|err| {
327                Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string")
328                    .set_source(err)
329            })?;
330            res.push(Ok(v));
331        }
332
333        Ok(kv::ScanStdIter::new(res.into_iter()))
334    }
335}
336
337pub fn format_etcd_error(e: EtcdError) -> Error {
338    Error::new(ErrorKind::Unexpected, e.to_string().as_str())
339        .set_source(e)
340        .set_temporary()
341}