opendal/services/etcd/
core.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use bb8::PooledConnection;
22use bb8::RunError;
23use etcd_client::Client;
24use etcd_client::ConnectOptions;
25use tokio::sync::OnceCell;
26
27use crate::services::etcd::error::format_etcd_error;
28use crate::{Buffer, Error, ErrorKind, Result};
29
/// Constants shared by the etcd service.
pub mod constants {
    /// Default etcd endpoint: plain HTTP on the local loopback address, port 2379.
    // NOTE(review): presumably the fallback when no endpoints are configured — confirm at the call site.
    pub const DEFAULT_ETCD_ENDPOINTS: &str = "http://127.0.0.1:2379";
}
33
/// Connection manager that lets a `bb8` pool create and check etcd clients.
#[derive(Clone)]
pub struct Manager {
    /// Endpoints handed to `Client::connect` for every new connection.
    endpoints: Vec<String>,
    /// Connect options handed to `Client::connect` for every new connection.
    options: ConnectOptions,
}
39
40impl bb8::ManageConnection for Manager {
41 type Connection = Client;
42 type Error = Error;
43
44 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
45 let conn = Client::connect(self.endpoints.clone(), Some(self.options.clone()))
46 .await
47 .map_err(format_etcd_error)?;
48
49 Ok(conn)
50 }
51
52 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
53 let _ = conn.status().await.map_err(format_etcd_error)?;
54 Ok(())
55 }
56
57 fn has_broken(&self, _: &mut Self::Connection) -> bool {
59 false
60 }
61}
62
/// Shared state of the etcd service: connection settings plus a lazily built client pool.
#[derive(Clone)]
pub struct EtcdCore {
    /// etcd endpoints copied into the pool's `Manager` when the pool is built.
    pub endpoints: Vec<String>,
    /// Connect options copied into the pool's `Manager` when the pool is built.
    pub options: ConnectOptions,
    /// Connection pool, initialized on the first call to `conn`.
    pub client: OnceCell<bb8::Pool<Manager>>,
}
69
70impl Debug for EtcdCore {
71 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("EtcdCore")
73 .field("endpoints", &self.endpoints.join(","))
74 .field("options", &self.options.clone())
75 .finish()
76 }
77}
78
79impl EtcdCore {
80 pub async fn conn(&self) -> Result<PooledConnection<'static, Manager>> {
81 let client = self
82 .client
83 .get_or_try_init(|| async {
84 bb8::Pool::builder()
85 .max_size(64)
86 .build(Manager {
87 endpoints: self.endpoints.clone(),
88 options: self.options.clone(),
89 })
90 .await
91 })
92 .await?;
93
94 client.get_owned().await.map_err(|err| match err {
95 RunError::User(err) => err,
96 RunError::TimedOut => {
97 Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
98 }
99 })
100 }
101
102 pub async fn has_prefix(&self, prefix: &str) -> Result<bool> {
103 let mut client = self.conn().await?;
104 let get_options = Some(
105 etcd_client::GetOptions::new()
106 .with_prefix()
107 .with_keys_only()
108 .with_limit(1),
109 );
110 let resp = client
111 .get(prefix, get_options)
112 .await
113 .map_err(format_etcd_error)?;
114 Ok(!resp.kvs().is_empty())
115 }
116}
117
118impl EtcdCore {
119 pub async fn get(&self, key: &str) -> Result<Option<Buffer>> {
120 let mut client = self.conn().await?;
121 let resp = client.get(key, None).await.map_err(format_etcd_error)?;
122 if let Some(kv) = resp.kvs().first() {
123 Ok(Some(Buffer::from(kv.value().to_vec())))
124 } else {
125 Ok(None)
126 }
127 }
128
129 pub async fn set(&self, key: &str, value: Buffer) -> Result<()> {
130 let mut client = self.conn().await?;
131 let _ = client
132 .put(key, value.to_vec(), None)
133 .await
134 .map_err(format_etcd_error)?;
135 Ok(())
136 }
137
138 pub async fn delete(&self, key: &str) -> Result<()> {
139 let mut client = self.conn().await?;
140 let _ = client.delete(key, None).await.map_err(format_etcd_error)?;
141 Ok(())
142 }
143}