opendal/services/memcached/
backend.rs1use std::time::Duration;
19
20use bb8::RunError;
21use tokio::net::TcpStream;
22use tokio::sync::OnceCell;
23
24use super::binary;
25use crate::raw::adapters::kv;
26use crate::raw::*;
27use crate::services::MemcachedConfig;
28use crate::*;
29
30#[doc = include_str!("docs.md")]
32#[derive(Clone, Default)]
33pub struct MemcachedBuilder {
34 pub(super) config: MemcachedConfig,
35}
36
37impl MemcachedBuilder {
38 pub fn endpoint(mut self, endpoint: &str) -> Self {
42 if !endpoint.is_empty() {
43 self.config.endpoint = Some(endpoint.to_owned());
44 }
45 self
46 }
47
48 pub fn root(mut self, root: &str) -> Self {
52 self.config.root = if root.is_empty() {
53 None
54 } else {
55 Some(root.to_string())
56 };
57
58 self
59 }
60
61 pub fn username(mut self, username: &str) -> Self {
63 self.config.username = Some(username.to_string());
64 self
65 }
66
67 pub fn password(mut self, password: &str) -> Self {
69 self.config.password = Some(password.to_string());
70 self
71 }
72
73 pub fn default_ttl(mut self, ttl: Duration) -> Self {
75 self.config.default_ttl = Some(ttl);
76 self
77 }
78}
79
80impl Builder for MemcachedBuilder {
81 type Config = MemcachedConfig;
82
83 fn build(self) -> Result<impl Access> {
84 let endpoint = self.config.endpoint.clone().ok_or_else(|| {
85 Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
86 .with_context("service", Scheme::Memcached)
87 })?;
88 let uri = http::Uri::try_from(&endpoint).map_err(|err| {
89 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
90 .with_context("service", Scheme::Memcached)
91 .with_context("endpoint", &endpoint)
92 .set_source(err)
93 })?;
94
95 match uri.scheme_str() {
96 None => (),
98 Some(scheme) => {
99 if scheme != "tcp" {
101 return Err(Error::new(
102 ErrorKind::ConfigInvalid,
103 "endpoint is using invalid scheme",
104 )
105 .with_context("service", Scheme::Memcached)
106 .with_context("endpoint", &endpoint)
107 .with_context("scheme", scheme.to_string()));
108 }
109 }
110 };
111
112 let host = if let Some(host) = uri.host() {
113 host.to_string()
114 } else {
115 return Err(
116 Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have host")
117 .with_context("service", Scheme::Memcached)
118 .with_context("endpoint", &endpoint),
119 );
120 };
121 let port = if let Some(port) = uri.port_u16() {
122 port
123 } else {
124 return Err(
125 Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have port")
126 .with_context("service", Scheme::Memcached)
127 .with_context("endpoint", &endpoint),
128 );
129 };
130 let endpoint = format!("{host}:{port}",);
131
132 let root = normalize_root(
133 self.config
134 .root
135 .clone()
136 .unwrap_or_else(|| "/".to_string())
137 .as_str(),
138 );
139
140 let conn = OnceCell::new();
141 Ok(MemcachedBackend::new(Adapter {
142 endpoint,
143 username: self.config.username.clone(),
144 password: self.config.password.clone(),
145 conn,
146 default_ttl: self.config.default_ttl,
147 })
148 .with_normalized_root(root))
149 }
150}
151
152pub type MemcachedBackend = kv::Backend<Adapter>;
154
155#[derive(Clone, Debug)]
156pub struct Adapter {
157 endpoint: String,
158 username: Option<String>,
159 password: Option<String>,
160 default_ttl: Option<Duration>,
161 conn: OnceCell<bb8::Pool<MemcacheConnectionManager>>,
162}
163
164impl Adapter {
165 async fn conn(&self) -> Result<bb8::PooledConnection<'_, MemcacheConnectionManager>> {
166 let pool = self
167 .conn
168 .get_or_try_init(|| async {
169 let mgr = MemcacheConnectionManager::new(
170 &self.endpoint,
171 self.username.clone(),
172 self.password.clone(),
173 );
174
175 bb8::Pool::builder().build(mgr).await.map_err(|err| {
176 Error::new(ErrorKind::ConfigInvalid, "connect to memecached failed")
177 .set_source(err)
178 })
179 })
180 .await?;
181
182 pool.get().await.map_err(|err| match err {
183 RunError::TimedOut => {
184 Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary()
185 }
186 RunError::User(err) => err,
187 })
188 }
189}
190
191impl kv::Adapter for Adapter {
192 type Scanner = ();
193
194 fn info(&self) -> kv::Info {
195 kv::Info::new(
196 Scheme::Memcached,
197 "memcached",
198 Capability {
199 read: true,
200 write: true,
201 shared: true,
202
203 ..Default::default()
204 },
205 )
206 }
207
208 async fn get(&self, key: &str) -> Result<Option<Buffer>> {
209 let mut conn = self.conn().await?;
210 let result = conn.get(&percent_encode_path(key)).await?;
211 Ok(result.map(Buffer::from))
212 }
213
214 async fn set(&self, key: &str, value: Buffer) -> Result<()> {
215 let mut conn = self.conn().await?;
216
217 conn.set(
218 &percent_encode_path(key),
219 &value.to_vec(),
220 self.default_ttl
222 .map(|v| v.as_secs() as u32)
223 .unwrap_or_default(),
224 )
225 .await
226 }
227
228 async fn delete(&self, key: &str) -> Result<()> {
229 let mut conn = self.conn().await?;
230
231 conn.delete(&percent_encode_path(key)).await
232 }
233}
234
/// Connection manager that holds what is needed to open and authenticate a
/// memcached connection.
#[derive(Clone)]
struct MemcacheConnectionManager {
    /// Address passed to the TCP connect call.
    address: String,
    /// Optional authentication username.
    username: Option<String>,
    /// Optional authentication password.
    password: Option<String>,
}

// `Debug` is written by hand instead of derived so that the plaintext
// password never appears in formatted output.
impl std::fmt::Debug for MemcacheConnectionManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MemcacheConnectionManager")
            .field("address", &self.address)
            .field("username", &self.username)
            // Show only whether a password is configured, never its value.
            .field("password", &self.password.as_ref().map(|_| "<redacted>"))
            .finish()
    }
}
242
243impl MemcacheConnectionManager {
244 fn new(address: &str, username: Option<String>, password: Option<String>) -> Self {
245 Self {
246 address: address.to_string(),
247 username,
248 password,
249 }
250 }
251}
252
253impl bb8::ManageConnection for MemcacheConnectionManager {
254 type Connection = binary::Connection;
255 type Error = Error;
256
257 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
259 let conn = TcpStream::connect(&self.address)
260 .await
261 .map_err(new_std_io_error)?;
262 let mut conn = binary::Connection::new(conn);
263
264 if let (Some(username), Some(password)) = (self.username.as_ref(), self.password.as_ref()) {
265 conn.auth(username, password).await?;
266 }
267 Ok(conn)
268 }
269
270 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
271 conn.version().await.map(|_| ())
272 }
273
274 fn has_broken(&self, _: &mut Self::Connection) -> bool {
275 false
276 }
277}