opendal/services/memcached/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19use std::time::Duration;
20
21use tokio::sync::OnceCell;
22
23use super::MEMCACHED_SCHEME;
24use super::config::MemcachedConfig;
25use super::core::*;
26use super::deleter::MemcachedDeleter;
27use super::writer::MemcachedWriter;
28use crate::raw::*;
29use crate::*;
30
31/// [Memcached](https://memcached.org/) service support.
32#[doc = include_str!("docs.md")]
33#[derive(Debug, Default)]
34pub struct MemcachedBuilder {
35    pub(super) config: MemcachedConfig,
36}
37
38impl MemcachedBuilder {
39    /// set the network address of memcached service.
40    ///
41    /// For example: "tcp://localhost:11211"
42    pub fn endpoint(mut self, endpoint: &str) -> Self {
43        if !endpoint.is_empty() {
44            self.config.endpoint = Some(endpoint.to_owned());
45        }
46        self
47    }
48
49    /// set the working directory, all operations will be performed under it.
50    ///
51    /// default: "/"
52    pub fn root(mut self, root: &str) -> Self {
53        self.config.root = if root.is_empty() {
54            None
55        } else {
56            Some(root.to_string())
57        };
58
59        self
60    }
61
62    /// set the username.
63    pub fn username(mut self, username: &str) -> Self {
64        self.config.username = Some(username.to_string());
65        self
66    }
67
68    /// set the password.
69    pub fn password(mut self, password: &str) -> Self {
70        self.config.password = Some(password.to_string());
71        self
72    }
73
74    /// Set the default ttl for memcached services.
75    pub fn default_ttl(mut self, ttl: Duration) -> Self {
76        self.config.default_ttl = Some(ttl);
77        self
78    }
79
80    /// Sets the maximum number of connections managed by the pool.
81    ///
82    /// Defaults to 10.
83    ///
84    /// # Panics
85    ///
86    /// Will panic if `max_size` is 0.
87    #[must_use]
88    pub fn connection_pool_max_size(mut self, max_size: u32) -> Self {
89        assert!(max_size > 0, "max_size must be greater than zero!");
90        self.config.connection_pool_max_size = Some(max_size);
91        self
92    }
93}
94
95impl Builder for MemcachedBuilder {
96    type Config = MemcachedConfig;
97
98    fn build(self) -> Result<impl Access> {
99        let endpoint = self.config.endpoint.clone().ok_or_else(|| {
100            Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
101                .with_context("service", MEMCACHED_SCHEME)
102        })?;
103        let uri = http::Uri::try_from(&endpoint).map_err(|err| {
104            Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
105                .with_context("service", MEMCACHED_SCHEME)
106                .with_context("endpoint", &endpoint)
107                .set_source(err)
108        })?;
109
110        match uri.scheme_str() {
111            // If scheme is none, we will use tcp by default.
112            None => (),
113            Some(scheme) => {
114                // We only support tcp by now.
115                if scheme != "tcp" {
116                    return Err(Error::new(
117                        ErrorKind::ConfigInvalid,
118                        "endpoint is using invalid scheme",
119                    )
120                    .with_context("service", MEMCACHED_SCHEME)
121                    .with_context("endpoint", &endpoint)
122                    .with_context("scheme", scheme.to_string()));
123                }
124            }
125        };
126
127        let host = if let Some(host) = uri.host() {
128            host.to_string()
129        } else {
130            return Err(
131                Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have host")
132                    .with_context("service", MEMCACHED_SCHEME)
133                    .with_context("endpoint", &endpoint),
134            );
135        };
136        let port = if let Some(port) = uri.port_u16() {
137            port
138        } else {
139            return Err(
140                Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have port")
141                    .with_context("service", MEMCACHED_SCHEME)
142                    .with_context("endpoint", &endpoint),
143            );
144        };
145        let endpoint = format!("{host}:{port}",);
146
147        let root = normalize_root(
148            self.config
149                .root
150                .clone()
151                .unwrap_or_else(|| "/".to_string())
152                .as_str(),
153        );
154
155        let conn = OnceCell::new();
156        Ok(MemcachedBackend::new(MemcachedCore {
157            conn,
158            endpoint,
159            username: self.config.username.clone(),
160            password: self.config.password.clone(),
161            default_ttl: self.config.default_ttl,
162            connection_pool_max_size: self.config.connection_pool_max_size,
163        })
164        .with_normalized_root(root))
165    }
166}
167
168/// Backend for memcached services.
169#[derive(Clone, Debug)]
170pub struct MemcachedBackend {
171    core: Arc<MemcachedCore>,
172    root: String,
173    info: Arc<AccessorInfo>,
174}
175
176impl MemcachedBackend {
177    pub fn new(core: MemcachedCore) -> Self {
178        let info = AccessorInfo::default();
179        info.set_scheme(MEMCACHED_SCHEME);
180        info.set_name("memcached");
181        info.set_root("/");
182        info.set_native_capability(Capability {
183            read: true,
184            stat: true,
185            write: true,
186            write_can_empty: true,
187            delete: true,
188            shared: true,
189            ..Default::default()
190        });
191
192        Self {
193            core: Arc::new(core),
194            root: "/".to_string(),
195            info: Arc::new(info),
196        }
197    }
198
199    fn with_normalized_root(mut self, root: String) -> Self {
200        self.info.set_root(&root);
201        self.root = root;
202        self
203    }
204}
205
206impl Access for MemcachedBackend {
207    type Reader = Buffer;
208    type Writer = MemcachedWriter;
209    type Lister = ();
210    type Deleter = oio::OneShotDeleter<MemcachedDeleter>;
211
212    fn info(&self) -> Arc<AccessorInfo> {
213        self.info.clone()
214    }
215
216    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
217        let p = build_abs_path(&self.root, path);
218
219        if p == build_abs_path(&self.root, "") {
220            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
221        } else {
222            let bs = self.core.get(&p).await?;
223            match bs {
224                Some(bs) => Ok(RpStat::new(
225                    Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
226                )),
227                None => Err(Error::new(ErrorKind::NotFound, "kv not found in memcached")),
228            }
229        }
230    }
231
232    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
233        let p = build_abs_path(&self.root, path);
234        let bs = match self.core.get(&p).await? {
235            Some(bs) => bs,
236            None => return Err(Error::new(ErrorKind::NotFound, "kv not found in memcached")),
237        };
238        Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
239    }
240
241    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
242        let p = build_abs_path(&self.root, path);
243        Ok((RpWrite::new(), MemcachedWriter::new(self.core.clone(), p)))
244    }
245
246    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
247        Ok((
248            RpDelete::default(),
249            oio::OneShotDeleter::new(MemcachedDeleter::new(self.core.clone(), self.root.clone())),
250        ))
251    }
252}