1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::path::PathBuf;
21use std::time::Duration;
22
23use http::Uri;
24use redis::cluster::ClusterClientBuilder;
25use redis::Client;
26use redis::ConnectionAddr;
27use redis::ConnectionInfo;
28use redis::ProtocolVersion;
29use redis::RedisConnectionInfo;
30use tokio::sync::OnceCell;
31
32use super::core::*;
33use super::delete::RedisDeleter;
34use super::writer::RedisWriter;
35use super::DEFAULT_SCHEME;
36use crate::raw::oio;
37use crate::raw::*;
38use crate::services::RedisConfig;
39use crate::*;
/// Endpoint that `build` falls back to when the config does not name one.
const DEFAULT_REDIS_ENDPOINT: &str = "tcp://127.0.0.1:6379";
/// Port that `get_connection_info` uses when an endpoint URI carries no explicit port.
const DEFAULT_REDIS_PORT: u16 = 6379;
42
43impl Configurator for RedisConfig {
44 type Builder = RedisBuilder;
45 fn into_builder(self) -> Self::Builder {
46 RedisBuilder { config: self }
47 }
48}
49
// The public rustdoc for this type is pulled in from the adjacent `docs.md`.
#[doc = include_str!("docs.md")]
#[derive(Clone, Default)]
pub struct RedisBuilder {
    // Settings accumulated by the builder methods and consumed by `build`.
    config: RedisConfig,
}
56
57impl Debug for RedisBuilder {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 let mut d = f.debug_struct("RedisBuilder");
60
61 d.field("config", &self.config);
62 d.finish_non_exhaustive()
63 }
64}
65
66impl RedisBuilder {
67 pub fn endpoint(mut self, endpoint: &str) -> Self {
75 if !endpoint.is_empty() {
76 self.config.endpoint = Some(endpoint.to_owned());
77 }
78 self
79 }
80
81 pub fn cluster_endpoints(mut self, cluster_endpoints: &str) -> Self {
90 if !cluster_endpoints.is_empty() {
91 self.config.cluster_endpoints = Some(cluster_endpoints.to_owned());
92 }
93 self
94 }
95
96 pub fn username(mut self, username: &str) -> Self {
100 if !username.is_empty() {
101 self.config.username = Some(username.to_owned());
102 }
103 self
104 }
105
106 pub fn password(mut self, password: &str) -> Self {
110 if !password.is_empty() {
111 self.config.password = Some(password.to_owned());
112 }
113 self
114 }
115
116 pub fn db(mut self, db: i64) -> Self {
120 self.config.db = db;
121 self
122 }
123
124 pub fn default_ttl(mut self, ttl: Duration) -> Self {
128 self.config.default_ttl = Some(ttl);
129 self
130 }
131
132 pub fn root(mut self, root: &str) -> Self {
136 self.config.root = if root.is_empty() {
137 None
138 } else {
139 Some(root.to_string())
140 };
141
142 self
143 }
144}
145
146impl Builder for RedisBuilder {
147 type Config = RedisConfig;
148
149 fn build(self) -> Result<impl Access> {
150 let root = normalize_root(
151 self.config
152 .root
153 .clone()
154 .unwrap_or_else(|| "/".to_string())
155 .as_str(),
156 );
157
158 if let Some(endpoints) = self.config.cluster_endpoints.clone() {
159 let mut cluster_endpoints: Vec<ConnectionInfo> = Vec::default();
160 for endpoint in endpoints.split(',') {
161 cluster_endpoints.push(self.get_connection_info(endpoint.to_string())?);
162 }
163 let mut client_builder = ClusterClientBuilder::new(cluster_endpoints);
164 if let Some(username) = &self.config.username {
165 client_builder = client_builder.username(username.clone());
166 }
167 if let Some(password) = &self.config.password {
168 client_builder = client_builder.password(password.clone());
169 }
170 let client = client_builder.build().map_err(format_redis_error)?;
171
172 let conn = OnceCell::new();
173
174 Ok(RedisAccessor::new(RedisCore {
175 addr: endpoints,
176 client: None,
177 cluster_client: Some(client),
178 conn,
179 default_ttl: self.config.default_ttl,
180 })
181 .with_normalized_root(root))
182 } else {
183 let endpoint = self
184 .config
185 .endpoint
186 .clone()
187 .unwrap_or_else(|| DEFAULT_REDIS_ENDPOINT.to_string());
188
189 let client =
190 Client::open(self.get_connection_info(endpoint.clone())?).map_err(|e| {
191 Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
192 .with_context("service", Scheme::Redis)
193 .with_context("endpoint", self.config.endpoint.as_ref().unwrap())
194 .with_context("db", self.config.db.to_string())
195 .set_source(e)
196 })?;
197
198 let conn = OnceCell::new();
199 Ok(RedisAccessor::new(RedisCore {
200 addr: endpoint,
201 client: Some(client),
202 cluster_client: None,
203 conn,
204 default_ttl: self.config.default_ttl,
205 })
206 .with_normalized_root(root))
207 }
208 }
209}
210
211impl RedisBuilder {
212 fn get_connection_info(&self, endpoint: String) -> Result<ConnectionInfo> {
213 let ep_url = endpoint.parse::<Uri>().map_err(|e| {
214 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
215 .with_context("service", Scheme::Redis)
216 .with_context("endpoint", endpoint)
217 .set_source(e)
218 })?;
219
220 let con_addr = match ep_url.scheme_str() {
221 Some("tcp") | Some("redis") | None => {
222 let host = ep_url
223 .host()
224 .map(|h| h.to_string())
225 .unwrap_or_else(|| "127.0.0.1".to_string());
226 let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
227 ConnectionAddr::Tcp(host, port)
228 }
229 Some("rediss") => {
230 let host = ep_url
231 .host()
232 .map(|h| h.to_string())
233 .unwrap_or_else(|| "127.0.0.1".to_string());
234 let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
235 ConnectionAddr::TcpTls {
236 host,
237 port,
238 insecure: false,
239 tls_params: None,
240 }
241 }
242 Some("unix") | Some("redis+unix") => {
243 let path = PathBuf::from(ep_url.path());
244 ConnectionAddr::Unix(path)
245 }
246 Some(s) => {
247 return Err(
248 Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
249 .with_context("service", Scheme::Redis)
250 .with_context("scheme", s),
251 )
252 }
253 };
254
255 let redis_info = RedisConnectionInfo {
256 db: self.config.db,
257 username: self.config.username.clone(),
258 password: self.config.password.clone(),
259 protocol: ProtocolVersion::RESP2,
260 };
261
262 Ok(ConnectionInfo {
263 addr: con_addr,
264 redis: redis_info,
265 })
266 }
267}
268
/// [`Access`] implementation that serves requests through a shared [`RedisCore`].
#[derive(Debug, Clone)]
pub struct RedisAccessor {
    /// Shared redis state; also handed to the writers and deleters this accessor creates.
    core: std::sync::Arc<RedisCore>,
    /// Root prefix that every incoming path is resolved against via `build_abs_path`.
    root: String,
    /// Service metadata (scheme, name, root, native capability) returned by `info()`.
    info: std::sync::Arc<AccessorInfo>,
}
276
277impl RedisAccessor {
278 fn new(core: RedisCore) -> Self {
279 let info = AccessorInfo::default();
280 info.set_scheme(DEFAULT_SCHEME);
281 info.set_name(&core.addr);
282 info.set_root("/");
283 info.set_native_capability(Capability {
284 read: true,
285 write: true,
286 delete: true,
287 stat: true,
288 write_can_empty: true,
289 shared: true,
290 ..Default::default()
291 });
292
293 Self {
294 core: std::sync::Arc::new(core),
295 root: "/".to_string(),
296 info: std::sync::Arc::new(info),
297 }
298 }
299
300 fn with_normalized_root(mut self, root: String) -> Self {
301 self.info.set_root(&root);
302 self.root = root;
303 self
304 }
305}
306
307impl Access for RedisAccessor {
308 type Reader = Buffer;
309 type Writer = RedisWriter;
310 type Lister = ();
311 type Deleter = oio::OneShotDeleter<RedisDeleter>;
312
313 fn info(&self) -> std::sync::Arc<AccessorInfo> {
314 self.info.clone()
315 }
316
317 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
318 let p = build_abs_path(&self.root, path);
319
320 if p == build_abs_path(&self.root, "") {
321 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
322 } else {
323 let bs = self.core.get(&p).await?;
324 match bs {
325 Some(bs) => Ok(RpStat::new(
326 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
327 )),
328 None => Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
329 }
330 }
331 }
332
333 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
334 let p = build_abs_path(&self.root, path);
335
336 let range = args.range();
337 let buffer = if range.is_full() {
338 match self.core.get(&p).await? {
340 Some(bs) => bs,
341 None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
342 }
343 } else {
344 let start = range.offset() as isize;
346 let end = match range.size() {
347 Some(size) => (range.offset() + size - 1) as isize,
348 None => -1, };
350
351 match self.core.get_range(&p, start, end).await? {
352 Some(bs) => bs,
353 None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
354 }
355 };
356
357 Ok((RpRead::new(), buffer))
358 }
359
360 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
361 let p = build_abs_path(&self.root, path);
362 Ok((RpWrite::new(), RedisWriter::new(self.core.clone(), p)))
363 }
364
365 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
366 Ok((
367 RpDelete::default(),
368 oio::OneShotDeleter::new(RedisDeleter::new(self.core.clone(), self.root.clone())),
369 ))
370 }
371
372 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
373 let _ = build_abs_path(&self.root, path);
374 Ok((RpList::default(), ()))
376 }
377}
378
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds a core with no clients attached, for tests that only inspect
    /// accessor state.
    fn offline_core(default_ttl: Option<Duration>) -> RedisCore {
        RedisCore {
            addr: "redis://127.0.0.1:6379".to_string(),
            client: None,
            cluster_client: None,
            conn: OnceCell::new(),
            default_ttl,
        }
    }

    #[test]
    fn test_redis_accessor_creation() {
        let accessor = RedisAccessor::new(offline_core(Some(Duration::from_secs(60))));

        // A fresh accessor is rooted at "/" and reports the redis scheme.
        assert_eq!(accessor.root, "/");
        assert_eq!(accessor.info.scheme(), "redis");

        let capability = accessor.info.native_capability();
        assert!(capability.read);
        assert!(capability.write);
        assert!(capability.delete);
        assert!(capability.stat);
    }

    #[test]
    fn test_redis_accessor_with_root() {
        let accessor =
            RedisAccessor::new(offline_core(None)).with_normalized_root("/test/".to_string());

        // Both the accessor and its published info must carry the new root.
        assert_eq!(accessor.root, "/test/");
        assert_eq!(accessor.info.root(), "/test/".into());
    }

    #[test]
    fn test_redis_builder_interface() {
        let builder = RedisBuilder::default()
            .endpoint("redis://localhost:6379")
            .username("testuser")
            .password("testpass")
            .db(1)
            .root("/test");

        let config = &builder.config;
        assert_eq!(config.endpoint.as_deref(), Some("redis://localhost:6379"));
        assert_eq!(config.username.as_deref(), Some("testuser"));
        assert_eq!(config.password.as_deref(), Some("testpass"));
        assert_eq!(config.db, 1);
        assert_eq!(config.root.as_deref(), Some("/test"));
    }
}