opendal/services/memcached/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::time::Duration;
19
20use bb8::RunError;
21use tokio::net::TcpStream;
22use tokio::sync::OnceCell;
23
24use super::binary;
25use crate::raw::adapters::kv;
26use crate::raw::*;
27use crate::services::MemcachedConfig;
28use crate::*;
29
/// [Memcached](https://memcached.org/) service support.
#[doc = include_str!("docs.md")]
#[derive(Clone, Default)]
pub struct MemcachedBuilder {
    /// Configuration filled in by the builder's setter methods.
    pub(super) config: MemcachedConfig,
}
36
37impl MemcachedBuilder {
38    /// set the network address of memcached service.
39    ///
40    /// For example: "tcp://localhost:11211"
41    pub fn endpoint(mut self, endpoint: &str) -> Self {
42        if !endpoint.is_empty() {
43            self.config.endpoint = Some(endpoint.to_owned());
44        }
45        self
46    }
47
48    /// set the working directory, all operations will be performed under it.
49    ///
50    /// default: "/"
51    pub fn root(mut self, root: &str) -> Self {
52        self.config.root = if root.is_empty() {
53            None
54        } else {
55            Some(root.to_string())
56        };
57
58        self
59    }
60
61    /// set the username.
62    pub fn username(mut self, username: &str) -> Self {
63        self.config.username = Some(username.to_string());
64        self
65    }
66
67    /// set the password.
68    pub fn password(mut self, password: &str) -> Self {
69        self.config.password = Some(password.to_string());
70        self
71    }
72
73    /// Set the default ttl for memcached services.
74    pub fn default_ttl(mut self, ttl: Duration) -> Self {
75        self.config.default_ttl = Some(ttl);
76        self
77    }
78}
79
80impl Builder for MemcachedBuilder {
81    type Config = MemcachedConfig;
82
83    fn build(self) -> Result<impl Access> {
84        let endpoint = self.config.endpoint.clone().ok_or_else(|| {
85            Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
86                .with_context("service", Scheme::Memcached)
87        })?;
88        let uri = http::Uri::try_from(&endpoint).map_err(|err| {
89            Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
90                .with_context("service", Scheme::Memcached)
91                .with_context("endpoint", &endpoint)
92                .set_source(err)
93        })?;
94
95        match uri.scheme_str() {
96            // If scheme is none, we will use tcp by default.
97            None => (),
98            Some(scheme) => {
99                // We only support tcp by now.
100                if scheme != "tcp" {
101                    return Err(Error::new(
102                        ErrorKind::ConfigInvalid,
103                        "endpoint is using invalid scheme",
104                    )
105                    .with_context("service", Scheme::Memcached)
106                    .with_context("endpoint", &endpoint)
107                    .with_context("scheme", scheme.to_string()));
108                }
109            }
110        };
111
112        let host = if let Some(host) = uri.host() {
113            host.to_string()
114        } else {
115            return Err(
116                Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have host")
117                    .with_context("service", Scheme::Memcached)
118                    .with_context("endpoint", &endpoint),
119            );
120        };
121        let port = if let Some(port) = uri.port_u16() {
122            port
123        } else {
124            return Err(
125                Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have port")
126                    .with_context("service", Scheme::Memcached)
127                    .with_context("endpoint", &endpoint),
128            );
129        };
130        let endpoint = format!("{host}:{port}",);
131
132        let root = normalize_root(
133            self.config
134                .root
135                .clone()
136                .unwrap_or_else(|| "/".to_string())
137                .as_str(),
138        );
139
140        let conn = OnceCell::new();
141        Ok(MemcachedBackend::new(Adapter {
142            endpoint,
143            username: self.config.username.clone(),
144            password: self.config.password.clone(),
145            conn,
146            default_ttl: self.config.default_ttl,
147        })
148        .with_normalized_root(root))
149    }
150}
151
/// Backend for memcached services.
///
/// This is the generic `kv::Backend` instantiated with the memcached `Adapter`.
pub type MemcachedBackend = kv::Backend<Adapter>;
154
/// Key-value adapter holding the connection settings for one memcached server.
#[derive(Clone, Debug)]
pub struct Adapter {
    /// Server address handed to the connection manager.
    endpoint: String,
    /// Optional username forwarded to the connection manager.
    username: Option<String>,
    /// Optional password forwarded to the connection manager.
    password: Option<String>,
    /// TTL used as the expiration of written values; `0` is sent when unset.
    default_ttl: Option<Duration>,
    /// Connection pool, created on first use.
    conn: OnceCell<bb8::Pool<MemcacheConnectionManager>>,
}
163
164impl Adapter {
165    async fn conn(&self) -> Result<bb8::PooledConnection<'_, MemcacheConnectionManager>> {
166        let pool = self
167            .conn
168            .get_or_try_init(|| async {
169                let mgr = MemcacheConnectionManager::new(
170                    &self.endpoint,
171                    self.username.clone(),
172                    self.password.clone(),
173                );
174
175                bb8::Pool::builder().build(mgr).await.map_err(|err| {
176                    Error::new(ErrorKind::ConfigInvalid, "connect to memecached failed")
177                        .set_source(err)
178                })
179            })
180            .await?;
181
182        pool.get().await.map_err(|err| match err {
183            RunError::TimedOut => {
184                Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary()
185            }
186            RunError::User(err) => err,
187        })
188    }
189}
190
191impl kv::Adapter for Adapter {
192    type Scanner = ();
193
194    fn info(&self) -> kv::Info {
195        kv::Info::new(
196            Scheme::Memcached,
197            "memcached",
198            Capability {
199                read: true,
200                write: true,
201                shared: true,
202
203                ..Default::default()
204            },
205        )
206    }
207
208    async fn get(&self, key: &str) -> Result<Option<Buffer>> {
209        let mut conn = self.conn().await?;
210        let result = conn.get(&percent_encode_path(key)).await?;
211        Ok(result.map(Buffer::from))
212    }
213
214    async fn set(&self, key: &str, value: Buffer) -> Result<()> {
215        let mut conn = self.conn().await?;
216
217        conn.set(
218            &percent_encode_path(key),
219            &value.to_vec(),
220            // Set expiration to 0 if ttl not set.
221            self.default_ttl
222                .map(|v| v.as_secs() as u32)
223                .unwrap_or_default(),
224        )
225        .await
226    }
227
228    async fn delete(&self, key: &str) -> Result<()> {
229        let mut conn = self.conn().await?;
230
231        conn.delete(&percent_encode_path(key)).await
232    }
233}
234
/// Connection settings used by the `bb8` pool to open memcached connections.
///
/// The `bb8::ManageConnection` implementation for this type produces
/// `binary::Connection` values.
#[derive(Clone, Debug)]
struct MemcacheConnectionManager {
    address: String,
    username: Option<String>,
    password: Option<String>,
}

impl MemcacheConnectionManager {
    /// Create a manager for `address`, keeping the optional credentials as given.
    fn new(address: &str, username: Option<String>, password: Option<String>) -> Self {
        let address = address.to_owned();
        Self {
            address,
            username,
            password,
        }
    }
}
252
253impl bb8::ManageConnection for MemcacheConnectionManager {
254    type Connection = binary::Connection;
255    type Error = Error;
256
257    /// TODO: Implement unix stream support.
258    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
259        let conn = TcpStream::connect(&self.address)
260            .await
261            .map_err(new_std_io_error)?;
262        let mut conn = binary::Connection::new(conn);
263
264        if let (Some(username), Some(password)) = (self.username.as_ref(), self.password.as_ref()) {
265            conn.auth(username, password).await?;
266        }
267        Ok(conn)
268    }
269
270    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
271        conn.version().await.map(|_| ())
272    }
273
274    fn has_broken(&self, _: &mut Self::Connection) -> bool {
275        false
276    }
277}