opendal/services/etcd/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use etcd_client::Certificate;
23use etcd_client::ConnectOptions;
24use etcd_client::Identity;
25use etcd_client::TlsOptions;
26use tokio::sync::OnceCell;
27
28use super::core::constants::DEFAULT_ETCD_ENDPOINTS;
29use super::core::EtcdCore;
30use super::deleter::EtcdDeleter;
31use super::lister::EtcdLister;
32use super::writer::EtcdWriter;
33use super::DEFAULT_SCHEME;
34use crate::raw::*;
35use crate::services::EtcdConfig;
36use crate::*;
37
38impl Configurator for EtcdConfig {
39    type Builder = EtcdBuilder;
40    fn into_builder(self) -> Self::Builder {
41        EtcdBuilder { config: self }
42    }
43}
44
/// [Etcd](https://etcd.io/) services support.
#[doc = include_str!("docs.md")]
#[derive(Clone, Default)]
pub struct EtcdBuilder {
    /// Settings collected by the builder's setter methods and read back
    /// when the builder is turned into an accessor.
    config: EtcdConfig,
}
51
52impl Debug for EtcdBuilder {
53    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
54        let mut ds = f.debug_struct("Builder");
55
56        ds.field("config", &self.config);
57        ds.finish()
58    }
59}
60
61impl EtcdBuilder {
62    /// set the network address of etcd service.
63    ///
64    /// default: "http://127.0.0.1:2379"
65    pub fn endpoints(mut self, endpoints: &str) -> Self {
66        if !endpoints.is_empty() {
67            self.config.endpoints = Some(endpoints.to_owned());
68        }
69        self
70    }
71
72    /// set the username for etcd
73    ///
74    /// default: no username
75    pub fn username(mut self, username: &str) -> Self {
76        if !username.is_empty() {
77            self.config.username = Some(username.to_owned());
78        }
79        self
80    }
81
82    /// set the password for etcd
83    ///
84    /// default: no password
85    pub fn password(mut self, password: &str) -> Self {
86        if !password.is_empty() {
87            self.config.password = Some(password.to_owned());
88        }
89        self
90    }
91
92    /// set the working directory, all operations will be performed under it.
93    ///
94    /// default: "/"
95    pub fn root(mut self, root: &str) -> Self {
96        self.config.root = if root.is_empty() {
97            None
98        } else {
99            Some(root.to_string())
100        };
101
102        self
103    }
104
105    /// Set the certificate authority file path.
106    ///
107    /// default is None
108    pub fn ca_path(mut self, ca_path: &str) -> Self {
109        if !ca_path.is_empty() {
110            self.config.ca_path = Some(ca_path.to_string())
111        }
112        self
113    }
114
115    /// Set the certificate file path.
116    ///
117    /// default is None
118    pub fn cert_path(mut self, cert_path: &str) -> Self {
119        if !cert_path.is_empty() {
120            self.config.cert_path = Some(cert_path.to_string())
121        }
122        self
123    }
124
125    /// Set the key file path.
126    ///
127    /// default is None
128    pub fn key_path(mut self, key_path: &str) -> Self {
129        if !key_path.is_empty() {
130            self.config.key_path = Some(key_path.to_string())
131        }
132        self
133    }
134}
135
136impl Builder for EtcdBuilder {
137    type Config = EtcdConfig;
138
139    fn build(self) -> Result<impl Access> {
140        let endpoints = self
141            .config
142            .endpoints
143            .clone()
144            .unwrap_or_else(|| DEFAULT_ETCD_ENDPOINTS.to_string());
145
146        let endpoints: Vec<String> = endpoints.split(',').map(|s| s.to_string()).collect();
147
148        let mut options = ConnectOptions::new();
149
150        if self.config.ca_path.is_some()
151            && self.config.cert_path.is_some()
152            && self.config.key_path.is_some()
153        {
154            let ca = self.load_pem(self.config.ca_path.clone().unwrap().as_str())?;
155            let key = self.load_pem(self.config.key_path.clone().unwrap().as_str())?;
156            let cert = self.load_pem(self.config.cert_path.clone().unwrap().as_str())?;
157
158            let tls_options = TlsOptions::default()
159                .ca_certificate(Certificate::from_pem(ca))
160                .identity(Identity::from_pem(cert, key));
161            options = options.with_tls(tls_options);
162        }
163
164        if let Some(username) = self.config.username.clone() {
165            options = options.with_user(
166                username,
167                self.config.password.clone().unwrap_or("".to_string()),
168            );
169        }
170
171        let root = normalize_root(
172            self.config
173                .root
174                .clone()
175                .unwrap_or_else(|| "/".to_string())
176                .as_str(),
177        );
178
179        let client = OnceCell::new();
180
181        let core = EtcdCore {
182            endpoints,
183            client,
184            options,
185        };
186
187        Ok(EtcdAccessor::new(core, &root))
188    }
189}
190
191impl EtcdBuilder {
192    fn load_pem(&self, path: &str) -> Result<String> {
193        std::fs::read_to_string(path)
194            .map_err(|err| Error::new(ErrorKind::Unexpected, "invalid file path").set_source(err))
195    }
196}
197
/// Accessor that implements [`Access`] on top of an [`EtcdCore`].
///
/// Both fields are reference-counted, so cloning the accessor shares them.
#[derive(Debug, Clone)]
pub struct EtcdAccessor {
    /// Shared core used by every operation of this accessor.
    core: Arc<EtcdCore>,
    /// Shared accessor metadata (scheme, name, root and capabilities).
    info: Arc<AccessorInfo>,
}
203
204impl EtcdAccessor {
205    fn new(core: EtcdCore, root: &str) -> Self {
206        let info = AccessorInfo::default();
207        info.set_scheme(DEFAULT_SCHEME);
208        info.set_name("etcd");
209        info.set_root(root);
210        info.set_native_capability(Capability {
211            read: true,
212
213            write: true,
214            write_can_empty: true,
215
216            delete: true,
217            stat: true,
218            list: true,
219
220            shared: true,
221
222            ..Default::default()
223        });
224
225        Self {
226            core: Arc::new(core),
227            info: Arc::new(info),
228        }
229    }
230}
231
232impl Access for EtcdAccessor {
233    type Reader = Buffer;
234    type Writer = EtcdWriter;
235    type Lister = oio::HierarchyLister<EtcdLister>;
236    type Deleter = oio::OneShotDeleter<EtcdDeleter>;
237
238    fn info(&self) -> Arc<AccessorInfo> {
239        self.info.clone()
240    }
241
242    async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
243        let abs_path = build_abs_path(&self.info.root(), path);
244
245        // In etcd, we simulate directory creation by storing an empty value
246        // with the directory path (ensuring it ends with '/')
247        let dir_path = if abs_path.ends_with('/') {
248            abs_path
249        } else {
250            format!("{abs_path}/")
251        };
252
253        // Store an empty buffer to represent the directory
254        self.core.set(&dir_path, Buffer::new()).await?;
255
256        Ok(RpCreateDir::default())
257    }
258
259    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
260        let abs_path = build_abs_path(&self.info.root(), path);
261
262        // First check if it's a direct key
263        match self.core.get(&abs_path).await? {
264            Some(buffer) => {
265                let mut metadata = Metadata::new(EntryMode::from_path(&abs_path));
266                metadata.set_content_length(buffer.len() as u64);
267                Ok(RpStat::new(metadata))
268            }
269            None => {
270                // Check if it's a directory by looking for keys with this prefix
271                let prefix = if abs_path.ends_with('/') {
272                    abs_path
273                } else {
274                    format!("{abs_path}/")
275                };
276
277                // Use etcd prefix query to check if any keys exist with this prefix
278                let has_children = self.core.has_prefix(&prefix).await?;
279                if has_children {
280                    // Has children, it's a directory
281                    let metadata = Metadata::new(EntryMode::DIR);
282                    Ok(RpStat::new(metadata))
283                } else {
284                    Err(Error::new(ErrorKind::NotFound, "path not found"))
285                }
286            }
287        }
288    }
289
290    async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
291        let abs_path = build_abs_path(&self.info.root(), path);
292
293        match self.core.get(&abs_path).await? {
294            Some(buffer) => {
295                let range = op.range();
296
297                // If range is full, return the buffer directly
298                if range.is_full() {
299                    return Ok((RpRead::new(), buffer));
300                }
301
302                // Handle range requests
303                let offset = range.offset() as usize;
304                if offset >= buffer.len() {
305                    return Err(Error::new(
306                        ErrorKind::RangeNotSatisfied,
307                        "range start offset exceeds content length",
308                    ));
309                }
310
311                let size = range.size().map(|s| s as usize);
312                let end = size.map_or(buffer.len(), |s| (offset + s).min(buffer.len()));
313                let sliced_buffer = buffer.slice(offset..end);
314
315                Ok((RpRead::new(), sliced_buffer))
316            }
317            None => Err(Error::new(ErrorKind::NotFound, "path not found")),
318        }
319    }
320
321    async fn write(&self, path: &str, _op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
322        let abs_path = build_abs_path(&self.info.root(), path);
323        let writer = EtcdWriter::new(self.core.clone(), abs_path);
324        Ok((RpWrite::new(), writer))
325    }
326
327    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
328        let deleter = oio::OneShotDeleter::new(EtcdDeleter::new(
329            self.core.clone(),
330            self.info.root().to_string(),
331        ));
332        Ok((RpDelete::default(), deleter))
333    }
334
335    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
336        let lister = EtcdLister::new(
337            self.core.clone(),
338            self.info.root().to_string(),
339            path.to_string(),
340        )
341        .await?;
342        let lister = oio::HierarchyLister::new(lister, path, args.recursive());
343        Ok((RpList::default(), lister))
344    }
345}