opendal/services/etcd/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::vec;
21
22use bb8::PooledConnection;
23use bb8::RunError;
24use etcd_client::Certificate;
25use etcd_client::Client;
26use etcd_client::ConnectOptions;
27use etcd_client::Error as EtcdError;
28use etcd_client::GetOptions;
29use etcd_client::Identity;
30use etcd_client::TlsOptions;
31use tokio::sync::OnceCell;
32
33use crate::raw::adapters::kv;
34use crate::raw::*;
35use crate::services::EtcdConfig;
36use crate::*;
37
/// Endpoint list used by `build` when the configuration does not provide one.
const DEFAULT_ETCD_ENDPOINTS: &str = "http://127.0.0.1:2379";
39
40impl Configurator for EtcdConfig {
41    type Builder = EtcdBuilder;
42    fn into_builder(self) -> Self::Builder {
43        EtcdBuilder { config: self }
44    }
45}
46
47/// [Etcd](https://etcd.io/) services support.
48#[doc = include_str!("docs.md")]
49#[derive(Clone, Default)]
50pub struct EtcdBuilder {
51    config: EtcdConfig,
52}
53
54impl Debug for EtcdBuilder {
55    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
56        let mut ds = f.debug_struct("Builder");
57
58        ds.field("config", &self.config);
59        ds.finish()
60    }
61}
62
63impl EtcdBuilder {
64    /// set the network address of etcd service.
65    ///
66    /// default: "http://127.0.0.1:2379"
67    pub fn endpoints(mut self, endpoints: &str) -> Self {
68        if !endpoints.is_empty() {
69            self.config.endpoints = Some(endpoints.to_owned());
70        }
71        self
72    }
73
74    /// set the username for etcd
75    ///
76    /// default: no username
77    pub fn username(mut self, username: &str) -> Self {
78        if !username.is_empty() {
79            self.config.username = Some(username.to_owned());
80        }
81        self
82    }
83
84    /// set the password for etcd
85    ///
86    /// default: no password
87    pub fn password(mut self, password: &str) -> Self {
88        if !password.is_empty() {
89            self.config.password = Some(password.to_owned());
90        }
91        self
92    }
93
94    /// set the working directory, all operations will be performed under it.
95    ///
96    /// default: "/"
97    pub fn root(mut self, root: &str) -> Self {
98        self.config.root = if root.is_empty() {
99            None
100        } else {
101            Some(root.to_string())
102        };
103
104        self
105    }
106
107    /// Set the certificate authority file path.
108    ///
109    /// default is None
110    pub fn ca_path(mut self, ca_path: &str) -> Self {
111        if !ca_path.is_empty() {
112            self.config.ca_path = Some(ca_path.to_string())
113        }
114        self
115    }
116
117    /// Set the certificate file path.
118    ///
119    /// default is None
120    pub fn cert_path(mut self, cert_path: &str) -> Self {
121        if !cert_path.is_empty() {
122            self.config.cert_path = Some(cert_path.to_string())
123        }
124        self
125    }
126
127    /// Set the key file path.
128    ///
129    /// default is None
130    pub fn key_path(mut self, key_path: &str) -> Self {
131        if !key_path.is_empty() {
132            self.config.key_path = Some(key_path.to_string())
133        }
134        self
135    }
136}
137
138impl Builder for EtcdBuilder {
139    const SCHEME: Scheme = Scheme::Etcd;
140    type Config = EtcdConfig;
141
142    fn build(self) -> Result<impl Access> {
143        let endpoints = self
144            .config
145            .endpoints
146            .clone()
147            .unwrap_or_else(|| DEFAULT_ETCD_ENDPOINTS.to_string());
148
149        let endpoints: Vec<String> = endpoints.split(',').map(|s| s.to_string()).collect();
150
151        let mut options = ConnectOptions::new();
152
153        if self.config.ca_path.is_some()
154            && self.config.cert_path.is_some()
155            && self.config.key_path.is_some()
156        {
157            let ca = self.load_pem(self.config.ca_path.clone().unwrap().as_str())?;
158            let key = self.load_pem(self.config.key_path.clone().unwrap().as_str())?;
159            let cert = self.load_pem(self.config.cert_path.clone().unwrap().as_str())?;
160
161            let tls_options = TlsOptions::default()
162                .ca_certificate(Certificate::from_pem(ca))
163                .identity(Identity::from_pem(cert, key));
164            options = options.with_tls(tls_options);
165        }
166
167        if let Some(username) = self.config.username.clone() {
168            options = options.with_user(
169                username,
170                self.config.password.clone().unwrap_or("".to_string()),
171            );
172        }
173
174        let root = normalize_root(
175            self.config
176                .root
177                .clone()
178                .unwrap_or_else(|| "/".to_string())
179                .as_str(),
180        );
181
182        let client = OnceCell::new();
183        Ok(EtcdBackend::new(Adapter {
184            endpoints,
185            client,
186            options,
187        })
188        .with_normalized_root(root))
189    }
190}
191
192impl EtcdBuilder {
193    fn load_pem(&self, path: &str) -> Result<String> {
194        std::fs::read_to_string(path)
195            .map_err(|err| Error::new(ErrorKind::Unexpected, "invalid file path").set_source(err))
196    }
197}
198
199/// Backend for etcd services.
200pub type EtcdBackend = kv::Backend<Adapter>;
201
202#[derive(Clone)]
203pub struct Manager {
204    endpoints: Vec<String>,
205    options: ConnectOptions,
206}
207
208impl bb8::ManageConnection for Manager {
209    type Connection = Client;
210    type Error = Error;
211
212    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
213        let conn = Client::connect(self.endpoints.clone(), Some(self.options.clone()))
214            .await
215            .map_err(format_etcd_error)?;
216
217        Ok(conn)
218    }
219
220    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
221        let _ = conn.status().await.map_err(format_etcd_error)?;
222        Ok(())
223    }
224
225    /// Always allow reuse conn.
226    fn has_broken(&self, _: &mut Self::Connection) -> bool {
227        false
228    }
229}
230
231#[derive(Clone)]
232pub struct Adapter {
233    endpoints: Vec<String>,
234    options: ConnectOptions,
235    client: OnceCell<bb8::Pool<Manager>>,
236}
237
238// implement `Debug` manually, or password may be leaked.
239impl Debug for Adapter {
240    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
241        let mut ds = f.debug_struct("Adapter");
242
243        ds.field("endpoints", &self.endpoints.join(","));
244        ds.field("options", &self.options.clone());
245        ds.finish()
246    }
247}
248
249impl Adapter {
250    async fn conn(&self) -> Result<PooledConnection<'static, Manager>> {
251        let client = self
252            .client
253            .get_or_try_init(|| async {
254                bb8::Pool::builder()
255                    .max_size(64)
256                    .build(Manager {
257                        endpoints: self.endpoints.clone(),
258                        options: self.options.clone(),
259                    })
260                    .await
261            })
262            .await?;
263
264        client.get_owned().await.map_err(|err| match err {
265            RunError::User(err) => err,
266            RunError::TimedOut => {
267                Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
268            }
269        })
270    }
271}
272
273impl kv::Adapter for Adapter {
274    type Scanner = kv::ScanStdIter<vec::IntoIter<Result<String>>>;
275
276    fn info(&self) -> kv::Info {
277        kv::Info::new(
278            Scheme::Etcd,
279            &self.endpoints.join(","),
280            Capability {
281                read: true,
282                write: true,
283                list: true,
284                shared: true,
285
286                ..Default::default()
287            },
288        )
289    }
290
291    async fn get(&self, key: &str) -> Result<Option<Buffer>> {
292        let mut client = self.conn().await?;
293        let resp = client.get(key, None).await.map_err(format_etcd_error)?;
294        if let Some(kv) = resp.kvs().first() {
295            Ok(Some(Buffer::from(kv.value().to_vec())))
296        } else {
297            Ok(None)
298        }
299    }
300
301    async fn set(&self, key: &str, value: Buffer) -> Result<()> {
302        let mut client = self.conn().await?;
303        let _ = client
304            .put(key, value.to_vec(), None)
305            .await
306            .map_err(format_etcd_error)?;
307        Ok(())
308    }
309
310    async fn delete(&self, key: &str) -> Result<()> {
311        let mut client = self.conn().await?;
312        let _ = client.delete(key, None).await.map_err(format_etcd_error)?;
313        Ok(())
314    }
315
316    async fn scan(&self, path: &str) -> Result<Self::Scanner> {
317        let mut client = self.conn().await?;
318        let get_options = Some(GetOptions::new().with_prefix().with_keys_only());
319        let resp = client
320            .get(path, get_options)
321            .await
322            .map_err(format_etcd_error)?;
323        let mut res = Vec::default();
324        for kv in resp.kvs() {
325            let v = kv.key_str().map(String::from).map_err(|err| {
326                Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string")
327                    .set_source(err)
328            })?;
329            res.push(Ok(v));
330        }
331
332        Ok(kv::ScanStdIter::new(res.into_iter()))
333    }
334}
335
336pub fn format_etcd_error(e: EtcdError) -> Error {
337    Error::new(ErrorKind::Unexpected, e.to_string().as_str())
338        .set_source(e)
339        .set_temporary()
340}