opendal/services/redis/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::path::PathBuf;
21use std::time::Duration;
22
23use http::Uri;
24use redis::cluster::ClusterClientBuilder;
25use redis::Client;
26use redis::ConnectionAddr;
27use redis::ConnectionInfo;
28use redis::ProtocolVersion;
29use redis::RedisConnectionInfo;
30use tokio::sync::OnceCell;
31
32use super::core::*;
33use super::delete::RedisDeleter;
34use super::writer::RedisWriter;
35use crate::raw::oio;
36use crate::raw::*;
37use crate::services::RedisConfig;
38use crate::*;
39
40const DEFAULT_REDIS_ENDPOINT: &str = "tcp://127.0.0.1:6379";
41const DEFAULT_REDIS_PORT: u16 = 6379;
42
43impl Configurator for RedisConfig {
44    type Builder = RedisBuilder;
45    fn into_builder(self) -> Self::Builder {
46        RedisBuilder { config: self }
47    }
48}
49
50/// [Redis](https://redis.io/) services support.
51#[doc = include_str!("docs.md")]
52#[derive(Clone, Default)]
53pub struct RedisBuilder {
54    config: RedisConfig,
55}
56
57impl Debug for RedisBuilder {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        let mut d = f.debug_struct("RedisBuilder");
60
61        d.field("config", &self.config);
62        d.finish_non_exhaustive()
63    }
64}
65
66impl RedisBuilder {
67    /// set the network address of redis service.
68    ///
69    /// currently supported schemes:
70    /// - no scheme: will be seen as "tcp"
71    /// - "tcp" or "redis": unsecured redis connections
72    /// - "rediss": secured redis connections
73    /// - "unix" or "redis+unix": unix socket connection
74    pub fn endpoint(mut self, endpoint: &str) -> Self {
75        if !endpoint.is_empty() {
76            self.config.endpoint = Some(endpoint.to_owned());
77        }
78        self
79    }
80
81    /// set the network address of redis cluster service.
82    /// This parameter is mutually exclusive with the endpoint parameter.
83    ///
84    /// currently supported schemes:
85    /// - no scheme: will be seen as "tcp"
86    /// - "tcp" or "redis": unsecured redis connections
87    /// - "rediss": secured redis connections
88    /// - "unix" or "redis+unix": unix socket connection
89    pub fn cluster_endpoints(mut self, cluster_endpoints: &str) -> Self {
90        if !cluster_endpoints.is_empty() {
91            self.config.cluster_endpoints = Some(cluster_endpoints.to_owned());
92        }
93        self
94    }
95
96    /// set the username for redis
97    ///
98    /// default: no username
99    pub fn username(mut self, username: &str) -> Self {
100        if !username.is_empty() {
101            self.config.username = Some(username.to_owned());
102        }
103        self
104    }
105
106    /// set the password for redis
107    ///
108    /// default: no password
109    pub fn password(mut self, password: &str) -> Self {
110        if !password.is_empty() {
111            self.config.password = Some(password.to_owned());
112        }
113        self
114    }
115
116    /// set the db used in redis
117    ///
118    /// default: 0
119    pub fn db(mut self, db: i64) -> Self {
120        self.config.db = db;
121        self
122    }
123
124    /// Set the default ttl for redis services.
125    ///
126    /// If set, we will specify `EX` for write operations.
127    pub fn default_ttl(mut self, ttl: Duration) -> Self {
128        self.config.default_ttl = Some(ttl);
129        self
130    }
131
132    /// set the working directory, all operations will be performed under it.
133    ///
134    /// default: "/"
135    pub fn root(mut self, root: &str) -> Self {
136        self.config.root = if root.is_empty() {
137            None
138        } else {
139            Some(root.to_string())
140        };
141
142        self
143    }
144}
145
146impl Builder for RedisBuilder {
147    const SCHEME: Scheme = Scheme::Redis;
148    type Config = RedisConfig;
149
150    fn build(self) -> Result<impl Access> {
151        let root = normalize_root(
152            self.config
153                .root
154                .clone()
155                .unwrap_or_else(|| "/".to_string())
156                .as_str(),
157        );
158
159        if let Some(endpoints) = self.config.cluster_endpoints.clone() {
160            let mut cluster_endpoints: Vec<ConnectionInfo> = Vec::default();
161            for endpoint in endpoints.split(',') {
162                cluster_endpoints.push(self.get_connection_info(endpoint.to_string())?);
163            }
164            let mut client_builder = ClusterClientBuilder::new(cluster_endpoints);
165            if let Some(username) = &self.config.username {
166                client_builder = client_builder.username(username.clone());
167            }
168            if let Some(password) = &self.config.password {
169                client_builder = client_builder.password(password.clone());
170            }
171            let client = client_builder.build().map_err(format_redis_error)?;
172
173            let conn = OnceCell::new();
174
175            Ok(RedisAccessor::new(RedisCore {
176                addr: endpoints,
177                client: None,
178                cluster_client: Some(client),
179                conn,
180                default_ttl: self.config.default_ttl,
181            })
182            .with_normalized_root(root))
183        } else {
184            let endpoint = self
185                .config
186                .endpoint
187                .clone()
188                .unwrap_or_else(|| DEFAULT_REDIS_ENDPOINT.to_string());
189
190            let client =
191                Client::open(self.get_connection_info(endpoint.clone())?).map_err(|e| {
192                    Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
193                        .with_context("service", Scheme::Redis)
194                        .with_context("endpoint", self.config.endpoint.as_ref().unwrap())
195                        .with_context("db", self.config.db.to_string())
196                        .set_source(e)
197                })?;
198
199            let conn = OnceCell::new();
200            Ok(RedisAccessor::new(RedisCore {
201                addr: endpoint,
202                client: Some(client),
203                cluster_client: None,
204                conn,
205                default_ttl: self.config.default_ttl,
206            })
207            .with_normalized_root(root))
208        }
209    }
210}
211
212impl RedisBuilder {
213    fn get_connection_info(&self, endpoint: String) -> Result<ConnectionInfo> {
214        let ep_url = endpoint.parse::<Uri>().map_err(|e| {
215            Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
216                .with_context("service", Scheme::Redis)
217                .with_context("endpoint", endpoint)
218                .set_source(e)
219        })?;
220
221        let con_addr = match ep_url.scheme_str() {
222            Some("tcp") | Some("redis") | None => {
223                let host = ep_url
224                    .host()
225                    .map(|h| h.to_string())
226                    .unwrap_or_else(|| "127.0.0.1".to_string());
227                let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
228                ConnectionAddr::Tcp(host, port)
229            }
230            Some("rediss") => {
231                let host = ep_url
232                    .host()
233                    .map(|h| h.to_string())
234                    .unwrap_or_else(|| "127.0.0.1".to_string());
235                let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
236                ConnectionAddr::TcpTls {
237                    host,
238                    port,
239                    insecure: false,
240                    tls_params: None,
241                }
242            }
243            Some("unix") | Some("redis+unix") => {
244                let path = PathBuf::from(ep_url.path());
245                ConnectionAddr::Unix(path)
246            }
247            Some(s) => {
248                return Err(
249                    Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
250                        .with_context("service", Scheme::Redis)
251                        .with_context("scheme", s),
252                )
253            }
254        };
255
256        let redis_info = RedisConnectionInfo {
257            db: self.config.db,
258            username: self.config.username.clone(),
259            password: self.config.password.clone(),
260            protocol: ProtocolVersion::RESP2,
261        };
262
263        Ok(ConnectionInfo {
264            addr: con_addr,
265            redis: redis_info,
266        })
267    }
268}
269
270/// RedisAccessor implements Access trait directly
271#[derive(Debug, Clone)]
272pub struct RedisAccessor {
273    core: std::sync::Arc<RedisCore>,
274    root: String,
275    info: std::sync::Arc<AccessorInfo>,
276}
277
278impl RedisAccessor {
279    fn new(core: RedisCore) -> Self {
280        let info = AccessorInfo::default();
281        info.set_scheme(Scheme::Redis);
282        info.set_name(&core.addr);
283        info.set_root("/");
284        info.set_native_capability(Capability {
285            read: true,
286            write: true,
287            delete: true,
288            stat: true,
289            write_can_empty: true,
290            shared: true,
291            ..Default::default()
292        });
293
294        Self {
295            core: std::sync::Arc::new(core),
296            root: "/".to_string(),
297            info: std::sync::Arc::new(info),
298        }
299    }
300
301    fn with_normalized_root(mut self, root: String) -> Self {
302        self.info.set_root(&root);
303        self.root = root;
304        self
305    }
306}
307
308impl Access for RedisAccessor {
309    type Reader = Buffer;
310    type Writer = RedisWriter;
311    type Lister = ();
312    type Deleter = oio::OneShotDeleter<RedisDeleter>;
313
314    fn info(&self) -> std::sync::Arc<AccessorInfo> {
315        self.info.clone()
316    }
317
318    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
319        let p = build_abs_path(&self.root, path);
320
321        if p == build_abs_path(&self.root, "") {
322            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
323        } else {
324            let bs = self.core.get(&p).await?;
325            match bs {
326                Some(bs) => Ok(RpStat::new(
327                    Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
328                )),
329                None => Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
330            }
331        }
332    }
333
334    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
335        let p = build_abs_path(&self.root, path);
336
337        let range = args.range();
338        let buffer = if range.is_full() {
339            // Full read - use GET
340            match self.core.get(&p).await? {
341                Some(bs) => bs,
342                None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
343            }
344        } else {
345            // Range read - use GETRANGE
346            let start = range.offset() as isize;
347            let end = match range.size() {
348                Some(size) => (range.offset() + size - 1) as isize,
349                None => -1, // Redis uses -1 for end of string
350            };
351
352            match self.core.get_range(&p, start, end).await? {
353                Some(bs) => bs,
354                None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
355            }
356        };
357
358        Ok((RpRead::new(), buffer))
359    }
360
361    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
362        let p = build_abs_path(&self.root, path);
363        Ok((RpWrite::new(), RedisWriter::new(self.core.clone(), p)))
364    }
365
366    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
367        Ok((
368            RpDelete::default(),
369            oio::OneShotDeleter::new(RedisDeleter::new(self.core.clone(), self.root.clone())),
370        ))
371    }
372
373    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
374        let _ = build_abs_path(&self.root, path);
375        // Redis doesn't support listing keys, return empty list
376        Ok((RpList::default(), ()))
377    }
378}
379
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Build a core without any client, addressed at the local redis URL.
    fn local_core(default_ttl: Option<Duration>) -> RedisCore {
        RedisCore {
            addr: "redis://127.0.0.1:6379".to_string(),
            client: None,
            cluster_client: None,
            conn: OnceCell::new(),
            default_ttl,
        }
    }

    #[test]
    fn test_redis_accessor_creation() {
        let accessor = RedisAccessor::new(local_core(Some(Duration::from_secs(60))));

        // A fresh accessor is rooted at "/" and tagged with the redis scheme.
        assert_eq!(accessor.root, "/");
        assert_eq!(accessor.info.scheme(), Scheme::Redis);

        // The basic capabilities must all be advertised.
        let capability = accessor.info.native_capability();
        assert!(capability.read);
        assert!(capability.write);
        assert!(capability.delete);
        assert!(capability.stat);
    }

    #[test]
    fn test_redis_accessor_with_root() {
        let accessor =
            RedisAccessor::new(local_core(None)).with_normalized_root("/test/".to_string());

        // Both the accessor and its info must carry the new root.
        assert_eq!(accessor.root, "/test/");
        assert_eq!(accessor.info.root(), "/test/".into());
    }

    #[test]
    fn test_redis_builder_interface() {
        // Each setter should land in the matching config field.
        let builder = RedisBuilder::default()
            .endpoint("redis://localhost:6379")
            .username("testuser")
            .password("testpass")
            .db(1)
            .root("/test");

        let config = &builder.config;
        assert_eq!(config.endpoint.as_deref(), Some("redis://localhost:6379"));
        assert_eq!(config.username.as_deref(), Some("testuser"));
        assert_eq!(config.password.as_deref(), Some("testpass"));
        assert_eq!(config.db, 1);
        assert_eq!(config.root.as_deref(), Some("/test"));
    }
}