opendal/services/memcached/
backend.rs1use std::sync::Arc;
19use std::time::Duration;
20
21use tokio::sync::OnceCell;
22
23use super::MEMCACHED_SCHEME;
24use super::config::MemcachedConfig;
25use super::core::*;
26use super::deleter::MemcachedDeleter;
27use super::writer::MemcachedWriter;
28use crate::raw::*;
29use crate::*;
30
31#[doc = include_str!("docs.md")]
33#[derive(Debug, Default)]
34pub struct MemcachedBuilder {
35 pub(super) config: MemcachedConfig,
36}
37
38impl MemcachedBuilder {
39 pub fn endpoint(mut self, endpoint: &str) -> Self {
43 if !endpoint.is_empty() {
44 self.config.endpoint = Some(endpoint.to_owned());
45 }
46 self
47 }
48
49 pub fn root(mut self, root: &str) -> Self {
53 self.config.root = if root.is_empty() {
54 None
55 } else {
56 Some(root.to_string())
57 };
58
59 self
60 }
61
62 pub fn username(mut self, username: &str) -> Self {
64 self.config.username = Some(username.to_string());
65 self
66 }
67
68 pub fn password(mut self, password: &str) -> Self {
70 self.config.password = Some(password.to_string());
71 self
72 }
73
74 pub fn default_ttl(mut self, ttl: Duration) -> Self {
76 self.config.default_ttl = Some(ttl);
77 self
78 }
79
80 #[must_use]
88 pub fn connection_pool_max_size(mut self, max_size: u32) -> Self {
89 assert!(max_size > 0, "max_size must be greater than zero!");
90 self.config.connection_pool_max_size = Some(max_size);
91 self
92 }
93}
94
95impl Builder for MemcachedBuilder {
96 type Config = MemcachedConfig;
97
98 fn build(self) -> Result<impl Access> {
99 let endpoint = self.config.endpoint.clone().ok_or_else(|| {
100 Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
101 .with_context("service", MEMCACHED_SCHEME)
102 })?;
103 let uri = http::Uri::try_from(&endpoint).map_err(|err| {
104 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
105 .with_context("service", MEMCACHED_SCHEME)
106 .with_context("endpoint", &endpoint)
107 .set_source(err)
108 })?;
109
110 match uri.scheme_str() {
111 None => (),
113 Some(scheme) => {
114 if scheme != "tcp" {
116 return Err(Error::new(
117 ErrorKind::ConfigInvalid,
118 "endpoint is using invalid scheme",
119 )
120 .with_context("service", MEMCACHED_SCHEME)
121 .with_context("endpoint", &endpoint)
122 .with_context("scheme", scheme.to_string()));
123 }
124 }
125 };
126
127 let host = if let Some(host) = uri.host() {
128 host.to_string()
129 } else {
130 return Err(
131 Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have host")
132 .with_context("service", MEMCACHED_SCHEME)
133 .with_context("endpoint", &endpoint),
134 );
135 };
136 let port = if let Some(port) = uri.port_u16() {
137 port
138 } else {
139 return Err(
140 Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have port")
141 .with_context("service", MEMCACHED_SCHEME)
142 .with_context("endpoint", &endpoint),
143 );
144 };
145 let endpoint = format!("{host}:{port}",);
146
147 let root = normalize_root(
148 self.config
149 .root
150 .clone()
151 .unwrap_or_else(|| "/".to_string())
152 .as_str(),
153 );
154
155 let conn = OnceCell::new();
156 Ok(MemcachedBackend::new(MemcachedCore {
157 conn,
158 endpoint,
159 username: self.config.username.clone(),
160 password: self.config.password.clone(),
161 default_ttl: self.config.default_ttl,
162 connection_pool_max_size: self.config.connection_pool_max_size,
163 })
164 .with_normalized_root(root))
165 }
166}
167
168#[derive(Clone, Debug)]
170pub struct MemcachedBackend {
171 core: Arc<MemcachedCore>,
172 root: String,
173 info: Arc<AccessorInfo>,
174}
175
176impl MemcachedBackend {
177 pub fn new(core: MemcachedCore) -> Self {
178 let info = AccessorInfo::default();
179 info.set_scheme(MEMCACHED_SCHEME);
180 info.set_name("memcached");
181 info.set_root("/");
182 info.set_native_capability(Capability {
183 read: true,
184 stat: true,
185 write: true,
186 write_can_empty: true,
187 delete: true,
188 shared: true,
189 ..Default::default()
190 });
191
192 Self {
193 core: Arc::new(core),
194 root: "/".to_string(),
195 info: Arc::new(info),
196 }
197 }
198
199 fn with_normalized_root(mut self, root: String) -> Self {
200 self.info.set_root(&root);
201 self.root = root;
202 self
203 }
204}
205
206impl Access for MemcachedBackend {
207 type Reader = Buffer;
208 type Writer = MemcachedWriter;
209 type Lister = ();
210 type Deleter = oio::OneShotDeleter<MemcachedDeleter>;
211
212 fn info(&self) -> Arc<AccessorInfo> {
213 self.info.clone()
214 }
215
216 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
217 let p = build_abs_path(&self.root, path);
218
219 if p == build_abs_path(&self.root, "") {
220 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
221 } else {
222 let bs = self.core.get(&p).await?;
223 match bs {
224 Some(bs) => Ok(RpStat::new(
225 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
226 )),
227 None => Err(Error::new(ErrorKind::NotFound, "kv not found in memcached")),
228 }
229 }
230 }
231
232 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
233 let p = build_abs_path(&self.root, path);
234 let bs = match self.core.get(&p).await? {
235 Some(bs) => bs,
236 None => return Err(Error::new(ErrorKind::NotFound, "kv not found in memcached")),
237 };
238 Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
239 }
240
241 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
242 let p = build_abs_path(&self.root, path);
243 Ok((RpWrite::new(), MemcachedWriter::new(self.core.clone(), p)))
244 }
245
246 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
247 Ok((
248 RpDelete::default(),
249 oio::OneShotDeleter::new(MemcachedDeleter::new(self.core.clone(), self.root.clone())),
250 ))
251 }
252}