opendal/services/memcached/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19use std::time::Duration;
20
21use super::MEMCACHED_SCHEME;
22use super::config::MemcachedConfig;
23use super::core::*;
24use super::deleter::MemcachedDeleter;
25use super::writer::MemcachedWriter;
26use crate::raw::*;
27use crate::*;
28
29/// [Memcached](https://memcached.org/) service support.
30#[doc = include_str!("docs.md")]
31#[derive(Debug, Default)]
32pub struct MemcachedBuilder {
33    pub(super) config: MemcachedConfig,
34}
35
36impl MemcachedBuilder {
37    /// set the network address of memcached service.
38    ///
39    /// For example: "tcp://localhost:11211"
40    pub fn endpoint(mut self, endpoint: &str) -> Self {
41        if !endpoint.is_empty() {
42            self.config.endpoint = Some(endpoint.to_owned());
43        }
44        self
45    }
46
47    /// set the working directory, all operations will be performed under it.
48    ///
49    /// default: "/"
50    pub fn root(mut self, root: &str) -> Self {
51        self.config.root = if root.is_empty() {
52            None
53        } else {
54            Some(root.to_string())
55        };
56
57        self
58    }
59
60    /// set the username.
61    pub fn username(mut self, username: &str) -> Self {
62        self.config.username = Some(username.to_string());
63        self
64    }
65
66    /// set the password.
67    pub fn password(mut self, password: &str) -> Self {
68        self.config.password = Some(password.to_string());
69        self
70    }
71
72    /// Set the default ttl for memcached services.
73    pub fn default_ttl(mut self, ttl: Duration) -> Self {
74        self.config.default_ttl = Some(ttl);
75        self
76    }
77
78    /// Sets the maximum number of connections managed by the pool.
79    ///
80    /// Defaults to 10.
81    ///
82    /// # Panics
83    ///
84    /// Will panic if `max_size` is 0.
85    #[must_use]
86    pub fn connection_pool_max_size(mut self, max_size: usize) -> Self {
87        assert!(max_size > 0, "max_size must be greater than zero!");
88        self.config.connection_pool_max_size = Some(max_size);
89        self
90    }
91}
92
93impl Builder for MemcachedBuilder {
94    type Config = MemcachedConfig;
95
96    fn build(self) -> Result<impl Access> {
97        let endpoint = self.config.endpoint.clone().ok_or_else(|| {
98            Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
99                .with_context("service", MEMCACHED_SCHEME)
100        })?;
101        let uri = http::Uri::try_from(&endpoint).map_err(|err| {
102            Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
103                .with_context("service", MEMCACHED_SCHEME)
104                .with_context("endpoint", &endpoint)
105                .set_source(err)
106        })?;
107
108        match uri.scheme_str() {
109            // If scheme is none, we will use tcp by default.
110            None => (),
111            Some(scheme) => {
112                // We only support tcp by now.
113                if scheme != "tcp" {
114                    return Err(Error::new(
115                        ErrorKind::ConfigInvalid,
116                        "endpoint is using invalid scheme",
117                    )
118                    .with_context("service", MEMCACHED_SCHEME)
119                    .with_context("endpoint", &endpoint)
120                    .with_context("scheme", scheme.to_string()));
121                }
122            }
123        };
124
125        let host = if let Some(host) = uri.host() {
126            host.to_string()
127        } else {
128            return Err(
129                Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have host")
130                    .with_context("service", MEMCACHED_SCHEME)
131                    .with_context("endpoint", &endpoint),
132            );
133        };
134        let port = if let Some(port) = uri.port_u16() {
135            port
136        } else {
137            return Err(
138                Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have port")
139                    .with_context("service", MEMCACHED_SCHEME)
140                    .with_context("endpoint", &endpoint),
141            );
142        };
143        let endpoint = format!("{host}:{port}",);
144
145        let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
146
147        Ok(MemcachedBackend::new(MemcachedCore::new(
148            endpoint,
149            self.config.username,
150            self.config.password,
151            self.config.default_ttl,
152            self.config.connection_pool_max_size,
153        ))
154        .with_normalized_root(root))
155    }
156}
157
158/// Backend for memcached services.
159#[derive(Clone, Debug)]
160pub struct MemcachedBackend {
161    core: Arc<MemcachedCore>,
162    root: String,
163    info: Arc<AccessorInfo>,
164}
165
166impl MemcachedBackend {
167    pub fn new(core: MemcachedCore) -> Self {
168        let info = AccessorInfo::default();
169        info.set_scheme(MEMCACHED_SCHEME);
170        info.set_name("memcached");
171        info.set_root("/");
172        info.set_native_capability(Capability {
173            read: true,
174            stat: true,
175            write: true,
176            write_can_empty: true,
177            delete: true,
178            shared: true,
179            ..Default::default()
180        });
181
182        Self {
183            core: Arc::new(core),
184            root: "/".to_string(),
185            info: Arc::new(info),
186        }
187    }
188
189    fn with_normalized_root(mut self, root: String) -> Self {
190        self.info.set_root(&root);
191        self.root = root;
192        self
193    }
194}
195
196impl Access for MemcachedBackend {
197    type Reader = Buffer;
198    type Writer = MemcachedWriter;
199    type Lister = ();
200    type Deleter = oio::OneShotDeleter<MemcachedDeleter>;
201
202    fn info(&self) -> Arc<AccessorInfo> {
203        self.info.clone()
204    }
205
206    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
207        let p = build_abs_path(&self.root, path);
208
209        if p == build_abs_path(&self.root, "") {
210            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
211        } else {
212            let bs = self.core.get(&p).await?;
213            match bs {
214                Some(bs) => Ok(RpStat::new(
215                    Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
216                )),
217                None => Err(Error::new(ErrorKind::NotFound, "kv not found in memcached")),
218            }
219        }
220    }
221
222    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
223        let p = build_abs_path(&self.root, path);
224        let bs = match self.core.get(&p).await? {
225            Some(bs) => bs,
226            None => return Err(Error::new(ErrorKind::NotFound, "kv not found in memcached")),
227        };
228        Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
229    }
230
231    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
232        let p = build_abs_path(&self.root, path);
233        Ok((RpWrite::new(), MemcachedWriter::new(self.core.clone(), p)))
234    }
235
236    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
237        Ok((
238            RpDelete::default(),
239            oio::OneShotDeleter::new(MemcachedDeleter::new(self.core.clone(), self.root.clone())),
240        ))
241    }
242}