1use std::path::PathBuf;
19use std::sync::Arc;
20use std::time::Duration;
21
22use http::Uri;
23use redis::Client;
24use redis::ConnectionAddr;
25use redis::ConnectionInfo;
26use redis::ProtocolVersion;
27use redis::RedisConnectionInfo;
28use redis::cluster::ClusterClientBuilder;
29
30use super::REDIS_SCHEME;
31use super::config::RedisConfig;
32use super::core::*;
33use super::delete::RedisDeleter;
34use super::writer::RedisWriter;
35use crate::raw::*;
36use crate::*;
37
38const DEFAULT_REDIS_ENDPOINT: &str = "tcp://127.0.0.1:6379";
39const DEFAULT_REDIS_PORT: u16 = 6379;
40
41#[doc = include_str!("docs.md")]
43#[derive(Debug, Default)]
44pub struct RedisBuilder {
45 pub(super) config: RedisConfig,
46}
47
48impl RedisBuilder {
49 pub fn endpoint(mut self, endpoint: &str) -> Self {
57 if !endpoint.is_empty() {
58 self.config.endpoint = Some(endpoint.to_owned());
59 }
60 self
61 }
62
63 pub fn cluster_endpoints(mut self, cluster_endpoints: &str) -> Self {
72 if !cluster_endpoints.is_empty() {
73 self.config.cluster_endpoints = Some(cluster_endpoints.to_owned());
74 }
75 self
76 }
77
78 pub fn username(mut self, username: &str) -> Self {
82 if !username.is_empty() {
83 self.config.username = Some(username.to_owned());
84 }
85 self
86 }
87
88 pub fn password(mut self, password: &str) -> Self {
92 if !password.is_empty() {
93 self.config.password = Some(password.to_owned());
94 }
95 self
96 }
97
98 pub fn db(mut self, db: i64) -> Self {
102 self.config.db = db;
103 self
104 }
105
106 pub fn default_ttl(mut self, ttl: Duration) -> Self {
110 self.config.default_ttl = Some(ttl);
111 self
112 }
113
114 pub fn root(mut self, root: &str) -> Self {
118 self.config.root = if root.is_empty() {
119 None
120 } else {
121 Some(root.to_string())
122 };
123
124 self
125 }
126
127 #[must_use]
135 pub fn connection_pool_max_size(mut self, max_size: usize) -> Self {
136 assert!(max_size > 0, "max_size must be greater than zero!");
137 self.config.connection_pool_max_size = Some(max_size);
138 self
139 }
140}
141
142impl Builder for RedisBuilder {
143 type Config = RedisConfig;
144
145 fn build(self) -> Result<impl Access> {
146 let root = normalize_root(
147 self.config
148 .root
149 .clone()
150 .unwrap_or_else(|| "/".to_string())
151 .as_str(),
152 );
153
154 if let Some(endpoints) = self.config.cluster_endpoints.clone() {
155 let mut cluster_endpoints: Vec<ConnectionInfo> = Vec::default();
156 for endpoint in endpoints.split(',') {
157 cluster_endpoints.push(self.get_connection_info(endpoint.to_string())?);
158 }
159 let mut client_builder = ClusterClientBuilder::new(cluster_endpoints);
160 if let Some(username) = &self.config.username {
161 client_builder = client_builder.username(username.clone());
162 }
163 if let Some(password) = &self.config.password {
164 client_builder = client_builder.password(password.clone());
165 }
166 let client = client_builder.build().map_err(format_redis_error)?;
167
168 Ok(RedisBackend::new(RedisCore::new(
169 endpoints,
170 None,
171 Some(client),
172 self.config.default_ttl,
173 self.config.connection_pool_max_size,
174 ))
175 .with_normalized_root(root))
176 } else {
177 let endpoint = self
178 .config
179 .endpoint
180 .clone()
181 .unwrap_or_else(|| DEFAULT_REDIS_ENDPOINT.to_string());
182
183 let client =
184 Client::open(self.get_connection_info(endpoint.clone())?).map_err(|e| {
185 Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
186 .with_context("service", REDIS_SCHEME)
187 .with_context("endpoint", self.config.endpoint.as_ref().unwrap())
188 .with_context("db", self.config.db.to_string())
189 .set_source(e)
190 })?;
191
192 Ok(RedisBackend::new(RedisCore::new(
193 endpoint,
194 Some(client),
195 None,
196 self.config.default_ttl,
197 self.config.connection_pool_max_size,
198 ))
199 .with_normalized_root(root))
200 }
201 }
202}
203
204impl RedisBuilder {
205 fn get_connection_info(&self, endpoint: String) -> Result<ConnectionInfo> {
206 let ep_url = endpoint.parse::<Uri>().map_err(|e| {
207 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
208 .with_context("service", REDIS_SCHEME)
209 .with_context("endpoint", endpoint)
210 .set_source(e)
211 })?;
212
213 let con_addr = match ep_url.scheme_str() {
214 Some("tcp") | Some("redis") | None => {
215 let host = ep_url
216 .host()
217 .map(|h| h.to_string())
218 .unwrap_or_else(|| "127.0.0.1".to_string());
219 let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
220 ConnectionAddr::Tcp(host, port)
221 }
222 Some("rediss") => {
223 let host = ep_url
224 .host()
225 .map(|h| h.to_string())
226 .unwrap_or_else(|| "127.0.0.1".to_string());
227 let port = ep_url.port_u16().unwrap_or(DEFAULT_REDIS_PORT);
228 ConnectionAddr::TcpTls {
229 host,
230 port,
231 insecure: false,
232 tls_params: None,
233 }
234 }
235 Some("unix") | Some("redis+unix") => {
236 let path = PathBuf::from(ep_url.path());
237 ConnectionAddr::Unix(path)
238 }
239 Some(s) => {
240 return Err(
241 Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
242 .with_context("service", REDIS_SCHEME)
243 .with_context("scheme", s),
244 );
245 }
246 };
247
248 let redis_info = RedisConnectionInfo {
249 db: self.config.db,
250 username: self.config.username.clone(),
251 password: self.config.password.clone(),
252 protocol: ProtocolVersion::RESP2,
253 };
254
255 Ok(ConnectionInfo {
256 addr: con_addr,
257 redis: redis_info,
258 })
259 }
260}
261
262#[derive(Debug, Clone)]
264pub struct RedisBackend {
265 core: Arc<RedisCore>,
266 root: String,
267 info: Arc<AccessorInfo>,
268}
269
270impl RedisBackend {
271 fn new(core: RedisCore) -> Self {
272 let info = AccessorInfo::default();
273 info.set_scheme(REDIS_SCHEME);
274 info.set_name(core.addr());
275 info.set_root("/");
276 info.set_native_capability(Capability {
277 read: true,
278 write: true,
279 delete: true,
280 stat: true,
281 write_can_empty: true,
282 shared: true,
283 ..Default::default()
284 });
285
286 Self {
287 core: Arc::new(core),
288 root: "/".to_string(),
289 info: Arc::new(info),
290 }
291 }
292
293 fn with_normalized_root(mut self, root: String) -> Self {
294 self.info.set_root(&root);
295 self.root = root;
296 self
297 }
298}
299
300impl Access for RedisBackend {
301 type Reader = Buffer;
302 type Writer = RedisWriter;
303 type Lister = ();
304 type Deleter = oio::OneShotDeleter<RedisDeleter>;
305
306 fn info(&self) -> Arc<AccessorInfo> {
307 self.info.clone()
308 }
309
310 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
311 let p = build_abs_path(&self.root, path);
312
313 if p == build_abs_path(&self.root, "") {
314 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
315 } else {
316 let bs = self.core.get(&p).await?;
317 match bs {
318 Some(bs) => Ok(RpStat::new(
319 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
320 )),
321 None => Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
322 }
323 }
324 }
325
326 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
327 let p = build_abs_path(&self.root, path);
328
329 let range = args.range();
330 let buffer = if range.is_full() {
331 match self.core.get(&p).await? {
333 Some(bs) => bs,
334 None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
335 }
336 } else {
337 let start = range.offset() as isize;
339 let end = match range.size() {
340 Some(size) => (range.offset() + size - 1) as isize,
341 None => -1, };
343
344 match self.core.get_range(&p, start, end).await? {
345 Some(bs) => bs,
346 None => return Err(Error::new(ErrorKind::NotFound, "key not found in redis")),
347 }
348 };
349
350 Ok((RpRead::new(), buffer))
351 }
352
353 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
354 let p = build_abs_path(&self.root, path);
355 Ok((RpWrite::new(), RedisWriter::new(self.core.clone(), p)))
356 }
357
358 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
359 Ok((
360 RpDelete::default(),
361 oio::OneShotDeleter::new(RedisDeleter::new(self.core.clone(), self.root.clone())),
362 ))
363 }
364}