opendal/services/redis/
core.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::Error;
19use crate::ErrorKind;
20
21use redis::aio::ConnectionLike;
22use redis::aio::ConnectionManager;
23use redis::cluster::ClusterClient;
24use redis::cluster_async::ClusterConnection;
25use redis::AsyncCommands;
26use redis::Client;
27use redis::RedisError;
28use redis::{Cmd, Pipeline, RedisFuture, Value};
29
30#[derive(Clone)]
31pub enum RedisConnection {
32    Normal(ConnectionManager),
33    Cluster(ClusterConnection),
34}
35
36impl ConnectionLike for RedisConnection {
37    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
38        match self {
39            RedisConnection::Normal(conn) => conn.req_packed_command(cmd),
40            RedisConnection::Cluster(conn) => conn.req_packed_command(cmd),
41        }
42    }
43
44    fn req_packed_commands<'a>(
45        &'a mut self,
46        cmd: &'a Pipeline,
47        offset: usize,
48        count: usize,
49    ) -> RedisFuture<'a, Vec<Value>> {
50        match self {
51            RedisConnection::Normal(conn) => conn.req_packed_commands(cmd, offset, count),
52            RedisConnection::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
53        }
54    }
55
56    fn get_db(&self) -> i64 {
57        match self {
58            RedisConnection::Normal(conn) => conn.get_db(),
59            RedisConnection::Cluster(conn) => conn.get_db(),
60        }
61    }
62}
63
64#[derive(Clone)]
65pub struct RedisConnectionManager {
66    pub client: Option<Client>,
67    pub cluster_client: Option<ClusterClient>,
68}
69
70#[async_trait::async_trait]
71impl bb8::ManageConnection for RedisConnectionManager {
72    type Connection = RedisConnection;
73    type Error = Error;
74
75    async fn connect(&self) -> Result<RedisConnection, Self::Error> {
76        if let Some(client) = self.client.clone() {
77            ConnectionManager::new(client.clone())
78                .await
79                .map_err(format_redis_error)
80                .map(RedisConnection::Normal)
81        } else {
82            self.cluster_client
83                .clone()
84                .unwrap()
85                .get_async_connection()
86                .await
87                .map_err(format_redis_error)
88                .map(RedisConnection::Cluster)
89        }
90    }
91
92    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
93        let pong: String = conn.ping().await.map_err(format_redis_error)?;
94
95        if pong == "PONG" {
96            Ok(())
97        } else {
98            Err(Error::new(ErrorKind::Unexpected, "PING ERROR"))
99        }
100    }
101
102    fn has_broken(&self, _: &mut Self::Connection) -> bool {
103        false
104    }
105}
106
107pub fn format_redis_error(e: RedisError) -> Error {
108    Error::new(ErrorKind::Unexpected, e.category())
109        .set_source(e)
110        .set_temporary()
111}