opendal/services/redis/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::time::Duration;
21
22use http::Uri;
23use redis::Client;
24use redis::ConnectionAddr;
25use redis::ConnectionInfo;
26use redis::ProtocolVersion;
27use redis::RedisConnectionInfo;
28use redis::cluster::ClusterClientBuilder;
29
30use super::REDIS_SCHEME;
31use super::config::RedisConfig;
32use super::core::*;
33use super::delete::RedisDeleter;
34use super::writer::RedisWriter;
35use crate::raw::*;
36use crate::*;
37
/// Endpoint used by `build` when neither an endpoint nor cluster endpoints are configured.
const DEFAULT_REDIS_ENDPOINT: &str = "tcp://127.0.0.1:6379";
/// Port assumed when a TCP/TLS endpoint does not spell one out.
const DEFAULT_REDIS_PORT: u16 = 6379;
40
41/// [Redis](https://redis.io/) services support.
42#[doc = include_str!("docs.md")]
43#[derive(Debug, Default)]
44pub struct RedisBuilder {
45    pub(super) config: RedisConfig,
46}
47
48impl RedisBuilder {
49    /// set the network address of redis service.
50    ///
51    /// currently supported schemes:
52    /// - no scheme: will be seen as "tcp"
53    /// - "tcp" or "redis": unsecured redis connections
54    /// - "rediss": secured redis connections
55    /// - "unix" or "redis+unix": unix socket connection
56    pub fn endpoint(mut self, endpoint: &str) -> Self {
57        if !endpoint.is_empty() {
58            self.config.endpoint = Some(endpoint.to_owned());
59        }
60        self
61    }
62
63    /// set the network address of redis cluster service.
64    /// This parameter is mutually exclusive with the endpoint parameter.
65    ///
66    /// currently supported schemes:
67    /// - no scheme: will be seen as "tcp"
68    /// - "tcp" or "redis": unsecured redis connections
69    /// - "rediss": secured redis connections
70    /// - "unix" or "redis+unix": unix socket connection
71    pub fn cluster_endpoints(mut self, cluster_endpoints: &str) -> Self {
72        if !cluster_endpoints.is_empty() {
73            self.config.cluster_endpoints = Some(cluster_endpoints.to_owned());
74        }
75        self
76    }
77
78    /// set the username for redis
79    ///
80    /// default: no username
81    pub fn username(mut self, username: &str) -> Self {
82        if !username.is_empty() {
83            self.config.username = Some(username.to_owned());
84        }
85        self
86    }
87
88    /// set the password for redis
89    ///
90    /// default: no password
91    pub fn password(mut self, password: &str) -> Self {
92        if !password.is_empty() {
93            self.config.password = Some(password.to_owned());
94        }
95        self
96    }
97
98    /// set the db used in redis
99    ///
100    /// default: 0
101    pub fn db(mut self, db: i64) -> Self {
102        self.config.db = db;
103        self
104    }
105
106    /// Set the default ttl for redis services.
107    ///
108    /// If set, we will specify `EX` for write operations.
109    pub fn default_ttl(mut self, ttl: Duration) -> Self {
110        self.config.default_ttl = Some(ttl);
111        self
112    }
113
114    /// set the working directory, all operations will be performed under it.
115    ///
116    /// default: "/"
117    pub fn root(mut self, root: &str) -> Self {
118        self.config.root = if root.is_empty() {
119            None
120        } else {
121            Some(root.to_string())
122        };
123
124        self
125    }
126
127    /// Sets the maximum number of connections managed by the pool.
128    ///
129    /// Defaults to 10.
130    ///
131    /// # Panics
132    ///
133    /// Will panic if `max_size` is 0.
134    #[must_use]
135    pub fn connection_pool_max_size(mut self, max_size: usize) -> Self {
136        assert!(max_size > 0, "max_size must be greater than zero!");
137        self.config.connection_pool_max_size = Some(max_size);
138        self
139    }
140}
141
142impl Builder for RedisBuilder {
143    type Config = RedisConfig;
144
145    fn build(self) -> Result<impl Access> {
146        let root = normalize_root(
147            self.config
148                .root
149                .clone()
150                .unwrap_or_else(|| "/".to_string())
151                .as_str(),
152        );
153
154        if let Some(endpoints) = self.config.cluster_endpoints.clone() {
155            let mut cluster_endpoints: Vec<ConnectionInfo> = Vec::default();
156            for endpoint in endpoints.split(',') {
157                cluster_endpoints.push(self.get_connection_info(endpoint.to_string())?);
158            }
159            let mut client_builder = ClusterClientBuilder::new(cluster_endpoints);
160            if let Some(username) = &self.config.username {
161                client_builder = client_builder.username(username.clone());
162            }
163            if let Some(password) = &self.config.password {
164                client_builder = client_builder.password(password.clone());
165            }
166            let client = client_builder.build().map_err(format_redis_error)?;
167
168            Ok(RedisBackend::new(RedisCore::new(
169                endpoints,
170                None,
171                Some(client),
172                self.config.default_ttl,
173                self.config.connection_pool_max_size,
174            ))
175            .with_normalized_root(root))
176        } else {
177            let endpoint = self
178                .config
179                .endpoint
180                .clone()
181                .unwrap_or_else(|| DEFAULT_REDIS_ENDPOINT.to_string());
182
183            let client =
184                Client::open(self.get_connection_info(endpoint.clone())?).map_err(|e| {
185                    Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
186                        .with_context("service", REDIS_SCHEME)
187                        .with_context("endpoint", self.config.endpoint.as_ref().unwrap())
188                        .with_context("db", self.config.db.to_string())
189                        .set_source(e)
190                })?;
191
192            Ok(RedisBackend::new(RedisCore::new(
193                endpoint,
194                Some(client),
195                None,
196                self.config.default_ttl,
197                self.config.connection_pool_max_size,
198            ))
199            .with_normalized_root(root))
200        }
201    }
202}
203
204impl RedisBuilder {
205    fn get_connection_info(&self, endpoint: String) -> Result<ConnectionInfo> {
206        let ep_url = endpoint.parse::<Uri>().map_err(|e| {
207            Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
208                .with_context("service", REDIS_SCHEME)
209                .with_context("endpoint", endpoint)
210                .set_source(e)
211        })?;
212
213        let con_addr = match ep_url.scheme_str() {
214            Some("tcp") | Some("redis") | None => {
215                let host = ep_url
216                    .host()
217                    .map(|h| h.to_string())
218                    .unwrap_or_else(|| "127.0.0.1".to_string());
219                let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
220                ConnectionAddr::Tcp(host, port)
221            }
222            Some("rediss") => {
223                let host = ep_url
224                    .host()
225                    .map(|h| h.to_string())
226                    .unwrap_or_else(|| "127.0.0.1".to_string());
227                let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
228                ConnectionAddr::TcpTls {
229                    host,
230                    port,
231                    insecure: false,
232                    tls_params: None,
233                }
234            }
235            Some("unix") | Some("redis+unix") => {
236                let path = PathBuf::from(ep_url.path());
237                ConnectionAddr::Unix(path)
238            }
239            Some(s) => {
240                return Err(
241                    Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
242                        .with_context("service", REDIS_SCHEME)
243                        .with_context("scheme", s),
244                );
245            }
246        };
247
248        let redis_info = RedisConnectionInfo {
249            db: self.config.db,
250            username: self.config.username.clone(),
251            password: self.config.password.clone(),
252            protocol: ProtocolVersion::RESP2,
253        };
254
255        Ok(ConnectionInfo {
256            addr: con_addr,
257            redis: redis_info,
258        })
259    }
260}
261
262/// RedisBackend implements Access trait directly
263#[derive(Debug, Clone)]
264pub struct RedisBackend {
265    core: Arc<RedisCore>,
266    root: String,
267    info: Arc<AccessorInfo>,
268}
269
270impl RedisBackend {
271    fn new(core: RedisCore) -> Self {
272        let info = AccessorInfo::default();
273        info.set_scheme(REDIS_SCHEME);
274        info.set_name(core.addr());
275        info.set_root("/");
276        info.set_native_capability(Capability {
277            read: true,
278            write: true,
279            delete: true,
280            stat: true,
281            write_can_empty: true,
282            shared: true,
283            ..Default::default()
284        });
285
286        Self {
287            core: Arc::new(core),
288            root: "/".to_string(),
289            info: Arc::new(info),
290        }
291    }
292
293    fn with_normalized_root(mut self, root: String) -> Self {
294        self.info.set_root(&root);
295        self.root = root;
296        self
297    }
298}
299
300impl Access for RedisBackend {
301    type Reader = Buffer;
302    type Writer = RedisWriter;
303    type Lister = ();
304    type Deleter = oio::OneShotDeleter<RedisDeleter>;
305
306    fn info(&self) -> Arc<AccessorInfo> {
307        self.info.clone()
308    }
309
310    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
311        let p = build_abs_path(&self.root, path);
312
313        if p == build_abs_path(&self.root, "") {
314            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
315        } else {
316            let bs = self.core.get(&p).await?;
317            match bs {
318                Some(bs) => Ok(RpStat::new(
319                    Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
320                )),
321                None => Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
322            }
323        }
324    }
325
326    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
327        let p = build_abs_path(&self.root, path);
328
329        let range = args.range();
330        let buffer = if range.is_full() {
331            // Full read - use GET
332            match self.core.get(&p).await? {
333                Some(bs) => bs,
334                None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
335            }
336        } else {
337            // Range read - use GETRANGE
338            let start = range.offset() as isize;
339            let end = match range.size() {
340                Some(size) => (range.offset() + size - 1) as isize,
341                None => -1, // Redis uses -1 for end of string
342            };
343
344            match self.core.get_range(&p, start, end).await? {
345                Some(bs) => bs,
346                None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
347            }
348        };
349
350        Ok((RpRead::new(), buffer))
351    }
352
353    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
354        let p = build_abs_path(&self.root, path);
355        Ok((RpWrite::new(), RedisWriter::new(self.core.clone(), p)))
356    }
357
358    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
359        Ok((
360            RpDelete::default(),
361            oio::OneShotDeleter::new(RedisDeleter::new(self.core.clone(), self.root.clone())),
362        ))
363    }
364}