opendal_core/services/redis/
core.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Bytes;
22use fastpool::{ManageObject, ObjectStatus, bounded};
23use redis::AsyncCommands;
24use redis::Client;
25use redis::Cmd;
26use redis::Pipeline;
27use redis::RedisError;
28use redis::RedisFuture;
29use redis::Value;
30use redis::aio::ConnectionLike;
31use redis::aio::ConnectionManager;
32use redis::cluster::ClusterClient;
33use redis::cluster_async::ClusterConnection;
34
35use crate::raw::*;
36use crate::*;
37
38#[derive(Clone)]
39pub enum RedisConnection {
40 Normal(ConnectionManager),
41 Cluster(ClusterConnection),
42}
43
44impl ConnectionLike for RedisConnection {
45 fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
46 match self {
47 RedisConnection::Normal(conn) => conn.req_packed_command(cmd),
48 RedisConnection::Cluster(conn) => conn.req_packed_command(cmd),
49 }
50 }
51
52 fn req_packed_commands<'a>(
53 &'a mut self,
54 cmd: &'a Pipeline,
55 offset: usize,
56 count: usize,
57 ) -> RedisFuture<'a, Vec<Value>> {
58 match self {
59 RedisConnection::Normal(conn) => conn.req_packed_commands(cmd, offset, count),
60 RedisConnection::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
61 }
62 }
63
64 fn get_db(&self) -> i64 {
65 match self {
66 RedisConnection::Normal(conn) => conn.get_db(),
67 RedisConnection::Cluster(conn) => conn.get_db(),
68 }
69 }
70}
71
72#[derive(Clone)]
73pub struct RedisConnectionManager {
74 client: Option<Client>,
75 cluster_client: Option<ClusterClient>,
76}
77
78impl ManageObject for RedisConnectionManager {
79 type Object = RedisConnection;
80 type Error = Error;
81
82 async fn create(&self) -> Result<RedisConnection, Self::Error> {
83 if let Some(client) = self.client.clone() {
84 ConnectionManager::new(client.clone())
85 .await
86 .map_err(format_redis_error)
87 .map(RedisConnection::Normal)
88 } else {
89 self.cluster_client
90 .clone()
91 .unwrap()
92 .get_async_connection()
93 .await
94 .map_err(format_redis_error)
95 .map(RedisConnection::Cluster)
96 }
97 }
98
99 async fn is_recyclable(
100 &self,
101 o: &mut Self::Object,
102 _: &ObjectStatus,
103 ) -> Result<(), Self::Error> {
104 match o.ping::<String>().await {
105 Ok(ref pong) => match pong.as_bytes() {
106 b"PONG" => Ok(()),
107 _ => Err(Error::new(ErrorKind::Unexpected, "PING ERROR")),
108 },
109 Err(err) => Err(format_redis_error(err)),
110 }
111 }
112}
113
114#[derive(Clone)]
116pub struct RedisCore {
117 addr: String,
118 conn: Arc<bounded::Pool<RedisConnectionManager>>,
119 default_ttl: Option<Duration>,
120}
121
122impl Debug for RedisCore {
123 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124 f.debug_struct("RedisCore")
125 .field("addr", &self.addr)
126 .finish_non_exhaustive()
127 }
128}
129
130impl RedisCore {
131 pub fn new(
132 endpoint: String,
133 client: Option<Client>,
134 cluster_client: Option<ClusterClient>,
135 default_ttl: Option<Duration>,
136 connection_pool_max_size: Option<usize>,
137 ) -> Self {
138 let manager = RedisConnectionManager {
139 client,
140 cluster_client,
141 };
142 let pool = bounded::Pool::new(
143 bounded::PoolConfig::new(connection_pool_max_size.unwrap_or(10)),
144 manager,
145 );
146
147 Self {
148 addr: endpoint,
149 conn: pool,
150 default_ttl,
151 }
152 }
153
154 pub fn addr(&self) -> &str {
155 &self.addr
156 }
157
158 pub async fn conn(&self) -> Result<bounded::Object<RedisConnectionManager>> {
159 let fut = self.conn.get();
160
161 tokio::select! {
162 _ = tokio::time::sleep(Duration::from_secs(10)) => {
163 Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
164 }
165 result = fut => match result {
166 Ok(conn) => Ok(conn),
167 Err(err) => Err(err),
168 }
169 }
170 }
171
172 pub async fn get(&self, key: &str) -> Result<Option<Buffer>> {
173 let mut conn = self.conn().await?;
174 let result: Option<Bytes> = conn.get(key).await.map_err(format_redis_error)?;
175 Ok(result.map(Buffer::from))
176 }
177
178 pub async fn get_range(&self, key: &str, start: isize, end: isize) -> Result<Option<Buffer>> {
179 let mut conn = self.conn().await?;
180 let result: Option<Bytes> = conn
181 .getrange(key, start, end)
182 .await
183 .map_err(format_redis_error)?;
184 Ok(result.map(Buffer::from))
185 }
186
187 pub async fn set(&self, key: &str, value: Buffer) -> Result<()> {
188 let mut conn = self.conn().await?;
189 let value = value.to_vec();
190 if let Some(dur) = self.default_ttl {
191 let _: () = conn
192 .set_ex(key, value, dur.as_secs())
193 .await
194 .map_err(format_redis_error)?;
195 } else {
196 let _: () = conn.set(key, value).await.map_err(format_redis_error)?;
197 }
198 Ok(())
199 }
200
201 pub async fn delete(&self, key: &str) -> Result<()> {
202 let mut conn = self.conn().await?;
203 let _: () = conn.del(key).await.map_err(format_redis_error)?;
204 Ok(())
205 }
206}
207
208pub fn format_redis_error(e: RedisError) -> Error {
209 Error::new(ErrorKind::Unexpected, e.category())
210 .set_source(e)
211 .set_temporary()
212}