opendal_core/services/etcd/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use etcd_client::Client;
19use etcd_client::ConnectOptions;
20use fastpool::ManageObject;
21use fastpool::ObjectStatus;
22use fastpool::bounded;
23use std::fmt::Debug;
24use std::sync::Arc;
25
26use crate::raw::*;
27use crate::services::etcd::error::format_etcd_error;
28use crate::{Buffer, Error, ErrorKind, Result};
29
/// Constants shared by the etcd service.
pub mod constants {
    /// Default etcd endpoint string: a plain-HTTP address on the local host, port 2379.
    pub const DEFAULT_ETCD_ENDPOINTS: &str = "http://127.0.0.1:2379";
}
33
/// Connection manager used by the `fastpool` bounded pool.
///
/// Holds the endpoint list and connect options that `create` passes to
/// `Client::connect` whenever the pool needs a new etcd client.
#[derive(Clone)]
pub struct Manager {
    /// etcd endpoint addresses passed to `Client::connect`.
    endpoints: Vec<String>,
    /// Connection options passed to `Client::connect`.
    options: ConnectOptions,
}
39
40impl ManageObject for Manager {
41    type Object = Client;
42    type Error = Error;
43
44    async fn create(&self) -> Result<Self::Object, Self::Error> {
45        Client::connect(self.endpoints.clone(), Some(self.options.clone()))
46            .await
47            .map_err(format_etcd_error)
48    }
49
50    async fn is_recyclable(
51        &self,
52        o: &mut Self::Object,
53        _: &ObjectStatus,
54    ) -> Result<(), Self::Error> {
55        match o.status().await {
56            Ok(_) => Ok(()),
57            Err(err) => Err(format_etcd_error(err)),
58        }
59    }
60}
61
/// Core of the etcd service: connection settings plus a pool of etcd clients.
#[derive(Clone)]
pub struct EtcdCore {
    /// etcd endpoint addresses; shown comma-joined in the `Debug` output.
    endpoints: Vec<String>,
    /// Options used when connecting to etcd.
    options: ConnectOptions,
    /// Bounded pool of etcd clients, managed by [`Manager`].
    client: Arc<bounded::Pool<Manager>>,
}
68
69impl Debug for EtcdCore {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct("EtcdCore")
72            .field("endpoints", &self.endpoints.join(","))
73            .field("options", &self.options.clone())
74            .finish_non_exhaustive()
75    }
76}
77
78impl EtcdCore {
79    pub fn new(endpoints: Vec<String>, options: ConnectOptions) -> Self {
80        let client = bounded::Pool::new(
81            bounded::PoolConfig::new(64),
82            Manager {
83                endpoints: endpoints.clone(),
84                options: options.clone(),
85            },
86        );
87
88        Self {
89            endpoints,
90            options,
91            client,
92        }
93    }
94
95    pub async fn conn(&self) -> Result<bounded::Object<Manager>> {
96        let fut = self.client.get();
97
98        tokio::select! {
99            _ = tokio::time::sleep(Duration::from_secs(10)) => {
100                Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
101            }
102            result = fut => match result {
103                Ok(conn) => Ok(conn),
104                Err(err) => Err(err),
105            }
106        }
107    }
108
109    pub async fn has_prefix(&self, prefix: &str) -> Result<bool> {
110        let mut client = self.conn().await?;
111        let get_options = Some(
112            etcd_client::GetOptions::new()
113                .with_prefix()
114                .with_keys_only()
115                .with_limit(1),
116        );
117        let resp = client
118            .get(prefix, get_options)
119            .await
120            .map_err(format_etcd_error)?;
121        Ok(!resp.kvs().is_empty())
122    }
123}
124
125impl EtcdCore {
126    pub async fn get(&self, key: &str) -> Result<Option<Buffer>> {
127        let mut client = self.conn().await?;
128        let resp = client.get(key, None).await.map_err(format_etcd_error)?;
129        if let Some(kv) = resp.kvs().first() {
130            Ok(Some(Buffer::from(kv.value().to_vec())))
131        } else {
132            Ok(None)
133        }
134    }
135
136    pub async fn set(&self, key: &str, value: Buffer) -> Result<()> {
137        let mut client = self.conn().await?;
138        let _ = client
139            .put(key, value.to_vec(), None)
140            .await
141            .map_err(format_etcd_error)?;
142        Ok(())
143    }
144
145    pub async fn delete(&self, key: &str) -> Result<()> {
146        let mut client = self.conn().await?;
147        let _ = client.delete(key, None).await.map_err(format_etcd_error)?;
148        Ok(())
149    }
150}