opendal/services/etcd/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use etcd_client::Certificate;
23use etcd_client::ConnectOptions;
24use etcd_client::Identity;
25use etcd_client::TlsOptions;
26use tokio::sync::OnceCell;
27
28use super::core::constants::DEFAULT_ETCD_ENDPOINTS;
29use super::core::EtcdCore;
30use super::deleter::EtcdDeleter;
31use super::lister::EtcdLister;
32use super::writer::EtcdWriter;
33use super::DEFAULT_SCHEME;
34use crate::raw::*;
35use crate::services::EtcdConfig;
36use crate::*;
37
38impl Configurator for EtcdConfig {
39 type Builder = EtcdBuilder;
40 fn into_builder(self) -> Self::Builder {
41 EtcdBuilder { config: self }
42 }
43}
44
45#[doc = include_str!("docs.md")]
47#[derive(Clone, Default)]
48pub struct EtcdBuilder {
49 config: EtcdConfig,
50}
51
52impl Debug for EtcdBuilder {
53 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
54 let mut ds = f.debug_struct("Builder");
55
56 ds.field("config", &self.config);
57 ds.finish()
58 }
59}
60
61impl EtcdBuilder {
62 pub fn endpoints(mut self, endpoints: &str) -> Self {
66 if !endpoints.is_empty() {
67 self.config.endpoints = Some(endpoints.to_owned());
68 }
69 self
70 }
71
72 pub fn username(mut self, username: &str) -> Self {
76 if !username.is_empty() {
77 self.config.username = Some(username.to_owned());
78 }
79 self
80 }
81
82 pub fn password(mut self, password: &str) -> Self {
86 if !password.is_empty() {
87 self.config.password = Some(password.to_owned());
88 }
89 self
90 }
91
92 pub fn root(mut self, root: &str) -> Self {
96 self.config.root = if root.is_empty() {
97 None
98 } else {
99 Some(root.to_string())
100 };
101
102 self
103 }
104
105 pub fn ca_path(mut self, ca_path: &str) -> Self {
109 if !ca_path.is_empty() {
110 self.config.ca_path = Some(ca_path.to_string())
111 }
112 self
113 }
114
115 pub fn cert_path(mut self, cert_path: &str) -> Self {
119 if !cert_path.is_empty() {
120 self.config.cert_path = Some(cert_path.to_string())
121 }
122 self
123 }
124
125 pub fn key_path(mut self, key_path: &str) -> Self {
129 if !key_path.is_empty() {
130 self.config.key_path = Some(key_path.to_string())
131 }
132 self
133 }
134}
135
136impl Builder for EtcdBuilder {
137 type Config = EtcdConfig;
138
139 fn build(self) -> Result<impl Access> {
140 let endpoints = self
141 .config
142 .endpoints
143 .clone()
144 .unwrap_or_else(|| DEFAULT_ETCD_ENDPOINTS.to_string());
145
146 let endpoints: Vec<String> = endpoints.split(',').map(|s| s.to_string()).collect();
147
148 let mut options = ConnectOptions::new();
149
150 if self.config.ca_path.is_some()
151 && self.config.cert_path.is_some()
152 && self.config.key_path.is_some()
153 {
154 let ca = self.load_pem(self.config.ca_path.clone().unwrap().as_str())?;
155 let key = self.load_pem(self.config.key_path.clone().unwrap().as_str())?;
156 let cert = self.load_pem(self.config.cert_path.clone().unwrap().as_str())?;
157
158 let tls_options = TlsOptions::default()
159 .ca_certificate(Certificate::from_pem(ca))
160 .identity(Identity::from_pem(cert, key));
161 options = options.with_tls(tls_options);
162 }
163
164 if let Some(username) = self.config.username.clone() {
165 options = options.with_user(
166 username,
167 self.config.password.clone().unwrap_or("".to_string()),
168 );
169 }
170
171 let root = normalize_root(
172 self.config
173 .root
174 .clone()
175 .unwrap_or_else(|| "/".to_string())
176 .as_str(),
177 );
178
179 let client = OnceCell::new();
180
181 let core = EtcdCore {
182 endpoints,
183 client,
184 options,
185 };
186
187 Ok(EtcdAccessor::new(core, &root))
188 }
189}
190
191impl EtcdBuilder {
192 fn load_pem(&self, path: &str) -> Result<String> {
193 std::fs::read_to_string(path)
194 .map_err(|err| Error::new(ErrorKind::Unexpected, "invalid file path").set_source(err))
195 }
196}
197
198#[derive(Debug, Clone)]
199pub struct EtcdAccessor {
200 core: Arc<EtcdCore>,
201 info: Arc<AccessorInfo>,
202}
203
204impl EtcdAccessor {
205 fn new(core: EtcdCore, root: &str) -> Self {
206 let info = AccessorInfo::default();
207 info.set_scheme(DEFAULT_SCHEME);
208 info.set_name("etcd");
209 info.set_root(root);
210 info.set_native_capability(Capability {
211 read: true,
212
213 write: true,
214 write_can_empty: true,
215
216 delete: true,
217 stat: true,
218 list: true,
219
220 shared: true,
221
222 ..Default::default()
223 });
224
225 Self {
226 core: Arc::new(core),
227 info: Arc::new(info),
228 }
229 }
230}
231
232impl Access for EtcdAccessor {
233 type Reader = Buffer;
234 type Writer = EtcdWriter;
235 type Lister = oio::HierarchyLister<EtcdLister>;
236 type Deleter = oio::OneShotDeleter<EtcdDeleter>;
237
238 fn info(&self) -> Arc<AccessorInfo> {
239 self.info.clone()
240 }
241
242 async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
243 let abs_path = build_abs_path(&self.info.root(), path);
244
245 let dir_path = if abs_path.ends_with('/') {
248 abs_path
249 } else {
250 format!("{abs_path}/")
251 };
252
253 self.core.set(&dir_path, Buffer::new()).await?;
255
256 Ok(RpCreateDir::default())
257 }
258
259 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
260 let abs_path = build_abs_path(&self.info.root(), path);
261
262 match self.core.get(&abs_path).await? {
264 Some(buffer) => {
265 let mut metadata = Metadata::new(EntryMode::from_path(&abs_path));
266 metadata.set_content_length(buffer.len() as u64);
267 Ok(RpStat::new(metadata))
268 }
269 None => {
270 let prefix = if abs_path.ends_with('/') {
272 abs_path
273 } else {
274 format!("{abs_path}/")
275 };
276
277 let has_children = self.core.has_prefix(&prefix).await?;
279 if has_children {
280 let metadata = Metadata::new(EntryMode::DIR);
282 Ok(RpStat::new(metadata))
283 } else {
284 Err(Error::new(ErrorKind::NotFound, "path not found"))
285 }
286 }
287 }
288 }
289
290 async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
291 let abs_path = build_abs_path(&self.info.root(), path);
292
293 match self.core.get(&abs_path).await? {
294 Some(buffer) => {
295 let range = op.range();
296
297 if range.is_full() {
299 return Ok((RpRead::new(), buffer));
300 }
301
302 let offset = range.offset() as usize;
304 if offset >= buffer.len() {
305 return Err(Error::new(
306 ErrorKind::RangeNotSatisfied,
307 "range start offset exceeds content length",
308 ));
309 }
310
311 let size = range.size().map(|s| s as usize);
312 let end = size.map_or(buffer.len(), |s| (offset + s).min(buffer.len()));
313 let sliced_buffer = buffer.slice(offset..end);
314
315 Ok((RpRead::new(), sliced_buffer))
316 }
317 None => Err(Error::new(ErrorKind::NotFound, "path not found")),
318 }
319 }
320
321 async fn write(&self, path: &str, _op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
322 let abs_path = build_abs_path(&self.info.root(), path);
323 let writer = EtcdWriter::new(self.core.clone(), abs_path);
324 Ok((RpWrite::new(), writer))
325 }
326
327 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
328 let deleter = oio::OneShotDeleter::new(EtcdDeleter::new(
329 self.core.clone(),
330 self.info.root().to_string(),
331 ));
332 Ok((RpDelete::default(), deleter))
333 }
334
335 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
336 let lister = EtcdLister::new(
337 self.core.clone(),
338 self.info.root().to_string(),
339 path.to_string(),
340 )
341 .await?;
342 let lister = oio::HierarchyLister::new(lister, path, args.recursive());
343 Ok((RpList::default(), lister))
344 }
345}