opendal/services/memcached/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::time::Duration;
19
20use bb8::RunError;
21use tokio::net::TcpStream;
22use tokio::sync::OnceCell;
23
24use super::binary;
25use crate::raw::adapters::kv;
26use crate::raw::*;
27use crate::services::MemcachedConfig;
28use crate::*;
29
30impl Configurator for MemcachedConfig {
31    type Builder = MemcachedBuilder;
32    fn into_builder(self) -> Self::Builder {
33        MemcachedBuilder { config: self }
34    }
35}
36
37/// [Memcached](https://memcached.org/) service support.
38#[doc = include_str!("docs.md")]
39#[derive(Clone, Default)]
40pub struct MemcachedBuilder {
41    config: MemcachedConfig,
42}
43
44impl MemcachedBuilder {
45    /// set the network address of memcached service.
46    ///
47    /// For example: "tcp://localhost:11211"
48    pub fn endpoint(mut self, endpoint: &str) -> Self {
49        if !endpoint.is_empty() {
50            self.config.endpoint = Some(endpoint.to_owned());
51        }
52        self
53    }
54
55    /// set the working directory, all operations will be performed under it.
56    ///
57    /// default: "/"
58    pub fn root(mut self, root: &str) -> Self {
59        self.config.root = if root.is_empty() {
60            None
61        } else {
62            Some(root.to_string())
63        };
64
65        self
66    }
67
68    /// set the username.
69    pub fn username(mut self, username: &str) -> Self {
70        self.config.username = Some(username.to_string());
71        self
72    }
73
74    /// set the password.
75    pub fn password(mut self, password: &str) -> Self {
76        self.config.password = Some(password.to_string());
77        self
78    }
79
80    /// Set the default ttl for memcached services.
81    pub fn default_ttl(mut self, ttl: Duration) -> Self {
82        self.config.default_ttl = Some(ttl);
83        self
84    }
85}
86
87impl Builder for MemcachedBuilder {
88    const SCHEME: Scheme = Scheme::Memcached;
89    type Config = MemcachedConfig;
90
91    fn build(self) -> Result<impl Access> {
92        let endpoint = self.config.endpoint.clone().ok_or_else(|| {
93            Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
94                .with_context("service", Scheme::Memcached)
95        })?;
96        let uri = http::Uri::try_from(&endpoint).map_err(|err| {
97            Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
98                .with_context("service", Scheme::Memcached)
99                .with_context("endpoint", &endpoint)
100                .set_source(err)
101        })?;
102
103        match uri.scheme_str() {
104            // If scheme is none, we will use tcp by default.
105            None => (),
106            Some(scheme) => {
107                // We only support tcp by now.
108                if scheme != "tcp" {
109                    return Err(Error::new(
110                        ErrorKind::ConfigInvalid,
111                        "endpoint is using invalid scheme",
112                    )
113                    .with_context("service", Scheme::Memcached)
114                    .with_context("endpoint", &endpoint)
115                    .with_context("scheme", scheme.to_string()));
116                }
117            }
118        };
119
120        let host = if let Some(host) = uri.host() {
121            host.to_string()
122        } else {
123            return Err(
124                Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have host")
125                    .with_context("service", Scheme::Memcached)
126                    .with_context("endpoint", &endpoint),
127            );
128        };
129        let port = if let Some(port) = uri.port_u16() {
130            port
131        } else {
132            return Err(
133                Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have port")
134                    .with_context("service", Scheme::Memcached)
135                    .with_context("endpoint", &endpoint),
136            );
137        };
138        let endpoint = format!("{host}:{port}",);
139
140        let root = normalize_root(
141            self.config
142                .root
143                .clone()
144                .unwrap_or_else(|| "/".to_string())
145                .as_str(),
146        );
147
148        let conn = OnceCell::new();
149        Ok(MemcachedBackend::new(Adapter {
150            endpoint,
151            username: self.config.username.clone(),
152            password: self.config.password.clone(),
153            conn,
154            default_ttl: self.config.default_ttl,
155        })
156        .with_normalized_root(root))
157    }
158}
159
160/// Backend for memcached services.
161pub type MemcachedBackend = kv::Backend<Adapter>;
162
163#[derive(Clone, Debug)]
164pub struct Adapter {
165    endpoint: String,
166    username: Option<String>,
167    password: Option<String>,
168    default_ttl: Option<Duration>,
169    conn: OnceCell<bb8::Pool<MemcacheConnectionManager>>,
170}
171
172impl Adapter {
173    async fn conn(&self) -> Result<bb8::PooledConnection<'_, MemcacheConnectionManager>> {
174        let pool = self
175            .conn
176            .get_or_try_init(|| async {
177                let mgr = MemcacheConnectionManager::new(
178                    &self.endpoint,
179                    self.username.clone(),
180                    self.password.clone(),
181                );
182
183                bb8::Pool::builder().build(mgr).await.map_err(|err| {
184                    Error::new(ErrorKind::ConfigInvalid, "connect to memecached failed")
185                        .set_source(err)
186                })
187            })
188            .await?;
189
190        pool.get().await.map_err(|err| match err {
191            RunError::TimedOut => {
192                Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary()
193            }
194            RunError::User(err) => err,
195        })
196    }
197}
198
199impl kv::Adapter for Adapter {
200    type Scanner = ();
201
202    fn info(&self) -> kv::Info {
203        kv::Info::new(
204            Scheme::Memcached,
205            "memcached",
206            Capability {
207                read: true,
208                write: true,
209                shared: true,
210
211                ..Default::default()
212            },
213        )
214    }
215
216    async fn get(&self, key: &str) -> Result<Option<Buffer>> {
217        let mut conn = self.conn().await?;
218        let result = conn.get(&percent_encode_path(key)).await?;
219        Ok(result.map(Buffer::from))
220    }
221
222    async fn set(&self, key: &str, value: Buffer) -> Result<()> {
223        let mut conn = self.conn().await?;
224
225        conn.set(
226            &percent_encode_path(key),
227            &value.to_vec(),
228            // Set expiration to 0 if ttl not set.
229            self.default_ttl
230                .map(|v| v.as_secs() as u32)
231                .unwrap_or_default(),
232        )
233        .await
234    }
235
236    async fn delete(&self, key: &str) -> Result<()> {
237        let mut conn = self.conn().await?;
238
239        conn.delete(&percent_encode_path(key)).await
240    }
241}
242
243/// A `bb8::ManageConnection` for `memcache_async::ascii::Protocol`.
244#[derive(Clone, Debug)]
245struct MemcacheConnectionManager {
246    address: String,
247    username: Option<String>,
248    password: Option<String>,
249}
250
251impl MemcacheConnectionManager {
252    fn new(address: &str, username: Option<String>, password: Option<String>) -> Self {
253        Self {
254            address: address.to_string(),
255            username,
256            password,
257        }
258    }
259}
260
261#[async_trait::async_trait]
262impl bb8::ManageConnection for MemcacheConnectionManager {
263    type Connection = binary::Connection;
264    type Error = Error;
265
266    /// TODO: Implement unix stream support.
267    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
268        let conn = TcpStream::connect(&self.address)
269            .await
270            .map_err(new_std_io_error)?;
271        let mut conn = binary::Connection::new(conn);
272
273        if let (Some(username), Some(password)) = (self.username.as_ref(), self.password.as_ref()) {
274            conn.auth(username, password).await?;
275        }
276        Ok(conn)
277    }
278
279    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
280        conn.version().await.map(|_| ())
281    }
282
283    fn has_broken(&self, _: &mut Self::Connection) -> bool {
284        false
285    }
286}