opendal/services/redis/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::time::Duration;
21
22use bb8::RunError;
23use bytes::Bytes;
24use redis::aio::ConnectionLike;
25use redis::aio::ConnectionManager;
26use redis::cluster::ClusterClient;
27use redis::cluster_async::ClusterConnection;
28use redis::AsyncCommands;
29use redis::Client;
30use redis::Cmd;
31use redis::Pipeline;
32use redis::RedisError;
33use redis::RedisFuture;
34use redis::Value;
35use tokio::sync::OnceCell;
36
37use crate::*;
38
39#[derive(Clone)]
40pub enum RedisConnection {
41    Normal(ConnectionManager),
42    Cluster(ClusterConnection),
43}
44
45impl ConnectionLike for RedisConnection {
46    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
47        match self {
48            RedisConnection::Normal(conn) => conn.req_packed_command(cmd),
49            RedisConnection::Cluster(conn) => conn.req_packed_command(cmd),
50        }
51    }
52
53    fn req_packed_commands<'a>(
54        &'a mut self,
55        cmd: &'a Pipeline,
56        offset: usize,
57        count: usize,
58    ) -> RedisFuture<'a, Vec<Value>> {
59        match self {
60            RedisConnection::Normal(conn) => conn.req_packed_commands(cmd, offset, count),
61            RedisConnection::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
62        }
63    }
64
65    fn get_db(&self) -> i64 {
66        match self {
67            RedisConnection::Normal(conn) => conn.get_db(),
68            RedisConnection::Cluster(conn) => conn.get_db(),
69        }
70    }
71}
72
73#[derive(Clone)]
74pub struct RedisConnectionManager {
75    pub client: Option<Client>,
76    pub cluster_client: Option<ClusterClient>,
77}
78
79impl bb8::ManageConnection for RedisConnectionManager {
80    type Connection = RedisConnection;
81    type Error = Error;
82
83    async fn connect(&self) -> Result<RedisConnection, Self::Error> {
84        if let Some(client) = self.client.clone() {
85            ConnectionManager::new(client.clone())
86                .await
87                .map_err(format_redis_error)
88                .map(RedisConnection::Normal)
89        } else {
90            self.cluster_client
91                .clone()
92                .unwrap()
93                .get_async_connection()
94                .await
95                .map_err(format_redis_error)
96                .map(RedisConnection::Cluster)
97        }
98    }
99
100    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
101        let pong: String = conn.ping().await.map_err(format_redis_error)?;
102
103        if pong == "PONG" {
104            Ok(())
105        } else {
106            Err(Error::new(ErrorKind::Unexpected, "PING ERROR"))
107        }
108    }
109
110    fn has_broken(&self, _: &mut Self::Connection) -> bool {
111        false
112    }
113}
114
115/// RedisCore holds the Redis connection and configuration
116#[derive(Clone)]
117pub struct RedisCore {
118    pub addr: String,
119    pub client: Option<Client>,
120    pub cluster_client: Option<ClusterClient>,
121    pub conn: OnceCell<bb8::Pool<RedisConnectionManager>>,
122    pub default_ttl: Option<Duration>,
123}
124
125impl Debug for RedisCore {
126    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
127        let mut ds = f.debug_struct("RedisCore");
128        ds.field("addr", &self.addr);
129        ds.finish()
130    }
131}
132
133impl RedisCore {
134    pub async fn conn(&self) -> Result<bb8::PooledConnection<'_, RedisConnectionManager>> {
135        let pool = self
136            .conn
137            .get_or_try_init(|| async {
138                bb8::Pool::builder()
139                    .build(self.get_redis_connection_manager())
140                    .await
141                    .map_err(|err| {
142                        Error::new(ErrorKind::ConfigInvalid, "connect to redis failed")
143                            .set_source(err)
144                    })
145            })
146            .await?;
147        pool.get().await.map_err(|err| match err {
148            RunError::TimedOut => {
149                Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary()
150            }
151            RunError::User(err) => err,
152        })
153    }
154
155    pub fn get_redis_connection_manager(&self) -> RedisConnectionManager {
156        if let Some(_client) = self.client.clone() {
157            RedisConnectionManager {
158                client: self.client.clone(),
159                cluster_client: None,
160            }
161        } else {
162            RedisConnectionManager {
163                client: None,
164                cluster_client: self.cluster_client.clone(),
165            }
166        }
167    }
168
169    pub async fn get(&self, key: &str) -> Result<Option<Buffer>> {
170        let mut conn = self.conn().await?;
171        let result: Option<Bytes> = conn.get(key).await.map_err(format_redis_error)?;
172        Ok(result.map(Buffer::from))
173    }
174
175    pub async fn get_range(&self, key: &str, start: isize, end: isize) -> Result<Option<Buffer>> {
176        let mut conn = self.conn().await?;
177        let result: Option<Bytes> = conn
178            .getrange(key, start, end)
179            .await
180            .map_err(format_redis_error)?;
181        Ok(result.map(Buffer::from))
182    }
183
184    pub async fn set(&self, key: &str, value: Buffer) -> Result<()> {
185        let mut conn = self.conn().await?;
186        let value = value.to_vec();
187        if let Some(dur) = self.default_ttl {
188            let _: () = conn
189                .set_ex(key, value, dur.as_secs())
190                .await
191                .map_err(format_redis_error)?;
192        } else {
193            let _: () = conn.set(key, value).await.map_err(format_redis_error)?;
194        }
195        Ok(())
196    }
197
198    pub async fn delete(&self, key: &str) -> Result<()> {
199        let mut conn = self.conn().await?;
200        let _: () = conn.del(key).await.map_err(format_redis_error)?;
201        Ok(())
202    }
203}
204
205pub fn format_redis_error(e: RedisError) -> Error {
206    Error::new(ErrorKind::Unexpected, e.category())
207        .set_source(e)
208        .set_temporary()
209}