1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::path::PathBuf;
21use std::time::Duration;
22
23use http::Uri;
24use redis::cluster::ClusterClientBuilder;
25use redis::Client;
26use redis::ConnectionAddr;
27use redis::ConnectionInfo;
28use redis::ProtocolVersion;
29use redis::RedisConnectionInfo;
30use tokio::sync::OnceCell;
31
32use super::core::*;
33use super::delete::RedisDeleter;
34use super::writer::RedisWriter;
35use crate::raw::oio;
36use crate::raw::*;
37use crate::services::RedisConfig;
38use crate::*;
39
/// Endpoint used by `build` when the configuration does not provide one.
const DEFAULT_REDIS_ENDPOINT: &str = "tcp://127.0.0.1:6379";
/// Port used when a parsed endpoint URI carries no explicit port.
const DEFAULT_REDIS_PORT: u16 = 6379;
42
43impl Configurator for RedisConfig {
44 type Builder = RedisBuilder;
45 fn into_builder(self) -> Self::Builder {
46 RedisBuilder { config: self }
47 }
48}
49
/// Builder that collects a [`RedisConfig`] and turns it into a Redis-backed accessor.
#[doc = include_str!("docs.md")]
#[derive(Clone, Default)]
pub struct RedisBuilder {
    /// Settings accumulated through the builder's setter methods.
    config: RedisConfig,
}
56
57impl Debug for RedisBuilder {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 let mut d = f.debug_struct("RedisBuilder");
60
61 d.field("config", &self.config);
62 d.finish_non_exhaustive()
63 }
64}
65
66impl RedisBuilder {
67 pub fn endpoint(mut self, endpoint: &str) -> Self {
75 if !endpoint.is_empty() {
76 self.config.endpoint = Some(endpoint.to_owned());
77 }
78 self
79 }
80
81 pub fn cluster_endpoints(mut self, cluster_endpoints: &str) -> Self {
90 if !cluster_endpoints.is_empty() {
91 self.config.cluster_endpoints = Some(cluster_endpoints.to_owned());
92 }
93 self
94 }
95
96 pub fn username(mut self, username: &str) -> Self {
100 if !username.is_empty() {
101 self.config.username = Some(username.to_owned());
102 }
103 self
104 }
105
106 pub fn password(mut self, password: &str) -> Self {
110 if !password.is_empty() {
111 self.config.password = Some(password.to_owned());
112 }
113 self
114 }
115
116 pub fn db(mut self, db: i64) -> Self {
120 self.config.db = db;
121 self
122 }
123
124 pub fn default_ttl(mut self, ttl: Duration) -> Self {
128 self.config.default_ttl = Some(ttl);
129 self
130 }
131
132 pub fn root(mut self, root: &str) -> Self {
136 self.config.root = if root.is_empty() {
137 None
138 } else {
139 Some(root.to_string())
140 };
141
142 self
143 }
144}
145
146impl Builder for RedisBuilder {
147 const SCHEME: Scheme = Scheme::Redis;
148 type Config = RedisConfig;
149
150 fn build(self) -> Result<impl Access> {
151 let root = normalize_root(
152 self.config
153 .root
154 .clone()
155 .unwrap_or_else(|| "/".to_string())
156 .as_str(),
157 );
158
159 if let Some(endpoints) = self.config.cluster_endpoints.clone() {
160 let mut cluster_endpoints: Vec<ConnectionInfo> = Vec::default();
161 for endpoint in endpoints.split(',') {
162 cluster_endpoints.push(self.get_connection_info(endpoint.to_string())?);
163 }
164 let mut client_builder = ClusterClientBuilder::new(cluster_endpoints);
165 if let Some(username) = &self.config.username {
166 client_builder = client_builder.username(username.clone());
167 }
168 if let Some(password) = &self.config.password {
169 client_builder = client_builder.password(password.clone());
170 }
171 let client = client_builder.build().map_err(format_redis_error)?;
172
173 let conn = OnceCell::new();
174
175 Ok(RedisAccessor::new(RedisCore {
176 addr: endpoints,
177 client: None,
178 cluster_client: Some(client),
179 conn,
180 default_ttl: self.config.default_ttl,
181 })
182 .with_normalized_root(root))
183 } else {
184 let endpoint = self
185 .config
186 .endpoint
187 .clone()
188 .unwrap_or_else(|| DEFAULT_REDIS_ENDPOINT.to_string());
189
190 let client =
191 Client::open(self.get_connection_info(endpoint.clone())?).map_err(|e| {
192 Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
193 .with_context("service", Scheme::Redis)
194 .with_context("endpoint", self.config.endpoint.as_ref().unwrap())
195 .with_context("db", self.config.db.to_string())
196 .set_source(e)
197 })?;
198
199 let conn = OnceCell::new();
200 Ok(RedisAccessor::new(RedisCore {
201 addr: endpoint,
202 client: Some(client),
203 cluster_client: None,
204 conn,
205 default_ttl: self.config.default_ttl,
206 })
207 .with_normalized_root(root))
208 }
209 }
210}
211
212impl RedisBuilder {
213 fn get_connection_info(&self, endpoint: String) -> Result<ConnectionInfo> {
214 let ep_url = endpoint.parse::<Uri>().map_err(|e| {
215 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
216 .with_context("service", Scheme::Redis)
217 .with_context("endpoint", endpoint)
218 .set_source(e)
219 })?;
220
221 let con_addr = match ep_url.scheme_str() {
222 Some("tcp") | Some("redis") | None => {
223 let host = ep_url
224 .host()
225 .map(|h| h.to_string())
226 .unwrap_or_else(|| "127.0.0.1".to_string());
227 let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
228 ConnectionAddr::Tcp(host, port)
229 }
230 Some("rediss") => {
231 let host = ep_url
232 .host()
233 .map(|h| h.to_string())
234 .unwrap_or_else(|| "127.0.0.1".to_string());
235 let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
236 ConnectionAddr::TcpTls {
237 host,
238 port,
239 insecure: false,
240 tls_params: None,
241 }
242 }
243 Some("unix") | Some("redis+unix") => {
244 let path = PathBuf::from(ep_url.path());
245 ConnectionAddr::Unix(path)
246 }
247 Some(s) => {
248 return Err(
249 Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
250 .with_context("service", Scheme::Redis)
251 .with_context("scheme", s),
252 )
253 }
254 };
255
256 let redis_info = RedisConnectionInfo {
257 db: self.config.db,
258 username: self.config.username.clone(),
259 password: self.config.password.clone(),
260 protocol: ProtocolVersion::RESP2,
261 };
262
263 Ok(ConnectionInfo {
264 addr: con_addr,
265 redis: redis_info,
266 })
267 }
268}
269
/// `Access` implementation that serves reads, writes, deletes and stats from Redis.
#[derive(Debug, Clone)]
pub struct RedisAccessor {
    /// Shared Redis core through which values are fetched and stored.
    core: std::sync::Arc<RedisCore>,
    /// Root prefix combined with every request path via `build_abs_path`.
    root: String,
    /// Accessor metadata (scheme, name, root, native capability) returned by `info()`.
    info: std::sync::Arc<AccessorInfo>,
}
277
278impl RedisAccessor {
279 fn new(core: RedisCore) -> Self {
280 let info = AccessorInfo::default();
281 info.set_scheme(Scheme::Redis);
282 info.set_name(&core.addr);
283 info.set_root("/");
284 info.set_native_capability(Capability {
285 read: true,
286 write: true,
287 delete: true,
288 stat: true,
289 write_can_empty: true,
290 shared: true,
291 ..Default::default()
292 });
293
294 Self {
295 core: std::sync::Arc::new(core),
296 root: "/".to_string(),
297 info: std::sync::Arc::new(info),
298 }
299 }
300
301 fn with_normalized_root(mut self, root: String) -> Self {
302 self.info.set_root(&root);
303 self.root = root;
304 self
305 }
306}
307
308impl Access for RedisAccessor {
309 type Reader = Buffer;
310 type Writer = RedisWriter;
311 type Lister = ();
312 type Deleter = oio::OneShotDeleter<RedisDeleter>;
313
314 fn info(&self) -> std::sync::Arc<AccessorInfo> {
315 self.info.clone()
316 }
317
318 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
319 let p = build_abs_path(&self.root, path);
320
321 if p == build_abs_path(&self.root, "") {
322 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
323 } else {
324 let bs = self.core.get(&p).await?;
325 match bs {
326 Some(bs) => Ok(RpStat::new(
327 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
328 )),
329 None => Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
330 }
331 }
332 }
333
334 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
335 let p = build_abs_path(&self.root, path);
336
337 let range = args.range();
338 let buffer = if range.is_full() {
339 match self.core.get(&p).await? {
341 Some(bs) => bs,
342 None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
343 }
344 } else {
345 let start = range.offset() as isize;
347 let end = match range.size() {
348 Some(size) => (range.offset() + size - 1) as isize,
349 None => -1, };
351
352 match self.core.get_range(&p, start, end).await? {
353 Some(bs) => bs,
354 None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
355 }
356 };
357
358 Ok((RpRead::new(), buffer))
359 }
360
361 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
362 let p = build_abs_path(&self.root, path);
363 Ok((RpWrite::new(), RedisWriter::new(self.core.clone(), p)))
364 }
365
366 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
367 Ok((
368 RpDelete::default(),
369 oio::OneShotDeleter::new(RedisDeleter::new(self.core.clone(), self.root.clone())),
370 ))
371 }
372
373 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
374 let _ = build_abs_path(&self.root, path);
375 Ok((RpList::default(), ()))
377 }
378}
379
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Address given to every core built by these tests.
    const TEST_ADDR: &str = "redis://127.0.0.1:6379";

    /// Builds a `RedisCore` with no client configured and the given default TTL.
    fn clientless_core(default_ttl: Option<Duration>) -> RedisCore {
        RedisCore {
            addr: TEST_ADDR.to_string(),
            client: None,
            cluster_client: None,
            conn: OnceCell::new(),
            default_ttl,
        }
    }

    /// A fresh accessor is rooted at `/` and advertises the basic capabilities.
    #[test]
    fn test_redis_accessor_creation() {
        let accessor = RedisAccessor::new(clientless_core(Some(Duration::from_secs(60))));

        assert_eq!(accessor.root, "/");
        assert_eq!(accessor.info.scheme(), Scheme::Redis);

        let capability = accessor.info.native_capability();
        assert!(capability.read);
        assert!(capability.write);
        assert!(capability.delete);
        assert!(capability.stat);
    }

    /// Overriding the root updates both the accessor and its info.
    #[test]
    fn test_redis_accessor_with_root() {
        let accessor =
            RedisAccessor::new(clientless_core(None)).with_normalized_root("/test/".to_string());

        assert_eq!(accessor.root, "/test/");
        assert_eq!(accessor.info.root(), "/test/".into());
    }

    /// Every setter stores its argument in the builder's config.
    #[test]
    fn test_redis_builder_interface() {
        let builder = RedisBuilder::default()
            .endpoint("redis://localhost:6379")
            .username("testuser")
            .password("testpass")
            .db(1)
            .root("/test");

        let config = &builder.config;
        assert_eq!(config.endpoint.as_deref(), Some("redis://localhost:6379"));
        assert_eq!(config.username.as_deref(), Some("testuser"));
        assert_eq!(config.password.as_deref(), Some("testpass"));
        assert_eq!(config.db, 1);
        assert_eq!(config.root.as_deref(), Some("/test"));
    }
}