opendal/services/etcd/
backend.rs1use std::sync::Arc;
19
20use etcd_client::Certificate;
21use etcd_client::ConnectOptions;
22use etcd_client::Identity;
23use etcd_client::TlsOptions;
24
25use super::ETCD_SCHEME;
26use super::config::EtcdConfig;
27use super::core::EtcdCore;
28use super::core::constants::DEFAULT_ETCD_ENDPOINTS;
29use super::deleter::EtcdDeleter;
30use super::lister::EtcdLister;
31use super::writer::EtcdWriter;
32use crate::raw::*;
33use crate::*;
34
35#[doc = include_str!("docs.md")]
37#[derive(Debug, Default)]
38pub struct EtcdBuilder {
39 pub(super) config: EtcdConfig,
40}
41
42impl EtcdBuilder {
43 pub fn endpoints(mut self, endpoints: &str) -> Self {
47 if !endpoints.is_empty() {
48 self.config.endpoints = Some(endpoints.to_owned());
49 }
50 self
51 }
52
53 pub fn username(mut self, username: &str) -> Self {
57 if !username.is_empty() {
58 self.config.username = Some(username.to_owned());
59 }
60 self
61 }
62
63 pub fn password(mut self, password: &str) -> Self {
67 if !password.is_empty() {
68 self.config.password = Some(password.to_owned());
69 }
70 self
71 }
72
73 pub fn root(mut self, root: &str) -> Self {
77 self.config.root = if root.is_empty() {
78 None
79 } else {
80 Some(root.to_string())
81 };
82
83 self
84 }
85
86 pub fn ca_path(mut self, ca_path: &str) -> Self {
90 if !ca_path.is_empty() {
91 self.config.ca_path = Some(ca_path.to_string())
92 }
93 self
94 }
95
96 pub fn cert_path(mut self, cert_path: &str) -> Self {
100 if !cert_path.is_empty() {
101 self.config.cert_path = Some(cert_path.to_string())
102 }
103 self
104 }
105
106 pub fn key_path(mut self, key_path: &str) -> Self {
110 if !key_path.is_empty() {
111 self.config.key_path = Some(key_path.to_string())
112 }
113 self
114 }
115}
116
117impl Builder for EtcdBuilder {
118 type Config = EtcdConfig;
119
120 fn build(self) -> Result<impl Access> {
121 let endpoints = self
122 .config
123 .endpoints
124 .clone()
125 .unwrap_or_else(|| DEFAULT_ETCD_ENDPOINTS.to_string());
126
127 let endpoints: Vec<String> = endpoints.split(',').map(|s| s.to_string()).collect();
128
129 let mut options = ConnectOptions::new();
130
131 if self.config.ca_path.is_some()
132 && self.config.cert_path.is_some()
133 && self.config.key_path.is_some()
134 {
135 let ca = self.load_pem(self.config.ca_path.clone().unwrap().as_str())?;
136 let key = self.load_pem(self.config.key_path.clone().unwrap().as_str())?;
137 let cert = self.load_pem(self.config.cert_path.clone().unwrap().as_str())?;
138
139 let tls_options = TlsOptions::default()
140 .ca_certificate(Certificate::from_pem(ca))
141 .identity(Identity::from_pem(cert, key));
142 options = options.with_tls(tls_options);
143 }
144
145 if let Some(username) = self.config.username.clone() {
146 options = options.with_user(
147 username,
148 self.config.password.clone().unwrap_or("".to_string()),
149 );
150 }
151
152 let root = normalize_root(
153 self.config
154 .root
155 .clone()
156 .unwrap_or_else(|| "/".to_string())
157 .as_str(),
158 );
159
160 let core = EtcdCore::new(endpoints, options);
161 Ok(EtcdBackend::new(core, &root))
162 }
163}
164
165impl EtcdBuilder {
166 fn load_pem(&self, path: &str) -> Result<String> {
167 std::fs::read_to_string(path)
168 .map_err(|err| Error::new(ErrorKind::Unexpected, "invalid file path").set_source(err))
169 }
170}
171
172#[derive(Debug, Clone)]
173pub struct EtcdBackend {
174 core: Arc<EtcdCore>,
175 info: Arc<AccessorInfo>,
176}
177
178impl EtcdBackend {
179 fn new(core: EtcdCore, root: &str) -> Self {
180 let info = AccessorInfo::default();
181 info.set_scheme(ETCD_SCHEME);
182 info.set_name("etcd");
183 info.set_root(root);
184 info.set_native_capability(Capability {
185 read: true,
186
187 write: true,
188 write_can_empty: true,
189
190 delete: true,
191 stat: true,
192 list: true,
193
194 shared: true,
195
196 ..Default::default()
197 });
198
199 Self {
200 core: Arc::new(core),
201 info: Arc::new(info),
202 }
203 }
204}
205
206impl Access for EtcdBackend {
207 type Reader = Buffer;
208 type Writer = EtcdWriter;
209 type Lister = oio::HierarchyLister<EtcdLister>;
210 type Deleter = oio::OneShotDeleter<EtcdDeleter>;
211
212 fn info(&self) -> Arc<AccessorInfo> {
213 self.info.clone()
214 }
215
216 async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
217 let abs_path = build_abs_path(&self.info.root(), path);
218
219 let dir_path = if abs_path.ends_with('/') {
222 abs_path
223 } else {
224 format!("{abs_path}/")
225 };
226
227 self.core.set(&dir_path, Buffer::new()).await?;
229
230 Ok(RpCreateDir::default())
231 }
232
233 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
234 let abs_path = build_abs_path(&self.info.root(), path);
235
236 match self.core.get(&abs_path).await? {
238 Some(buffer) => {
239 let mut metadata = Metadata::new(EntryMode::from_path(&abs_path));
240 metadata.set_content_length(buffer.len() as u64);
241 Ok(RpStat::new(metadata))
242 }
243 None => {
244 let prefix = if abs_path.ends_with('/') {
246 abs_path
247 } else {
248 format!("{abs_path}/")
249 };
250
251 let has_children = self.core.has_prefix(&prefix).await?;
253 if has_children {
254 let metadata = Metadata::new(EntryMode::DIR);
256 Ok(RpStat::new(metadata))
257 } else {
258 Err(Error::new(ErrorKind::NotFound, "path not found"))
259 }
260 }
261 }
262 }
263
264 async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
265 let abs_path = build_abs_path(&self.info.root(), path);
266
267 match self.core.get(&abs_path).await? {
268 Some(buffer) => {
269 let range = op.range();
270
271 if range.is_full() {
273 return Ok((RpRead::new(), buffer));
274 }
275
276 let offset = range.offset() as usize;
278 if offset >= buffer.len() {
279 return Err(Error::new(
280 ErrorKind::RangeNotSatisfied,
281 "range start offset exceeds content length",
282 ));
283 }
284
285 let size = range.size().map(|s| s as usize);
286 let end = size.map_or(buffer.len(), |s| (offset + s).min(buffer.len()));
287 let sliced_buffer = buffer.slice(offset..end);
288
289 Ok((RpRead::new(), sliced_buffer))
290 }
291 None => Err(Error::new(ErrorKind::NotFound, "path not found")),
292 }
293 }
294
295 async fn write(&self, path: &str, _op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
296 let abs_path = build_abs_path(&self.info.root(), path);
297 let writer = EtcdWriter::new(self.core.clone(), abs_path);
298 Ok((RpWrite::new(), writer))
299 }
300
301 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
302 let deleter = oio::OneShotDeleter::new(EtcdDeleter::new(
303 self.core.clone(),
304 self.info.root().to_string(),
305 ));
306 Ok((RpDelete::default(), deleter))
307 }
308
309 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
310 let lister = EtcdLister::new(
311 self.core.clone(),
312 self.info.root().to_string(),
313 path.to_string(),
314 )
315 .await?;
316 let lister = oio::HierarchyLister::new(lister, path, args.recursive());
317 Ok((RpList::default(), lister))
318 }
319}