opendal_core/services/redis/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::path::PathBuf;
19use std::sync::Arc;
20
21use http::Uri;
22use redis::Client;
23use redis::ConnectionAddr;
24use redis::ConnectionInfo;
25use redis::ProtocolVersion;
26use redis::RedisConnectionInfo;
27use redis::cluster::ClusterClientBuilder;
28
29use super::REDIS_SCHEME;
30use super::config::RedisConfig;
31use super::core::*;
32use super::delete::RedisDeleter;
33use super::writer::RedisWriter;
34use crate::raw::*;
35use crate::*;
36
37const DEFAULT_REDIS_ENDPOINT: &str = "tcp://127.0.0.1:6379";
38const DEFAULT_REDIS_PORT: u16 = 6379;
39
40/// [Redis](https://redis.io/) services support.
41#[doc = include_str!("docs.md")]
42#[derive(Debug, Default)]
43pub struct RedisBuilder {
44    pub(super) config: RedisConfig,
45}
46
47impl RedisBuilder {
48    /// set the network address of redis service.
49    ///
50    /// currently supported schemes:
51    /// - no scheme: will be seen as "tcp"
52    /// - "tcp" or "redis": unsecured redis connections
53    /// - "rediss": secured redis connections
54    /// - "unix" or "redis+unix": unix socket connection
55    pub fn endpoint(mut self, endpoint: &str) -> Self {
56        if !endpoint.is_empty() {
57            self.config.endpoint = Some(endpoint.to_owned());
58        }
59        self
60    }
61
62    /// set the network address of redis cluster service.
63    /// This parameter is mutually exclusive with the endpoint parameter.
64    ///
65    /// currently supported schemes:
66    /// - no scheme: will be seen as "tcp"
67    /// - "tcp" or "redis": unsecured redis connections
68    /// - "rediss": secured redis connections
69    /// - "unix" or "redis+unix": unix socket connection
70    pub fn cluster_endpoints(mut self, cluster_endpoints: &str) -> Self {
71        if !cluster_endpoints.is_empty() {
72            self.config.cluster_endpoints = Some(cluster_endpoints.to_owned());
73        }
74        self
75    }
76
77    /// set the username for redis
78    ///
79    /// default: no username
80    pub fn username(mut self, username: &str) -> Self {
81        if !username.is_empty() {
82            self.config.username = Some(username.to_owned());
83        }
84        self
85    }
86
87    /// set the password for redis
88    ///
89    /// default: no password
90    pub fn password(mut self, password: &str) -> Self {
91        if !password.is_empty() {
92            self.config.password = Some(password.to_owned());
93        }
94        self
95    }
96
97    /// set the db used in redis
98    ///
99    /// default: 0
100    pub fn db(mut self, db: i64) -> Self {
101        self.config.db = db;
102        self
103    }
104
105    /// Set the default ttl for redis services.
106    ///
107    /// If set, we will specify `EX` for write operations.
108    pub fn default_ttl(mut self, ttl: Duration) -> Self {
109        self.config.default_ttl = Some(ttl);
110        self
111    }
112
113    /// set the working directory, all operations will be performed under it.
114    ///
115    /// default: "/"
116    pub fn root(mut self, root: &str) -> Self {
117        self.config.root = if root.is_empty() {
118            None
119        } else {
120            Some(root.to_string())
121        };
122
123        self
124    }
125
126    /// Sets the maximum number of connections managed by the pool.
127    ///
128    /// Defaults to 10.
129    ///
130    /// # Panics
131    ///
132    /// Will panic if `max_size` is 0.
133    #[must_use]
134    pub fn connection_pool_max_size(mut self, max_size: usize) -> Self {
135        assert!(max_size > 0, "max_size must be greater than zero!");
136        self.config.connection_pool_max_size = Some(max_size);
137        self
138    }
139}
140
141impl Builder for RedisBuilder {
142    type Config = RedisConfig;
143
144    fn build(self) -> Result<impl Access> {
145        let root = normalize_root(
146            self.config
147                .root
148                .clone()
149                .unwrap_or_else(|| "/".to_string())
150                .as_str(),
151        );
152
153        if let Some(endpoints) = self.config.cluster_endpoints.clone() {
154            let mut cluster_endpoints: Vec<ConnectionInfo> = Vec::default();
155            for endpoint in endpoints.split(',') {
156                cluster_endpoints.push(self.get_connection_info(endpoint.to_string())?);
157            }
158            let mut client_builder = ClusterClientBuilder::new(cluster_endpoints);
159            if let Some(username) = &self.config.username {
160                client_builder = client_builder.username(username.clone());
161            }
162            if let Some(password) = &self.config.password {
163                client_builder = client_builder.password(password.clone());
164            }
165            let client = client_builder.build().map_err(format_redis_error)?;
166
167            Ok(RedisBackend::new(RedisCore::new(
168                endpoints,
169                None,
170                Some(client),
171                self.config.default_ttl,
172                self.config.connection_pool_max_size,
173            ))
174            .with_normalized_root(root))
175        } else {
176            let endpoint = self
177                .config
178                .endpoint
179                .clone()
180                .unwrap_or_else(|| DEFAULT_REDIS_ENDPOINT.to_string());
181
182            let client =
183                Client::open(self.get_connection_info(endpoint.clone())?).map_err(|e| {
184                    Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
185                        .with_context("service", REDIS_SCHEME)
186                        .with_context("endpoint", self.config.endpoint.as_ref().unwrap())
187                        .with_context("db", self.config.db.to_string())
188                        .set_source(e)
189                })?;
190
191            Ok(RedisBackend::new(RedisCore::new(
192                endpoint,
193                Some(client),
194                None,
195                self.config.default_ttl,
196                self.config.connection_pool_max_size,
197            ))
198            .with_normalized_root(root))
199        }
200    }
201}
202
203impl RedisBuilder {
204    fn get_connection_info(&self, endpoint: String) -> Result<ConnectionInfo> {
205        let ep_url = endpoint.parse::<Uri>().map_err(|e| {
206            Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
207                .with_context("service", REDIS_SCHEME)
208                .with_context("endpoint", endpoint)
209                .set_source(e)
210        })?;
211
212        let con_addr = match ep_url.scheme_str() {
213            Some("tcp") | Some("redis") | None => {
214                let host = ep_url
215                    .host()
216                    .map(|h| h.to_string())
217                    .unwrap_or_else(|| "127.0.0.1".to_string());
218                let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
219                ConnectionAddr::Tcp(host, port)
220            }
221            Some("rediss") => {
222                let host = ep_url
223                    .host()
224                    .map(|h| h.to_string())
225                    .unwrap_or_else(|| "127.0.0.1".to_string());
226                let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
227                ConnectionAddr::TcpTls {
228                    host,
229                    port,
230                    insecure: false,
231                    tls_params: None,
232                }
233            }
234            Some("unix") | Some("redis+unix") => {
235                let path = PathBuf::from(ep_url.path());
236                ConnectionAddr::Unix(path)
237            }
238            Some(s) => {
239                return Err(
240                    Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
241                        .with_context("service", REDIS_SCHEME)
242                        .with_context("scheme", s),
243                );
244            }
245        };
246
247        let redis_info = RedisConnectionInfo {
248            db: self.config.db,
249            username: self.config.username.clone(),
250            password: self.config.password.clone(),
251            protocol: ProtocolVersion::RESP2,
252        };
253
254        Ok(ConnectionInfo {
255            addr: con_addr,
256            redis: redis_info,
257        })
258    }
259}
260
261/// RedisBackend implements Access trait directly
262#[derive(Debug, Clone)]
263pub struct RedisBackend {
264    core: Arc<RedisCore>,
265    root: String,
266    info: Arc<AccessorInfo>,
267}
268
269impl RedisBackend {
270    fn new(core: RedisCore) -> Self {
271        let info = AccessorInfo::default();
272        info.set_scheme(REDIS_SCHEME);
273        info.set_name(core.addr());
274        info.set_root("/");
275        info.set_native_capability(Capability {
276            read: true,
277            write: true,
278            delete: true,
279            stat: true,
280            write_can_empty: true,
281            shared: true,
282            ..Default::default()
283        });
284
285        Self {
286            core: Arc::new(core),
287            root: "/".to_string(),
288            info: Arc::new(info),
289        }
290    }
291
292    fn with_normalized_root(mut self, root: String) -> Self {
293        self.info.set_root(&root);
294        self.root = root;
295        self
296    }
297}
298
299impl Access for RedisBackend {
300    type Reader = Buffer;
301    type Writer = RedisWriter;
302    type Lister = ();
303    type Deleter = oio::OneShotDeleter<RedisDeleter>;
304
305    fn info(&self) -> Arc<AccessorInfo> {
306        self.info.clone()
307    }
308
309    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
310        let p = build_abs_path(&self.root, path);
311
312        if p == build_abs_path(&self.root, "") {
313            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
314        } else {
315            let bs = self.core.get(&p).await?;
316            match bs {
317                Some(bs) => Ok(RpStat::new(
318                    Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
319                )),
320                None => Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
321            }
322        }
323    }
324
325    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
326        let p = build_abs_path(&self.root, path);
327
328        let range = args.range();
329        let buffer = if range.is_full() {
330            // Full read - use GET
331            match self.core.get(&p).await? {
332                Some(bs) => bs,
333                None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
334            }
335        } else {
336            // Range read - use GETRANGE
337            let start = range.offset() as isize;
338            let end = match range.size() {
339                Some(size) => (range.offset() + size - 1) as isize,
340                None => -1, // Redis uses -1 for end of string
341            };
342
343            match self.core.get_range(&p, start, end).await? {
344                Some(bs) => bs,
345                None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
346            }
347        };
348
349        Ok((RpRead::new(), buffer))
350    }
351
352    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
353        let p = build_abs_path(&self.root, path);
354        Ok((RpWrite::new(), RedisWriter::new(self.core.clone(), p)))
355    }
356
357    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
358        Ok((
359            RpDelete::default(),
360            oio::OneShotDeleter::new(RedisDeleter::new(self.core.clone(), self.root.clone())),
361        ))
362    }
363}