opendal/services/memcached/
backend.rs1use std::time::Duration;
19
20use bb8::RunError;
21use tokio::net::TcpStream;
22use tokio::sync::OnceCell;
23
24use super::binary;
25use crate::raw::adapters::kv;
26use crate::raw::*;
27use crate::services::MemcachedConfig;
28use crate::*;
29
30impl Configurator for MemcachedConfig {
31 type Builder = MemcachedBuilder;
32 fn into_builder(self) -> Self::Builder {
33 MemcachedBuilder { config: self }
34 }
35}
36
37#[doc = include_str!("docs.md")]
39#[derive(Clone, Default)]
40pub struct MemcachedBuilder {
41 config: MemcachedConfig,
42}
43
44impl MemcachedBuilder {
45 pub fn endpoint(mut self, endpoint: &str) -> Self {
49 if !endpoint.is_empty() {
50 self.config.endpoint = Some(endpoint.to_owned());
51 }
52 self
53 }
54
55 pub fn root(mut self, root: &str) -> Self {
59 self.config.root = if root.is_empty() {
60 None
61 } else {
62 Some(root.to_string())
63 };
64
65 self
66 }
67
68 pub fn username(mut self, username: &str) -> Self {
70 self.config.username = Some(username.to_string());
71 self
72 }
73
74 pub fn password(mut self, password: &str) -> Self {
76 self.config.password = Some(password.to_string());
77 self
78 }
79
80 pub fn default_ttl(mut self, ttl: Duration) -> Self {
82 self.config.default_ttl = Some(ttl);
83 self
84 }
85}
86
87impl Builder for MemcachedBuilder {
88 const SCHEME: Scheme = Scheme::Memcached;
89 type Config = MemcachedConfig;
90
91 fn build(self) -> Result<impl Access> {
92 let endpoint = self.config.endpoint.clone().ok_or_else(|| {
93 Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
94 .with_context("service", Scheme::Memcached)
95 })?;
96 let uri = http::Uri::try_from(&endpoint).map_err(|err| {
97 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
98 .with_context("service", Scheme::Memcached)
99 .with_context("endpoint", &endpoint)
100 .set_source(err)
101 })?;
102
103 match uri.scheme_str() {
104 None => (),
106 Some(scheme) => {
107 if scheme != "tcp" {
109 return Err(Error::new(
110 ErrorKind::ConfigInvalid,
111 "endpoint is using invalid scheme",
112 )
113 .with_context("service", Scheme::Memcached)
114 .with_context("endpoint", &endpoint)
115 .with_context("scheme", scheme.to_string()));
116 }
117 }
118 };
119
120 let host = if let Some(host) = uri.host() {
121 host.to_string()
122 } else {
123 return Err(
124 Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have host")
125 .with_context("service", Scheme::Memcached)
126 .with_context("endpoint", &endpoint),
127 );
128 };
129 let port = if let Some(port) = uri.port_u16() {
130 port
131 } else {
132 return Err(
133 Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have port")
134 .with_context("service", Scheme::Memcached)
135 .with_context("endpoint", &endpoint),
136 );
137 };
138 let endpoint = format!("{host}:{port}",);
139
140 let root = normalize_root(
141 self.config
142 .root
143 .clone()
144 .unwrap_or_else(|| "/".to_string())
145 .as_str(),
146 );
147
148 let conn = OnceCell::new();
149 Ok(MemcachedBackend::new(Adapter {
150 endpoint,
151 username: self.config.username.clone(),
152 password: self.config.password.clone(),
153 conn,
154 default_ttl: self.config.default_ttl,
155 })
156 .with_normalized_root(root))
157 }
158}
159
160pub type MemcachedBackend = kv::Backend<Adapter>;
162
163#[derive(Clone, Debug)]
164pub struct Adapter {
165 endpoint: String,
166 username: Option<String>,
167 password: Option<String>,
168 default_ttl: Option<Duration>,
169 conn: OnceCell<bb8::Pool<MemcacheConnectionManager>>,
170}
171
172impl Adapter {
173 async fn conn(&self) -> Result<bb8::PooledConnection<'_, MemcacheConnectionManager>> {
174 let pool = self
175 .conn
176 .get_or_try_init(|| async {
177 let mgr = MemcacheConnectionManager::new(
178 &self.endpoint,
179 self.username.clone(),
180 self.password.clone(),
181 );
182
183 bb8::Pool::builder().build(mgr).await.map_err(|err| {
184 Error::new(ErrorKind::ConfigInvalid, "connect to memecached failed")
185 .set_source(err)
186 })
187 })
188 .await?;
189
190 pool.get().await.map_err(|err| match err {
191 RunError::TimedOut => {
192 Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary()
193 }
194 RunError::User(err) => err,
195 })
196 }
197}
198
199impl kv::Adapter for Adapter {
200 type Scanner = ();
201
202 fn info(&self) -> kv::Info {
203 kv::Info::new(
204 Scheme::Memcached,
205 "memcached",
206 Capability {
207 read: true,
208 write: true,
209 shared: true,
210
211 ..Default::default()
212 },
213 )
214 }
215
216 async fn get(&self, key: &str) -> Result<Option<Buffer>> {
217 let mut conn = self.conn().await?;
218 let result = conn.get(&percent_encode_path(key)).await?;
219 Ok(result.map(Buffer::from))
220 }
221
222 async fn set(&self, key: &str, value: Buffer) -> Result<()> {
223 let mut conn = self.conn().await?;
224
225 conn.set(
226 &percent_encode_path(key),
227 &value.to_vec(),
228 self.default_ttl
230 .map(|v| v.as_secs() as u32)
231 .unwrap_or_default(),
232 )
233 .await
234 }
235
236 async fn delete(&self, key: &str) -> Result<()> {
237 let mut conn = self.conn().await?;
238
239 conn.delete(&percent_encode_path(key)).await
240 }
241}
242
243#[derive(Clone, Debug)]
245struct MemcacheConnectionManager {
246 address: String,
247 username: Option<String>,
248 password: Option<String>,
249}
250
251impl MemcacheConnectionManager {
252 fn new(address: &str, username: Option<String>, password: Option<String>) -> Self {
253 Self {
254 address: address.to_string(),
255 username,
256 password,
257 }
258 }
259}
260
261#[async_trait::async_trait]
262impl bb8::ManageConnection for MemcacheConnectionManager {
263 type Connection = binary::Connection;
264 type Error = Error;
265
266 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
268 let conn = TcpStream::connect(&self.address)
269 .await
270 .map_err(new_std_io_error)?;
271 let mut conn = binary::Connection::new(conn);
272
273 if let (Some(username), Some(password)) = (self.username.as_ref(), self.password.as_ref()) {
274 conn.auth(username, password).await?;
275 }
276 Ok(conn)
277 }
278
279 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
280 conn.version().await.map(|_| ())
281 }
282
283 fn has_broken(&self, _: &mut Self::Connection) -> bool {
284 false
285 }
286}