opendal_core/services/etcd/
core.rs1use etcd_client::Client;
19use etcd_client::ConnectOptions;
20use fastpool::ManageObject;
21use fastpool::ObjectStatus;
22use fastpool::bounded;
23use std::fmt::Debug;
24use std::sync::Arc;
25
26use crate::raw::*;
27use crate::services::etcd::error::format_etcd_error;
28use crate::{Buffer, Error, ErrorKind, Result};
29
/// Constants shared by the etcd service.
pub mod constants {
    /// Default etcd endpoint string: plain HTTP on the loopback address,
    /// port 2379.
    pub const DEFAULT_ETCD_ENDPOINTS: &str = "http://127.0.0.1:2379";
}
33
34#[derive(Clone)]
35pub struct Manager {
36 endpoints: Vec<String>,
37 options: ConnectOptions,
38}
39
40impl ManageObject for Manager {
41 type Object = Client;
42 type Error = Error;
43
44 async fn create(&self) -> Result<Self::Object, Self::Error> {
45 Client::connect(self.endpoints.clone(), Some(self.options.clone()))
46 .await
47 .map_err(format_etcd_error)
48 }
49
50 async fn is_recyclable(
51 &self,
52 o: &mut Self::Object,
53 _: &ObjectStatus,
54 ) -> Result<(), Self::Error> {
55 match o.status().await {
56 Ok(_) => Ok(()),
57 Err(err) => Err(format_etcd_error(err)),
58 }
59 }
60}
61
62#[derive(Clone)]
63pub struct EtcdCore {
64 endpoints: Vec<String>,
65 options: ConnectOptions,
66 client: Arc<bounded::Pool<Manager>>,
67}
68
69impl Debug for EtcdCore {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct("EtcdCore")
72 .field("endpoints", &self.endpoints.join(","))
73 .field("options", &self.options.clone())
74 .finish_non_exhaustive()
75 }
76}
77
78impl EtcdCore {
79 pub fn new(endpoints: Vec<String>, options: ConnectOptions) -> Self {
80 let client = bounded::Pool::new(
81 bounded::PoolConfig::new(64),
82 Manager {
83 endpoints: endpoints.clone(),
84 options: options.clone(),
85 },
86 );
87
88 Self {
89 endpoints,
90 options,
91 client,
92 }
93 }
94
95 pub async fn conn(&self) -> Result<bounded::Object<Manager>> {
96 let fut = self.client.get();
97
98 tokio::select! {
99 _ = tokio::time::sleep(Duration::from_secs(10)) => {
100 Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
101 }
102 result = fut => match result {
103 Ok(conn) => Ok(conn),
104 Err(err) => Err(err),
105 }
106 }
107 }
108
109 pub async fn has_prefix(&self, prefix: &str) -> Result<bool> {
110 let mut client = self.conn().await?;
111 let get_options = Some(
112 etcd_client::GetOptions::new()
113 .with_prefix()
114 .with_keys_only()
115 .with_limit(1),
116 );
117 let resp = client
118 .get(prefix, get_options)
119 .await
120 .map_err(format_etcd_error)?;
121 Ok(!resp.kvs().is_empty())
122 }
123}
124
125impl EtcdCore {
126 pub async fn get(&self, key: &str) -> Result<Option<Buffer>> {
127 let mut client = self.conn().await?;
128 let resp = client.get(key, None).await.map_err(format_etcd_error)?;
129 if let Some(kv) = resp.kvs().first() {
130 Ok(Some(Buffer::from(kv.value().to_vec())))
131 } else {
132 Ok(None)
133 }
134 }
135
136 pub async fn set(&self, key: &str, value: Buffer) -> Result<()> {
137 let mut client = self.conn().await?;
138 let _ = client
139 .put(key, value.to_vec(), None)
140 .await
141 .map_err(format_etcd_error)?;
142 Ok(())
143 }
144
145 pub async fn delete(&self, key: &str) -> Result<()> {
146 let mut client = self.conn().await?;
147 let _ = client.delete(key, None).await.map_err(format_etcd_error)?;
148 Ok(())
149 }
150}