opendal_core/services/memcached/
backend.rs1use std::sync::Arc;
19
20use super::MEMCACHED_SCHEME;
21use super::config::MemcachedConfig;
22use super::core::*;
23use super::deleter::MemcachedDeleter;
24use super::writer::MemcachedWriter;
25use crate::raw::*;
26use crate::*;
27
28#[doc = include_str!("docs.md")]
30#[derive(Debug, Default)]
31pub struct MemcachedBuilder {
32 pub(super) config: MemcachedConfig,
33}
34
35impl MemcachedBuilder {
36 pub fn endpoint(mut self, endpoint: &str) -> Self {
40 if !endpoint.is_empty() {
41 self.config.endpoint = Some(endpoint.to_owned());
42 }
43 self
44 }
45
46 pub fn root(mut self, root: &str) -> Self {
50 self.config.root = if root.is_empty() {
51 None
52 } else {
53 Some(root.to_string())
54 };
55
56 self
57 }
58
59 pub fn username(mut self, username: &str) -> Self {
61 self.config.username = Some(username.to_string());
62 self
63 }
64
65 pub fn password(mut self, password: &str) -> Self {
67 self.config.password = Some(password.to_string());
68 self
69 }
70
71 pub fn default_ttl(mut self, ttl: Duration) -> Self {
73 self.config.default_ttl = Some(ttl);
74 self
75 }
76
77 #[must_use]
85 pub fn connection_pool_max_size(mut self, max_size: usize) -> Self {
86 assert!(max_size > 0, "max_size must be greater than zero!");
87 self.config.connection_pool_max_size = Some(max_size);
88 self
89 }
90}
91
92impl Builder for MemcachedBuilder {
93 type Config = MemcachedConfig;
94
95 fn build(self) -> Result<impl Access> {
96 let endpoint = self.config.endpoint.clone().ok_or_else(|| {
97 Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
98 .with_context("service", MEMCACHED_SCHEME)
99 })?;
100 let uri = http::Uri::try_from(&endpoint).map_err(|err| {
101 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
102 .with_context("service", MEMCACHED_SCHEME)
103 .with_context("endpoint", &endpoint)
104 .set_source(err)
105 })?;
106
107 match uri.scheme_str() {
108 None => (),
110 Some(scheme) => {
111 if scheme != "tcp" {
113 return Err(Error::new(
114 ErrorKind::ConfigInvalid,
115 "endpoint is using invalid scheme",
116 )
117 .with_context("service", MEMCACHED_SCHEME)
118 .with_context("endpoint", &endpoint)
119 .with_context("scheme", scheme.to_string()));
120 }
121 }
122 };
123
124 let host = if let Some(host) = uri.host() {
125 host.to_string()
126 } else {
127 return Err(
128 Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have host")
129 .with_context("service", MEMCACHED_SCHEME)
130 .with_context("endpoint", &endpoint),
131 );
132 };
133 let port = if let Some(port) = uri.port_u16() {
134 port
135 } else {
136 return Err(
137 Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have port")
138 .with_context("service", MEMCACHED_SCHEME)
139 .with_context("endpoint", &endpoint),
140 );
141 };
142 let endpoint = format!("{host}:{port}",);
143
144 let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
145
146 Ok(MemcachedBackend::new(MemcachedCore::new(
147 endpoint,
148 self.config.username,
149 self.config.password,
150 self.config.default_ttl,
151 self.config.connection_pool_max_size,
152 ))
153 .with_normalized_root(root))
154 }
155}
156
157#[derive(Clone, Debug)]
159pub struct MemcachedBackend {
160 core: Arc<MemcachedCore>,
161 root: String,
162 info: Arc<AccessorInfo>,
163}
164
165impl MemcachedBackend {
166 pub fn new(core: MemcachedCore) -> Self {
167 let info = AccessorInfo::default();
168 info.set_scheme(MEMCACHED_SCHEME);
169 info.set_name("memcached");
170 info.set_root("/");
171 info.set_native_capability(Capability {
172 read: true,
173 stat: true,
174 write: true,
175 write_can_empty: true,
176 delete: true,
177 shared: true,
178 ..Default::default()
179 });
180
181 Self {
182 core: Arc::new(core),
183 root: "/".to_string(),
184 info: Arc::new(info),
185 }
186 }
187
188 fn with_normalized_root(mut self, root: String) -> Self {
189 self.info.set_root(&root);
190 self.root = root;
191 self
192 }
193}
194
195impl Access for MemcachedBackend {
196 type Reader = Buffer;
197 type Writer = MemcachedWriter;
198 type Lister = ();
199 type Deleter = oio::OneShotDeleter<MemcachedDeleter>;
200
201 fn info(&self) -> Arc<AccessorInfo> {
202 self.info.clone()
203 }
204
205 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
206 let p = build_abs_path(&self.root, path);
207
208 if p == build_abs_path(&self.root, "") {
209 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
210 } else {
211 let bs = self.core.get(&p).await?;
212 match bs {
213 Some(bs) => Ok(RpStat::new(
214 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
215 )),
216 None => Err(Error::new(ErrorKind::NotFound, "kv not found in memcached")),
217 }
218 }
219 }
220
221 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
222 let p = build_abs_path(&self.root, path);
223 let bs = match self.core.get(&p).await? {
224 Some(bs) => bs,
225 None => return Err(Error::new(ErrorKind::NotFound, "kv not found in memcached")),
226 };
227 Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
228 }
229
230 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
231 let p = build_abs_path(&self.root, path);
232 Ok((RpWrite::new(), MemcachedWriter::new(self.core.clone(), p)))
233 }
234
235 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
236 Ok((
237 RpDelete::default(),
238 oio::OneShotDeleter::new(MemcachedDeleter::new(self.core.clone(), self.root.clone())),
239 ))
240 }
241}