opendal_core/services/memcached/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use super::MEMCACHED_SCHEME;
21use super::config::MemcachedConfig;
22use super::core::*;
23use super::deleter::MemcachedDeleter;
24use super::writer::MemcachedWriter;
25use crate::raw::*;
26use crate::*;
27
28/// [Memcached](https://memcached.org/) service support.
29#[doc = include_str!("docs.md")]
30#[derive(Debug, Default)]
31pub struct MemcachedBuilder {
32    pub(super) config: MemcachedConfig,
33}
34
35impl MemcachedBuilder {
36    /// set the network address of memcached service.
37    ///
38    /// For example: "tcp://localhost:11211"
39    pub fn endpoint(mut self, endpoint: &str) -> Self {
40        if !endpoint.is_empty() {
41            self.config.endpoint = Some(endpoint.to_owned());
42        }
43        self
44    }
45
46    /// set the working directory, all operations will be performed under it.
47    ///
48    /// default: "/"
49    pub fn root(mut self, root: &str) -> Self {
50        self.config.root = if root.is_empty() {
51            None
52        } else {
53            Some(root.to_string())
54        };
55
56        self
57    }
58
59    /// set the username.
60    pub fn username(mut self, username: &str) -> Self {
61        self.config.username = Some(username.to_string());
62        self
63    }
64
65    /// set the password.
66    pub fn password(mut self, password: &str) -> Self {
67        self.config.password = Some(password.to_string());
68        self
69    }
70
71    /// Set the default ttl for memcached services.
72    pub fn default_ttl(mut self, ttl: Duration) -> Self {
73        self.config.default_ttl = Some(ttl);
74        self
75    }
76
77    /// Sets the maximum number of connections managed by the pool.
78    ///
79    /// Defaults to 10.
80    ///
81    /// # Panics
82    ///
83    /// Will panic if `max_size` is 0.
84    #[must_use]
85    pub fn connection_pool_max_size(mut self, max_size: usize) -> Self {
86        assert!(max_size > 0, "max_size must be greater than zero!");
87        self.config.connection_pool_max_size = Some(max_size);
88        self
89    }
90}
91
92impl Builder for MemcachedBuilder {
93    type Config = MemcachedConfig;
94
95    fn build(self) -> Result<impl Access> {
96        let endpoint = self.config.endpoint.clone().ok_or_else(|| {
97            Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
98                .with_context("service", MEMCACHED_SCHEME)
99        })?;
100        let uri = http::Uri::try_from(&endpoint).map_err(|err| {
101            Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
102                .with_context("service", MEMCACHED_SCHEME)
103                .with_context("endpoint", &endpoint)
104                .set_source(err)
105        })?;
106
107        match uri.scheme_str() {
108            // If scheme is none, we will use tcp by default.
109            None => (),
110            Some(scheme) => {
111                // We only support tcp by now.
112                if scheme != "tcp" {
113                    return Err(Error::new(
114                        ErrorKind::ConfigInvalid,
115                        "endpoint is using invalid scheme",
116                    )
117                    .with_context("service", MEMCACHED_SCHEME)
118                    .with_context("endpoint", &endpoint)
119                    .with_context("scheme", scheme.to_string()));
120                }
121            }
122        };
123
124        let host = if let Some(host) = uri.host() {
125            host.to_string()
126        } else {
127            return Err(
128                Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have host")
129                    .with_context("service", MEMCACHED_SCHEME)
130                    .with_context("endpoint", &endpoint),
131            );
132        };
133        let port = if let Some(port) = uri.port_u16() {
134            port
135        } else {
136            return Err(
137                Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have port")
138                    .with_context("service", MEMCACHED_SCHEME)
139                    .with_context("endpoint", &endpoint),
140            );
141        };
142        let endpoint = format!("{host}:{port}",);
143
144        let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
145
146        Ok(MemcachedBackend::new(MemcachedCore::new(
147            endpoint,
148            self.config.username,
149            self.config.password,
150            self.config.default_ttl,
151            self.config.connection_pool_max_size,
152        ))
153        .with_normalized_root(root))
154    }
155}
156
157/// Backend for memcached services.
158#[derive(Clone, Debug)]
159pub struct MemcachedBackend {
160    core: Arc<MemcachedCore>,
161    root: String,
162    info: Arc<AccessorInfo>,
163}
164
165impl MemcachedBackend {
166    pub fn new(core: MemcachedCore) -> Self {
167        let info = AccessorInfo::default();
168        info.set_scheme(MEMCACHED_SCHEME);
169        info.set_name("memcached");
170        info.set_root("/");
171        info.set_native_capability(Capability {
172            read: true,
173            stat: true,
174            write: true,
175            write_can_empty: true,
176            delete: true,
177            shared: true,
178            ..Default::default()
179        });
180
181        Self {
182            core: Arc::new(core),
183            root: "/".to_string(),
184            info: Arc::new(info),
185        }
186    }
187
188    fn with_normalized_root(mut self, root: String) -> Self {
189        self.info.set_root(&root);
190        self.root = root;
191        self
192    }
193}
194
195impl Access for MemcachedBackend {
196    type Reader = Buffer;
197    type Writer = MemcachedWriter;
198    type Lister = ();
199    type Deleter = oio::OneShotDeleter<MemcachedDeleter>;
200
201    fn info(&self) -> Arc<AccessorInfo> {
202        self.info.clone()
203    }
204
205    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
206        let p = build_abs_path(&self.root, path);
207
208        if p == build_abs_path(&self.root, "") {
209            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
210        } else {
211            let bs = self.core.get(&p).await?;
212            match bs {
213                Some(bs) => Ok(RpStat::new(
214                    Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
215                )),
216                None => Err(Error::new(ErrorKind::NotFound, "kv not found in memcached")),
217            }
218        }
219    }
220
221    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
222        let p = build_abs_path(&self.root, path);
223        let bs = match self.core.get(&p).await? {
224            Some(bs) => bs,
225            None => return Err(Error::new(ErrorKind::NotFound, "kv not found in memcached")),
226        };
227        Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
228    }
229
230    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
231        let p = build_abs_path(&self.root, path);
232        Ok((RpWrite::new(), MemcachedWriter::new(self.core.clone(), p)))
233    }
234
235    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
236        Ok((
237            RpDelete::default(),
238            oio::OneShotDeleter::new(MemcachedDeleter::new(self.core.clone(), self.root.clone())),
239        ))
240    }
241}