opendal/services/memcached/
backend.rs1use std::sync::Arc;
19use std::time::Duration;
20
21use super::MEMCACHED_SCHEME;
22use super::config::MemcachedConfig;
23use super::core::*;
24use super::deleter::MemcachedDeleter;
25use super::writer::MemcachedWriter;
26use crate::raw::*;
27use crate::*;
28
29#[doc = include_str!("docs.md")]
31#[derive(Debug, Default)]
32pub struct MemcachedBuilder {
33 pub(super) config: MemcachedConfig,
34}
35
36impl MemcachedBuilder {
37 pub fn endpoint(mut self, endpoint: &str) -> Self {
41 if !endpoint.is_empty() {
42 self.config.endpoint = Some(endpoint.to_owned());
43 }
44 self
45 }
46
47 pub fn root(mut self, root: &str) -> Self {
51 self.config.root = if root.is_empty() {
52 None
53 } else {
54 Some(root.to_string())
55 };
56
57 self
58 }
59
60 pub fn username(mut self, username: &str) -> Self {
62 self.config.username = Some(username.to_string());
63 self
64 }
65
66 pub fn password(mut self, password: &str) -> Self {
68 self.config.password = Some(password.to_string());
69 self
70 }
71
72 pub fn default_ttl(mut self, ttl: Duration) -> Self {
74 self.config.default_ttl = Some(ttl);
75 self
76 }
77
78 #[must_use]
86 pub fn connection_pool_max_size(mut self, max_size: usize) -> Self {
87 assert!(max_size > 0, "max_size must be greater than zero!");
88 self.config.connection_pool_max_size = Some(max_size);
89 self
90 }
91}
92
93impl Builder for MemcachedBuilder {
94 type Config = MemcachedConfig;
95
96 fn build(self) -> Result<impl Access> {
97 let endpoint = self.config.endpoint.clone().ok_or_else(|| {
98 Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
99 .with_context("service", MEMCACHED_SCHEME)
100 })?;
101 let uri = http::Uri::try_from(&endpoint).map_err(|err| {
102 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
103 .with_context("service", MEMCACHED_SCHEME)
104 .with_context("endpoint", &endpoint)
105 .set_source(err)
106 })?;
107
108 match uri.scheme_str() {
109 None => (),
111 Some(scheme) => {
112 if scheme != "tcp" {
114 return Err(Error::new(
115 ErrorKind::ConfigInvalid,
116 "endpoint is using invalid scheme",
117 )
118 .with_context("service", MEMCACHED_SCHEME)
119 .with_context("endpoint", &endpoint)
120 .with_context("scheme", scheme.to_string()));
121 }
122 }
123 };
124
125 let host = if let Some(host) = uri.host() {
126 host.to_string()
127 } else {
128 return Err(
129 Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have host")
130 .with_context("service", MEMCACHED_SCHEME)
131 .with_context("endpoint", &endpoint),
132 );
133 };
134 let port = if let Some(port) = uri.port_u16() {
135 port
136 } else {
137 return Err(
138 Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have port")
139 .with_context("service", MEMCACHED_SCHEME)
140 .with_context("endpoint", &endpoint),
141 );
142 };
143 let endpoint = format!("{host}:{port}",);
144
145 let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
146
147 Ok(MemcachedBackend::new(MemcachedCore::new(
148 endpoint,
149 self.config.username,
150 self.config.password,
151 self.config.default_ttl,
152 self.config.connection_pool_max_size,
153 ))
154 .with_normalized_root(root))
155 }
156}
157
158#[derive(Clone, Debug)]
160pub struct MemcachedBackend {
161 core: Arc<MemcachedCore>,
162 root: String,
163 info: Arc<AccessorInfo>,
164}
165
166impl MemcachedBackend {
167 pub fn new(core: MemcachedCore) -> Self {
168 let info = AccessorInfo::default();
169 info.set_scheme(MEMCACHED_SCHEME);
170 info.set_name("memcached");
171 info.set_root("/");
172 info.set_native_capability(Capability {
173 read: true,
174 stat: true,
175 write: true,
176 write_can_empty: true,
177 delete: true,
178 shared: true,
179 ..Default::default()
180 });
181
182 Self {
183 core: Arc::new(core),
184 root: "/".to_string(),
185 info: Arc::new(info),
186 }
187 }
188
189 fn with_normalized_root(mut self, root: String) -> Self {
190 self.info.set_root(&root);
191 self.root = root;
192 self
193 }
194}
195
196impl Access for MemcachedBackend {
197 type Reader = Buffer;
198 type Writer = MemcachedWriter;
199 type Lister = ();
200 type Deleter = oio::OneShotDeleter<MemcachedDeleter>;
201
202 fn info(&self) -> Arc<AccessorInfo> {
203 self.info.clone()
204 }
205
206 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
207 let p = build_abs_path(&self.root, path);
208
209 if p == build_abs_path(&self.root, "") {
210 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
211 } else {
212 let bs = self.core.get(&p).await?;
213 match bs {
214 Some(bs) => Ok(RpStat::new(
215 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
216 )),
217 None => Err(Error::new(ErrorKind::NotFound, "kv not found in memcached")),
218 }
219 }
220 }
221
222 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
223 let p = build_abs_path(&self.root, path);
224 let bs = match self.core.get(&p).await? {
225 Some(bs) => bs,
226 None => return Err(Error::new(ErrorKind::NotFound, "kv not found in memcached")),
227 };
228 Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
229 }
230
231 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
232 let p = build_abs_path(&self.root, path);
233 Ok((RpWrite::new(), MemcachedWriter::new(self.core.clone(), p)))
234 }
235
236 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
237 Ok((
238 RpDelete::default(),
239 oio::OneShotDeleter::new(MemcachedDeleter::new(self.core.clone(), self.root.clone())),
240 ))
241 }
242}