opendal_core/services/redis/
backend.rs1use std::path::PathBuf;
19use std::sync::Arc;
20
21use http::Uri;
22use redis::Client;
23use redis::ConnectionAddr;
24use redis::ConnectionInfo;
25use redis::ProtocolVersion;
26use redis::RedisConnectionInfo;
27use redis::cluster::ClusterClientBuilder;
28
29use super::REDIS_SCHEME;
30use super::config::RedisConfig;
31use super::core::*;
32use super::delete::RedisDeleter;
33use super::writer::RedisWriter;
34use crate::raw::*;
35use crate::*;
36
/// Endpoint used by `RedisBuilder::build` when no endpoint has been configured.
const DEFAULT_REDIS_ENDPOINT: &str = "tcp://127.0.0.1:6379";
/// Port used when an endpoint URI does not carry an explicit port.
const DEFAULT_REDIS_PORT: u16 = 6379;
39
/// Builder for the Redis service; it wraps a [`RedisConfig`] that is filled in
/// through the builder's setter methods.
#[doc = include_str!("docs.md")]
#[derive(Debug, Default)]
pub struct RedisBuilder {
    /// Configuration being assembled; visible to the parent module only.
    pub(super) config: RedisConfig,
}
46
47impl RedisBuilder {
48 pub fn endpoint(mut self, endpoint: &str) -> Self {
56 if !endpoint.is_empty() {
57 self.config.endpoint = Some(endpoint.to_owned());
58 }
59 self
60 }
61
62 pub fn cluster_endpoints(mut self, cluster_endpoints: &str) -> Self {
71 if !cluster_endpoints.is_empty() {
72 self.config.cluster_endpoints = Some(cluster_endpoints.to_owned());
73 }
74 self
75 }
76
77 pub fn username(mut self, username: &str) -> Self {
81 if !username.is_empty() {
82 self.config.username = Some(username.to_owned());
83 }
84 self
85 }
86
87 pub fn password(mut self, password: &str) -> Self {
91 if !password.is_empty() {
92 self.config.password = Some(password.to_owned());
93 }
94 self
95 }
96
97 pub fn db(mut self, db: i64) -> Self {
101 self.config.db = db;
102 self
103 }
104
105 pub fn default_ttl(mut self, ttl: Duration) -> Self {
109 self.config.default_ttl = Some(ttl);
110 self
111 }
112
113 pub fn root(mut self, root: &str) -> Self {
117 self.config.root = if root.is_empty() {
118 None
119 } else {
120 Some(root.to_string())
121 };
122
123 self
124 }
125
126 #[must_use]
134 pub fn connection_pool_max_size(mut self, max_size: usize) -> Self {
135 assert!(max_size > 0, "max_size must be greater than zero!");
136 self.config.connection_pool_max_size = Some(max_size);
137 self
138 }
139}
140
141impl Builder for RedisBuilder {
142 type Config = RedisConfig;
143
144 fn build(self) -> Result<impl Access> {
145 let root = normalize_root(
146 self.config
147 .root
148 .clone()
149 .unwrap_or_else(|| "/".to_string())
150 .as_str(),
151 );
152
153 if let Some(endpoints) = self.config.cluster_endpoints.clone() {
154 let mut cluster_endpoints: Vec<ConnectionInfo> = Vec::default();
155 for endpoint in endpoints.split(',') {
156 cluster_endpoints.push(self.get_connection_info(endpoint.to_string())?);
157 }
158 let mut client_builder = ClusterClientBuilder::new(cluster_endpoints);
159 if let Some(username) = &self.config.username {
160 client_builder = client_builder.username(username.clone());
161 }
162 if let Some(password) = &self.config.password {
163 client_builder = client_builder.password(password.clone());
164 }
165 let client = client_builder.build().map_err(format_redis_error)?;
166
167 Ok(RedisBackend::new(RedisCore::new(
168 endpoints,
169 None,
170 Some(client),
171 self.config.default_ttl,
172 self.config.connection_pool_max_size,
173 ))
174 .with_normalized_root(root))
175 } else {
176 let endpoint = self
177 .config
178 .endpoint
179 .clone()
180 .unwrap_or_else(|| DEFAULT_REDIS_ENDPOINT.to_string());
181
182 let client =
183 Client::open(self.get_connection_info(endpoint.clone())?).map_err(|e| {
184 Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
185 .with_context("service", REDIS_SCHEME)
186 .with_context("endpoint", self.config.endpoint.as_ref().unwrap())
187 .with_context("db", self.config.db.to_string())
188 .set_source(e)
189 })?;
190
191 Ok(RedisBackend::new(RedisCore::new(
192 endpoint,
193 Some(client),
194 None,
195 self.config.default_ttl,
196 self.config.connection_pool_max_size,
197 ))
198 .with_normalized_root(root))
199 }
200 }
201}
202
203impl RedisBuilder {
204 fn get_connection_info(&self, endpoint: String) -> Result<ConnectionInfo> {
205 let ep_url = endpoint.parse::<Uri>().map_err(|e| {
206 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
207 .with_context("service", REDIS_SCHEME)
208 .with_context("endpoint", endpoint)
209 .set_source(e)
210 })?;
211
212 let con_addr = match ep_url.scheme_str() {
213 Some("tcp") | Some("redis") | None => {
214 let host = ep_url
215 .host()
216 .map(|h| h.to_string())
217 .unwrap_or_else(|| "127.0.0.1".to_string());
218 let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
219 ConnectionAddr::Tcp(host, port)
220 }
221 Some("rediss") => {
222 let host = ep_url
223 .host()
224 .map(|h| h.to_string())
225 .unwrap_or_else(|| "127.0.0.1".to_string());
226 let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
227 ConnectionAddr::TcpTls {
228 host,
229 port,
230 insecure: false,
231 tls_params: None,
232 }
233 }
234 Some("unix") | Some("redis+unix") => {
235 let path = PathBuf::from(ep_url.path());
236 ConnectionAddr::Unix(path)
237 }
238 Some(s) => {
239 return Err(
240 Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
241 .with_context("service", REDIS_SCHEME)
242 .with_context("scheme", s),
243 );
244 }
245 };
246
247 let redis_info = RedisConnectionInfo {
248 db: self.config.db,
249 username: self.config.username.clone(),
250 password: self.config.password.clone(),
251 protocol: ProtocolVersion::RESP2,
252 };
253
254 Ok(ConnectionInfo {
255 addr: con_addr,
256 redis: redis_info,
257 })
258 }
259}
260
/// Redis backend: a shared [`RedisCore`], the root path prepended to request
/// paths, and the accessor info returned by `info()`.
#[derive(Debug, Clone)]
pub struct RedisBackend {
    /// Shared core used for all key operations.
    core: Arc<RedisCore>,
    /// Root path that request paths are resolved against.
    root: String,
    /// Accessor metadata (scheme, name, root, capabilities).
    info: Arc<AccessorInfo>,
}
268
269impl RedisBackend {
270 fn new(core: RedisCore) -> Self {
271 let info = AccessorInfo::default();
272 info.set_scheme(REDIS_SCHEME);
273 info.set_name(core.addr());
274 info.set_root("/");
275 info.set_native_capability(Capability {
276 read: true,
277 write: true,
278 delete: true,
279 stat: true,
280 write_can_empty: true,
281 shared: true,
282 ..Default::default()
283 });
284
285 Self {
286 core: Arc::new(core),
287 root: "/".to_string(),
288 info: Arc::new(info),
289 }
290 }
291
292 fn with_normalized_root(mut self, root: String) -> Self {
293 self.info.set_root(&root);
294 self.root = root;
295 self
296 }
297}
298
299impl Access for RedisBackend {
300 type Reader = Buffer;
301 type Writer = RedisWriter;
302 type Lister = ();
303 type Deleter = oio::OneShotDeleter<RedisDeleter>;
304
305 fn info(&self) -> Arc<AccessorInfo> {
306 self.info.clone()
307 }
308
309 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
310 let p = build_abs_path(&self.root, path);
311
312 if p == build_abs_path(&self.root, "") {
313 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
314 } else {
315 let bs = self.core.get(&p).await?;
316 match bs {
317 Some(bs) => Ok(RpStat::new(
318 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
319 )),
320 None => Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
321 }
322 }
323 }
324
325 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
326 let p = build_abs_path(&self.root, path);
327
328 let range = args.range();
329 let buffer = if range.is_full() {
330 match self.core.get(&p).await? {
332 Some(bs) => bs,
333 None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
334 }
335 } else {
336 let start = range.offset() as isize;
338 let end = match range.size() {
339 Some(size) => (range.offset() + size - 1) as isize,
340 None => -1, };
342
343 match self.core.get_range(&p, start, end).await? {
344 Some(bs) => bs,
345 None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
346 }
347 };
348
349 Ok((RpRead::new(), buffer))
350 }
351
352 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
353 let p = build_abs_path(&self.root, path);
354 Ok((RpWrite::new(), RedisWriter::new(self.core.clone(), p)))
355 }
356
357 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
358 Ok((
359 RpDelete::default(),
360 oio::OneShotDeleter::new(RedisDeleter::new(self.core.clone(), self.root.clone())),
361 ))
362 }
363}