opendal/services/etcd/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use etcd_client::Certificate;
21use etcd_client::ConnectOptions;
22use etcd_client::Identity;
23use etcd_client::TlsOptions;
24
25use super::ETCD_SCHEME;
26use super::config::EtcdConfig;
27use super::core::EtcdCore;
28use super::core::constants::DEFAULT_ETCD_ENDPOINTS;
29use super::deleter::EtcdDeleter;
30use super::lister::EtcdLister;
31use super::writer::EtcdWriter;
32use crate::raw::*;
33use crate::*;
34
35/// [Etcd](https://etcd.io/) services support.
36#[doc = include_str!("docs.md")]
37#[derive(Debug, Default)]
38pub struct EtcdBuilder {
39    pub(super) config: EtcdConfig,
40}
41
42impl EtcdBuilder {
43    /// set the network address of etcd service.
44    ///
45    /// default: "http://127.0.0.1:2379"
46    pub fn endpoints(mut self, endpoints: &str) -> Self {
47        if !endpoints.is_empty() {
48            self.config.endpoints = Some(endpoints.to_owned());
49        }
50        self
51    }
52
53    /// set the username for etcd
54    ///
55    /// default: no username
56    pub fn username(mut self, username: &str) -> Self {
57        if !username.is_empty() {
58            self.config.username = Some(username.to_owned());
59        }
60        self
61    }
62
63    /// set the password for etcd
64    ///
65    /// default: no password
66    pub fn password(mut self, password: &str) -> Self {
67        if !password.is_empty() {
68            self.config.password = Some(password.to_owned());
69        }
70        self
71    }
72
73    /// set the working directory, all operations will be performed under it.
74    ///
75    /// default: "/"
76    pub fn root(mut self, root: &str) -> Self {
77        self.config.root = if root.is_empty() {
78            None
79        } else {
80            Some(root.to_string())
81        };
82
83        self
84    }
85
86    /// Set the certificate authority file path.
87    ///
88    /// default is None
89    pub fn ca_path(mut self, ca_path: &str) -> Self {
90        if !ca_path.is_empty() {
91            self.config.ca_path = Some(ca_path.to_string())
92        }
93        self
94    }
95
96    /// Set the certificate file path.
97    ///
98    /// default is None
99    pub fn cert_path(mut self, cert_path: &str) -> Self {
100        if !cert_path.is_empty() {
101            self.config.cert_path = Some(cert_path.to_string())
102        }
103        self
104    }
105
106    /// Set the key file path.
107    ///
108    /// default is None
109    pub fn key_path(mut self, key_path: &str) -> Self {
110        if !key_path.is_empty() {
111            self.config.key_path = Some(key_path.to_string())
112        }
113        self
114    }
115}
116
117impl Builder for EtcdBuilder {
118    type Config = EtcdConfig;
119
120    fn build(self) -> Result<impl Access> {
121        let endpoints = self
122            .config
123            .endpoints
124            .clone()
125            .unwrap_or_else(|| DEFAULT_ETCD_ENDPOINTS.to_string());
126
127        let endpoints: Vec<String> = endpoints.split(',').map(|s| s.to_string()).collect();
128
129        let mut options = ConnectOptions::new();
130
131        if self.config.ca_path.is_some()
132            && self.config.cert_path.is_some()
133            && self.config.key_path.is_some()
134        {
135            let ca = self.load_pem(self.config.ca_path.clone().unwrap().as_str())?;
136            let key = self.load_pem(self.config.key_path.clone().unwrap().as_str())?;
137            let cert = self.load_pem(self.config.cert_path.clone().unwrap().as_str())?;
138
139            let tls_options = TlsOptions::default()
140                .ca_certificate(Certificate::from_pem(ca))
141                .identity(Identity::from_pem(cert, key));
142            options = options.with_tls(tls_options);
143        }
144
145        if let Some(username) = self.config.username.clone() {
146            options = options.with_user(
147                username,
148                self.config.password.clone().unwrap_or("".to_string()),
149            );
150        }
151
152        let root = normalize_root(
153            self.config
154                .root
155                .clone()
156                .unwrap_or_else(|| "/".to_string())
157                .as_str(),
158        );
159
160        let core = EtcdCore::new(endpoints, options);
161        Ok(EtcdBackend::new(core, &root))
162    }
163}
164
165impl EtcdBuilder {
166    fn load_pem(&self, path: &str) -> Result<String> {
167        std::fs::read_to_string(path)
168            .map_err(|err| Error::new(ErrorKind::Unexpected, "invalid file path").set_source(err))
169    }
170}
171
172#[derive(Debug, Clone)]
173pub struct EtcdBackend {
174    core: Arc<EtcdCore>,
175    info: Arc<AccessorInfo>,
176}
177
178impl EtcdBackend {
179    fn new(core: EtcdCore, root: &str) -> Self {
180        let info = AccessorInfo::default();
181        info.set_scheme(ETCD_SCHEME);
182        info.set_name("etcd");
183        info.set_root(root);
184        info.set_native_capability(Capability {
185            read: true,
186
187            write: true,
188            write_can_empty: true,
189
190            delete: true,
191            stat: true,
192            list: true,
193
194            shared: true,
195
196            ..Default::default()
197        });
198
199        Self {
200            core: Arc::new(core),
201            info: Arc::new(info),
202        }
203    }
204}
205
206impl Access for EtcdBackend {
207    type Reader = Buffer;
208    type Writer = EtcdWriter;
209    type Lister = oio::HierarchyLister<EtcdLister>;
210    type Deleter = oio::OneShotDeleter<EtcdDeleter>;
211
212    fn info(&self) -> Arc<AccessorInfo> {
213        self.info.clone()
214    }
215
216    async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
217        let abs_path = build_abs_path(&self.info.root(), path);
218
219        // In etcd, we simulate directory creation by storing an empty value
220        // with the directory path (ensuring it ends with '/')
221        let dir_path = if abs_path.ends_with('/') {
222            abs_path
223        } else {
224            format!("{abs_path}/")
225        };
226
227        // Store an empty buffer to represent the directory
228        self.core.set(&dir_path, Buffer::new()).await?;
229
230        Ok(RpCreateDir::default())
231    }
232
233    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
234        let abs_path = build_abs_path(&self.info.root(), path);
235
236        // First check if it's a direct key
237        match self.core.get(&abs_path).await? {
238            Some(buffer) => {
239                let mut metadata = Metadata::new(EntryMode::from_path(&abs_path));
240                metadata.set_content_length(buffer.len() as u64);
241                Ok(RpStat::new(metadata))
242            }
243            None => {
244                // Check if it's a directory by looking for keys with this prefix
245                let prefix = if abs_path.ends_with('/') {
246                    abs_path
247                } else {
248                    format!("{abs_path}/")
249                };
250
251                // Use etcd prefix query to check if any keys exist with this prefix
252                let has_children = self.core.has_prefix(&prefix).await?;
253                if has_children {
254                    // Has children, it's a directory
255                    let metadata = Metadata::new(EntryMode::DIR);
256                    Ok(RpStat::new(metadata))
257                } else {
258                    Err(Error::new(ErrorKind::NotFound, "path not found"))
259                }
260            }
261        }
262    }
263
264    async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
265        let abs_path = build_abs_path(&self.info.root(), path);
266
267        match self.core.get(&abs_path).await? {
268            Some(buffer) => {
269                let range = op.range();
270
271                // If range is full, return the buffer directly
272                if range.is_full() {
273                    return Ok((RpRead::new(), buffer));
274                }
275
276                // Handle range requests
277                let offset = range.offset() as usize;
278                if offset >= buffer.len() {
279                    return Err(Error::new(
280                        ErrorKind::RangeNotSatisfied,
281                        "range start offset exceeds content length",
282                    ));
283                }
284
285                let size = range.size().map(|s| s as usize);
286                let end = size.map_or(buffer.len(), |s| (offset + s).min(buffer.len()));
287                let sliced_buffer = buffer.slice(offset..end);
288
289                Ok((RpRead::new(), sliced_buffer))
290            }
291            None => Err(Error::new(ErrorKind::NotFound, "path not found")),
292        }
293    }
294
295    async fn write(&self, path: &str, _op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
296        let abs_path = build_abs_path(&self.info.root(), path);
297        let writer = EtcdWriter::new(self.core.clone(), abs_path);
298        Ok((RpWrite::new(), writer))
299    }
300
301    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
302        let deleter = oio::OneShotDeleter::new(EtcdDeleter::new(
303            self.core.clone(),
304            self.info.root().to_string(),
305        ));
306        Ok((RpDelete::default(), deleter))
307    }
308
309    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
310        let lister = EtcdLister::new(
311            self.core.clone(),
312            self.info.root().to_string(),
313            path.to_string(),
314        )
315        .await?;
316        let lister = oio::HierarchyLister::new(lister, path, args.recursive());
317        Ok((RpList::default(), lister))
318    }
319}