opendal_core/services/memcached/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use fastpool::{ManageObject, ObjectStatus, bounded};
19use std::sync::Arc;
20use tokio::net::TcpStream;
21
22use super::binary;
23use crate::raw::*;
24use crate::*;
25
26/// A connection manager for `memcache_async::ascii::Protocol`.
27#[derive(Clone)]
28struct MemcacheConnectionManager {
29    address: String,
30    username: Option<String>,
31    password: Option<String>,
32}
33
34impl MemcacheConnectionManager {
35    fn new(address: &str, username: Option<String>, password: Option<String>) -> Self {
36        Self {
37            address: address.to_string(),
38            username,
39            password,
40        }
41    }
42}
43
44impl ManageObject for MemcacheConnectionManager {
45    type Object = binary::Connection;
46    type Error = Error;
47
48    /// TODO: Implement unix stream support.
49    async fn create(&self) -> Result<Self::Object, Self::Error> {
50        let conn = TcpStream::connect(&self.address)
51            .await
52            .map_err(new_std_io_error)?;
53        let mut conn = binary::Connection::new(conn);
54
55        if let (Some(username), Some(password)) = (self.username.as_ref(), self.password.as_ref()) {
56            conn.auth(username, password).await?;
57        }
58        Ok(conn)
59    }
60
61    async fn is_recyclable(
62        &self,
63        o: &mut Self::Object,
64        _: &ObjectStatus,
65    ) -> Result<(), Self::Error> {
66        match o.version().await {
67            Ok(_) => Ok(()),
68            Err(err) => Err(err),
69        }
70    }
71}
72
73#[derive(Clone, Debug)]
74pub struct MemcachedCore {
75    default_ttl: Option<Duration>,
76    conn: Arc<bounded::Pool<MemcacheConnectionManager>>,
77}
78
79impl MemcachedCore {
80    pub fn new(
81        endpoint: String,
82        username: Option<String>,
83        password: Option<String>,
84        default_ttl: Option<Duration>,
85        connection_pool_max_size: Option<usize>,
86    ) -> Self {
87        let conn = bounded::Pool::new(
88            bounded::PoolConfig::new(connection_pool_max_size.unwrap_or(10)),
89            MemcacheConnectionManager::new(endpoint.as_str(), username, password),
90        );
91
92        Self { default_ttl, conn }
93    }
94
95    async fn conn(&self) -> Result<bounded::Object<MemcacheConnectionManager>> {
96        let fut = self.conn.get();
97
98        tokio::select! {
99            _ = tokio::time::sleep(Duration::from_secs(10)) => {
100                Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
101            }
102            result = fut => match result {
103                Ok(conn) => Ok(conn),
104                Err(err) => Err(err),
105            }
106        }
107    }
108
109    pub async fn get(&self, key: &str) -> Result<Option<Buffer>> {
110        let mut conn = self.conn().await?;
111        let result = conn.get(&percent_encode_path(key)).await?;
112        Ok(result.map(Buffer::from))
113    }
114
115    pub async fn set(&self, key: &str, value: Buffer) -> Result<()> {
116        let mut conn = self.conn().await?;
117
118        conn.set(
119            &percent_encode_path(key),
120            &value.to_vec(),
121            // Set expiration to 0 if ttl not set.
122            self.default_ttl
123                .map(|v| v.as_secs() as u32)
124                .unwrap_or_default(),
125        )
126        .await
127    }
128
129    pub async fn delete(&self, key: &str) -> Result<()> {
130        let mut conn = self.conn().await?;
131
132        conn.delete(&percent_encode_path(key)).await
133    }
134}