opendal_core/services/redis/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Bytes;
22use fastpool::{ManageObject, ObjectStatus, bounded};
23use redis::AsyncCommands;
24use redis::Client;
25use redis::Cmd;
26use redis::Pipeline;
27use redis::RedisError;
28use redis::RedisFuture;
29use redis::Value;
30use redis::aio::ConnectionLike;
31use redis::aio::ConnectionManager;
32use redis::cluster::ClusterClient;
33use redis::cluster_async::ClusterConnection;
34
35use crate::raw::*;
36use crate::*;
37
38#[derive(Clone)]
39pub enum RedisConnection {
40    Normal(ConnectionManager),
41    Cluster(ClusterConnection),
42}
43
44impl ConnectionLike for RedisConnection {
45    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
46        match self {
47            RedisConnection::Normal(conn) => conn.req_packed_command(cmd),
48            RedisConnection::Cluster(conn) => conn.req_packed_command(cmd),
49        }
50    }
51
52    fn req_packed_commands<'a>(
53        &'a mut self,
54        cmd: &'a Pipeline,
55        offset: usize,
56        count: usize,
57    ) -> RedisFuture<'a, Vec<Value>> {
58        match self {
59            RedisConnection::Normal(conn) => conn.req_packed_commands(cmd, offset, count),
60            RedisConnection::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
61        }
62    }
63
64    fn get_db(&self) -> i64 {
65        match self {
66            RedisConnection::Normal(conn) => conn.get_db(),
67            RedisConnection::Cluster(conn) => conn.get_db(),
68        }
69    }
70}
71
72#[derive(Clone)]
73pub struct RedisConnectionManager {
74    client: Option<Client>,
75    cluster_client: Option<ClusterClient>,
76}
77
78impl ManageObject for RedisConnectionManager {
79    type Object = RedisConnection;
80    type Error = Error;
81
82    async fn create(&self) -> Result<RedisConnection, Self::Error> {
83        if let Some(client) = self.client.clone() {
84            ConnectionManager::new(client.clone())
85                .await
86                .map_err(format_redis_error)
87                .map(RedisConnection::Normal)
88        } else {
89            self.cluster_client
90                .clone()
91                .unwrap()
92                .get_async_connection()
93                .await
94                .map_err(format_redis_error)
95                .map(RedisConnection::Cluster)
96        }
97    }
98
99    async fn is_recyclable(
100        &self,
101        o: &mut Self::Object,
102        _: &ObjectStatus,
103    ) -> Result<(), Self::Error> {
104        match o.ping::<String>().await {
105            Ok(ref pong) => match pong.as_bytes() {
106                b"PONG" => Ok(()),
107                _ => Err(Error::new(ErrorKind::Unexpected, "PING ERROR")),
108            },
109            Err(err) => Err(format_redis_error(err)),
110        }
111    }
112}
113
114/// RedisCore holds the Redis connection and configuration
115#[derive(Clone)]
116pub struct RedisCore {
117    addr: String,
118    conn: Arc<bounded::Pool<RedisConnectionManager>>,
119    default_ttl: Option<Duration>,
120}
121
122impl Debug for RedisCore {
123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124        f.debug_struct("RedisCore")
125            .field("addr", &self.addr)
126            .finish_non_exhaustive()
127    }
128}
129
130impl RedisCore {
131    pub fn new(
132        endpoint: String,
133        client: Option<Client>,
134        cluster_client: Option<ClusterClient>,
135        default_ttl: Option<Duration>,
136        connection_pool_max_size: Option<usize>,
137    ) -> Self {
138        let manager = RedisConnectionManager {
139            client,
140            cluster_client,
141        };
142        let pool = bounded::Pool::new(
143            bounded::PoolConfig::new(connection_pool_max_size.unwrap_or(10)),
144            manager,
145        );
146
147        Self {
148            addr: endpoint,
149            conn: pool,
150            default_ttl,
151        }
152    }
153
154    pub fn addr(&self) -> &str {
155        &self.addr
156    }
157
158    pub async fn conn(&self) -> Result<bounded::Object<RedisConnectionManager>> {
159        let fut = self.conn.get();
160
161        tokio::select! {
162            _ = tokio::time::sleep(Duration::from_secs(10)) => {
163                Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
164            }
165            result = fut => match result {
166                Ok(conn) => Ok(conn),
167                Err(err) => Err(err),
168            }
169        }
170    }
171
172    pub async fn get(&self, key: &str) -> Result<Option<Buffer>> {
173        let mut conn = self.conn().await?;
174        let result: Option<Bytes> = conn.get(key).await.map_err(format_redis_error)?;
175        Ok(result.map(Buffer::from))
176    }
177
178    pub async fn get_range(&self, key: &str, start: isize, end: isize) -> Result<Option<Buffer>> {
179        let mut conn = self.conn().await?;
180        let result: Option<Bytes> = conn
181            .getrange(key, start, end)
182            .await
183            .map_err(format_redis_error)?;
184        Ok(result.map(Buffer::from))
185    }
186
187    pub async fn set(&self, key: &str, value: Buffer) -> Result<()> {
188        let mut conn = self.conn().await?;
189        let value = value.to_vec();
190        if let Some(dur) = self.default_ttl {
191            let _: () = conn
192                .set_ex(key, value, dur.as_secs())
193                .await
194                .map_err(format_redis_error)?;
195        } else {
196            let _: () = conn.set(key, value).await.map_err(format_redis_error)?;
197        }
198        Ok(())
199    }
200
201    pub async fn delete(&self, key: &str) -> Result<()> {
202        let mut conn = self.conn().await?;
203        let _: () = conn.del(key).await.map_err(format_redis_error)?;
204        Ok(())
205    }
206}
207
208pub fn format_redis_error(e: RedisError) -> Error {
209    Error::new(ErrorKind::Unexpected, e.category())
210        .set_source(e)
211        .set_temporary()
212}