opendal/services/redis/
core.rs1use crate::Error;
19use crate::ErrorKind;
20
21use redis::aio::ConnectionLike;
22use redis::aio::ConnectionManager;
23use redis::cluster::ClusterClient;
24use redis::cluster_async::ClusterConnection;
25use redis::AsyncCommands;
26use redis::Client;
27use redis::RedisError;
28use redis::{Cmd, Pipeline, RedisFuture, Value};
29
30#[derive(Clone)]
31pub enum RedisConnection {
32 Normal(ConnectionManager),
33 Cluster(ClusterConnection),
34}
35
36impl ConnectionLike for RedisConnection {
37 fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
38 match self {
39 RedisConnection::Normal(conn) => conn.req_packed_command(cmd),
40 RedisConnection::Cluster(conn) => conn.req_packed_command(cmd),
41 }
42 }
43
44 fn req_packed_commands<'a>(
45 &'a mut self,
46 cmd: &'a Pipeline,
47 offset: usize,
48 count: usize,
49 ) -> RedisFuture<'a, Vec<Value>> {
50 match self {
51 RedisConnection::Normal(conn) => conn.req_packed_commands(cmd, offset, count),
52 RedisConnection::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
53 }
54 }
55
56 fn get_db(&self) -> i64 {
57 match self {
58 RedisConnection::Normal(conn) => conn.get_db(),
59 RedisConnection::Cluster(conn) => conn.get_db(),
60 }
61 }
62}
63
64#[derive(Clone)]
65pub struct RedisConnectionManager {
66 pub client: Option<Client>,
67 pub cluster_client: Option<ClusterClient>,
68}
69
70#[async_trait::async_trait]
71impl bb8::ManageConnection for RedisConnectionManager {
72 type Connection = RedisConnection;
73 type Error = Error;
74
75 async fn connect(&self) -> Result<RedisConnection, Self::Error> {
76 if let Some(client) = self.client.clone() {
77 ConnectionManager::new(client.clone())
78 .await
79 .map_err(format_redis_error)
80 .map(RedisConnection::Normal)
81 } else {
82 self.cluster_client
83 .clone()
84 .unwrap()
85 .get_async_connection()
86 .await
87 .map_err(format_redis_error)
88 .map(RedisConnection::Cluster)
89 }
90 }
91
92 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
93 let pong: String = conn.ping().await.map_err(format_redis_error)?;
94
95 if pong == "PONG" {
96 Ok(())
97 } else {
98 Err(Error::new(ErrorKind::Unexpected, "PING ERROR"))
99 }
100 }
101
102 fn has_broken(&self, _: &mut Self::Connection) -> bool {
103 false
104 }
105}
106
107pub fn format_redis_error(e: RedisError) -> Error {
108 Error::new(ErrorKind::Unexpected, e.category())
109 .set_source(e)
110 .set_temporary()
111}