opendal_core/services/memcached/
core.rs1use fastpool::{ManageObject, ObjectStatus, bounded};
19use std::sync::Arc;
20use tokio::net::TcpStream;
21
22use super::binary;
23use crate::raw::*;
24use crate::*;
25
/// Settings needed to open a memcached connection: the target address plus
/// optional credentials.
#[derive(Clone)]
struct MemcacheConnectionManager {
    /// Address the TCP stream connects to.
    address: String,
    /// Optional user name for authentication.
    username: Option<String>,
    /// Optional password for authentication.
    password: Option<String>,
}

impl MemcacheConnectionManager {
    /// Builds a manager from a borrowed address and optional credentials.
    ///
    /// The address is copied into an owned `String`; the credentials are
    /// moved in unchanged.
    fn new(address: &str, username: Option<String>, password: Option<String>) -> Self {
        let address = String::from(address);

        Self {
            address,
            username,
            password,
        }
    }
}
43
44impl ManageObject for MemcacheConnectionManager {
45 type Object = binary::Connection;
46 type Error = Error;
47
48 async fn create(&self) -> Result<Self::Object, Self::Error> {
50 let conn = TcpStream::connect(&self.address)
51 .await
52 .map_err(new_std_io_error)?;
53 let mut conn = binary::Connection::new(conn);
54
55 if let (Some(username), Some(password)) = (self.username.as_ref(), self.password.as_ref()) {
56 conn.auth(username, password).await?;
57 }
58 Ok(conn)
59 }
60
61 async fn is_recyclable(
62 &self,
63 o: &mut Self::Object,
64 _: &ObjectStatus,
65 ) -> Result<(), Self::Error> {
66 match o.version().await {
67 Ok(_) => Ok(()),
68 Err(err) => Err(err),
69 }
70 }
71}
72
73#[derive(Clone, Debug)]
74pub struct MemcachedCore {
75 default_ttl: Option<Duration>,
76 conn: Arc<bounded::Pool<MemcacheConnectionManager>>,
77}
78
79impl MemcachedCore {
80 pub fn new(
81 endpoint: String,
82 username: Option<String>,
83 password: Option<String>,
84 default_ttl: Option<Duration>,
85 connection_pool_max_size: Option<usize>,
86 ) -> Self {
87 let conn = bounded::Pool::new(
88 bounded::PoolConfig::new(connection_pool_max_size.unwrap_or(10)),
89 MemcacheConnectionManager::new(endpoint.as_str(), username, password),
90 );
91
92 Self { default_ttl, conn }
93 }
94
95 async fn conn(&self) -> Result<bounded::Object<MemcacheConnectionManager>> {
96 let fut = self.conn.get();
97
98 tokio::select! {
99 _ = tokio::time::sleep(Duration::from_secs(10)) => {
100 Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
101 }
102 result = fut => match result {
103 Ok(conn) => Ok(conn),
104 Err(err) => Err(err),
105 }
106 }
107 }
108
109 pub async fn get(&self, key: &str) -> Result<Option<Buffer>> {
110 let mut conn = self.conn().await?;
111 let result = conn.get(&percent_encode_path(key)).await?;
112 Ok(result.map(Buffer::from))
113 }
114
115 pub async fn set(&self, key: &str, value: Buffer) -> Result<()> {
116 let mut conn = self.conn().await?;
117
118 conn.set(
119 &percent_encode_path(key),
120 &value.to_vec(),
121 self.default_ttl
123 .map(|v| v.as_secs() as u32)
124 .unwrap_or_default(),
125 )
126 .await
127 }
128
129 pub async fn delete(&self, key: &str) -> Result<()> {
130 let mut conn = self.conn().await?;
131
132 conn.delete(&percent_encode_path(key)).await
133 }
134}