opendal/services/redis/
core.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::time::Duration;
21
22use bb8::RunError;
23use bytes::Bytes;
24use redis::aio::ConnectionLike;
25use redis::aio::ConnectionManager;
26use redis::cluster::ClusterClient;
27use redis::cluster_async::ClusterConnection;
28use redis::AsyncCommands;
29use redis::Client;
30use redis::Cmd;
31use redis::Pipeline;
32use redis::RedisError;
33use redis::RedisFuture;
34use redis::Value;
35use tokio::sync::OnceCell;
36
37use crate::*;
38
39#[derive(Clone)]
40pub enum RedisConnection {
41 Normal(ConnectionManager),
42 Cluster(ClusterConnection),
43}
44
45impl ConnectionLike for RedisConnection {
46 fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
47 match self {
48 RedisConnection::Normal(conn) => conn.req_packed_command(cmd),
49 RedisConnection::Cluster(conn) => conn.req_packed_command(cmd),
50 }
51 }
52
53 fn req_packed_commands<'a>(
54 &'a mut self,
55 cmd: &'a Pipeline,
56 offset: usize,
57 count: usize,
58 ) -> RedisFuture<'a, Vec<Value>> {
59 match self {
60 RedisConnection::Normal(conn) => conn.req_packed_commands(cmd, offset, count),
61 RedisConnection::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
62 }
63 }
64
65 fn get_db(&self) -> i64 {
66 match self {
67 RedisConnection::Normal(conn) => conn.get_db(),
68 RedisConnection::Cluster(conn) => conn.get_db(),
69 }
70 }
71}
72
73#[derive(Clone)]
74pub struct RedisConnectionManager {
75 pub client: Option<Client>,
76 pub cluster_client: Option<ClusterClient>,
77}
78
79impl bb8::ManageConnection for RedisConnectionManager {
80 type Connection = RedisConnection;
81 type Error = Error;
82
83 async fn connect(&self) -> Result<RedisConnection, Self::Error> {
84 if let Some(client) = self.client.clone() {
85 ConnectionManager::new(client.clone())
86 .await
87 .map_err(format_redis_error)
88 .map(RedisConnection::Normal)
89 } else {
90 self.cluster_client
91 .clone()
92 .unwrap()
93 .get_async_connection()
94 .await
95 .map_err(format_redis_error)
96 .map(RedisConnection::Cluster)
97 }
98 }
99
100 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
101 let pong: String = conn.ping().await.map_err(format_redis_error)?;
102
103 if pong == "PONG" {
104 Ok(())
105 } else {
106 Err(Error::new(ErrorKind::Unexpected, "PING ERROR"))
107 }
108 }
109
110 fn has_broken(&self, _: &mut Self::Connection) -> bool {
111 false
112 }
113}
114
115#[derive(Clone)]
117pub struct RedisCore {
118 pub addr: String,
119 pub client: Option<Client>,
120 pub cluster_client: Option<ClusterClient>,
121 pub conn: OnceCell<bb8::Pool<RedisConnectionManager>>,
122 pub default_ttl: Option<Duration>,
123}
124
125impl Debug for RedisCore {
126 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
127 let mut ds = f.debug_struct("RedisCore");
128 ds.field("addr", &self.addr);
129 ds.finish()
130 }
131}
132
133impl RedisCore {
134 pub async fn conn(&self) -> Result<bb8::PooledConnection<'_, RedisConnectionManager>> {
135 let pool = self
136 .conn
137 .get_or_try_init(|| async {
138 bb8::Pool::builder()
139 .build(self.get_redis_connection_manager())
140 .await
141 .map_err(|err| {
142 Error::new(ErrorKind::ConfigInvalid, "connect to redis failed")
143 .set_source(err)
144 })
145 })
146 .await?;
147 pool.get().await.map_err(|err| match err {
148 RunError::TimedOut => {
149 Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary()
150 }
151 RunError::User(err) => err,
152 })
153 }
154
155 pub fn get_redis_connection_manager(&self) -> RedisConnectionManager {
156 if let Some(_client) = self.client.clone() {
157 RedisConnectionManager {
158 client: self.client.clone(),
159 cluster_client: None,
160 }
161 } else {
162 RedisConnectionManager {
163 client: None,
164 cluster_client: self.cluster_client.clone(),
165 }
166 }
167 }
168
169 pub async fn get(&self, key: &str) -> Result<Option<Buffer>> {
170 let mut conn = self.conn().await?;
171 let result: Option<Bytes> = conn.get(key).await.map_err(format_redis_error)?;
172 Ok(result.map(Buffer::from))
173 }
174
175 pub async fn get_range(&self, key: &str, start: isize, end: isize) -> Result<Option<Buffer>> {
176 let mut conn = self.conn().await?;
177 let result: Option<Bytes> = conn
178 .getrange(key, start, end)
179 .await
180 .map_err(format_redis_error)?;
181 Ok(result.map(Buffer::from))
182 }
183
184 pub async fn set(&self, key: &str, value: Buffer) -> Result<()> {
185 let mut conn = self.conn().await?;
186 let value = value.to_vec();
187 if let Some(dur) = self.default_ttl {
188 let _: () = conn
189 .set_ex(key, value, dur.as_secs())
190 .await
191 .map_err(format_redis_error)?;
192 } else {
193 let _: () = conn.set(key, value).await.map_err(format_redis_error)?;
194 }
195 Ok(())
196 }
197
198 pub async fn delete(&self, key: &str) -> Result<()> {
199 let mut conn = self.conn().await?;
200 let _: () = conn.del(key).await.map_err(format_redis_error)?;
201 Ok(())
202 }
203}
204
205pub fn format_redis_error(e: RedisError) -> Error {
206 Error::new(ErrorKind::Unexpected, e.category())
207 .set_source(e)
208 .set_temporary()
209}