opendal/services/etcd/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use bb8::PooledConnection;
22use bb8::RunError;
23use etcd_client::Client;
24use etcd_client::ConnectOptions;
25use tokio::sync::OnceCell;
26
27use crate::services::etcd::error::format_etcd_error;
28use crate::{Buffer, Error, ErrorKind, Result};
29
30pub mod constants {
31    pub const DEFAULT_ETCD_ENDPOINTS: &str = "http://127.0.0.1:2379";
32}
33
34#[derive(Clone)]
35pub struct Manager {
36    endpoints: Vec<String>,
37    options: ConnectOptions,
38}
39
40impl bb8::ManageConnection for Manager {
41    type Connection = Client;
42    type Error = Error;
43
44    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
45        let conn = Client::connect(self.endpoints.clone(), Some(self.options.clone()))
46            .await
47            .map_err(format_etcd_error)?;
48
49        Ok(conn)
50    }
51
52    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
53        let _ = conn.status().await.map_err(format_etcd_error)?;
54        Ok(())
55    }
56
57    /// Always allow reuse conn.
58    fn has_broken(&self, _: &mut Self::Connection) -> bool {
59        false
60    }
61}
62
63#[derive(Clone)]
64pub struct EtcdCore {
65    pub endpoints: Vec<String>,
66    pub options: ConnectOptions,
67    pub client: OnceCell<bb8::Pool<Manager>>,
68}
69
70impl Debug for EtcdCore {
71    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("EtcdCore")
73            .field("endpoints", &self.endpoints.join(","))
74            .field("options", &self.options.clone())
75            .finish()
76    }
77}
78
79impl EtcdCore {
80    pub async fn conn(&self) -> Result<PooledConnection<'static, Manager>> {
81        let client = self
82            .client
83            .get_or_try_init(|| async {
84                bb8::Pool::builder()
85                    .max_size(64)
86                    .build(Manager {
87                        endpoints: self.endpoints.clone(),
88                        options: self.options.clone(),
89                    })
90                    .await
91            })
92            .await?;
93
94        client.get_owned().await.map_err(|err| match err {
95            RunError::User(err) => err,
96            RunError::TimedOut => {
97                Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
98            }
99        })
100    }
101
102    pub async fn has_prefix(&self, prefix: &str) -> Result<bool> {
103        let mut client = self.conn().await?;
104        let get_options = Some(
105            etcd_client::GetOptions::new()
106                .with_prefix()
107                .with_keys_only()
108                .with_limit(1),
109        );
110        let resp = client
111            .get(prefix, get_options)
112            .await
113            .map_err(format_etcd_error)?;
114        Ok(!resp.kvs().is_empty())
115    }
116}
117
118impl EtcdCore {
119    pub async fn get(&self, key: &str) -> Result<Option<Buffer>> {
120        let mut client = self.conn().await?;
121        let resp = client.get(key, None).await.map_err(format_etcd_error)?;
122        if let Some(kv) = resp.kvs().first() {
123            Ok(Some(Buffer::from(kv.value().to_vec())))
124        } else {
125            Ok(None)
126        }
127    }
128
129    pub async fn set(&self, key: &str, value: Buffer) -> Result<()> {
130        let mut client = self.conn().await?;
131        let _ = client
132            .put(key, value.to_vec(), None)
133            .await
134            .map_err(format_etcd_error)?;
135        Ok(())
136    }
137
138    pub async fn delete(&self, key: &str) -> Result<()> {
139        let mut client = self.conn().await?;
140        let _ = client.delete(key, None).await.map_err(format_etcd_error)?;
141        Ok(())
142    }
143}