opendal/services/moka/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20use std::time::Duration;
21
22use log::debug;
23
24use super::MOKA_SCHEME;
25use super::config::MokaConfig;
26use super::core::*;
27use super::deleter::MokaDeleter;
28use super::lister::MokaLister;
29use super::writer::MokaWriter;
30use crate::raw::oio;
31use crate::raw::signed_to_duration;
32use crate::raw::*;
33use crate::*;
34
/// Type alias of [`moka::future::Cache`](https://docs.rs/moka/latest/moka/future/struct.Cache.html)
///
/// This is the `future` (async) flavor of the moka cache.
pub type MokaCache<K, V> = moka::future::Cache<K, V>;
/// Type alias of [`moka::future::CacheBuilder`](https://docs.rs/moka/latest/moka/future/struct.CacheBuilder.html)
///
/// The builder is parameterized so that it produces a [`MokaCache<K, V>`].
pub type MokaCacheBuilder<K, V> = moka::future::CacheBuilder<K, V, MokaCache<K, V>>;
39
/// [moka](https://github.com/moka-rs/moka) backend support.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct MokaBuilder {
    /// Service configuration (name, capacity, expiry settings and root)
    /// filled in by the setter methods on this builder.
    pub(super) config: MokaConfig,
    /// Underlying moka cache builder. Values set in `config` are applied on
    /// top of this builder when the backend is built.
    pub(super) builder: MokaCacheBuilder<String, MokaValue>,
}
47
48impl Debug for MokaBuilder {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        f.debug_struct("MokaBuilder")
51            .field("config", &self.config)
52            .finish_non_exhaustive()
53    }
54}
55
56impl MokaBuilder {
57    /// Create a [`MokaBuilder`] with the given [`moka::future::CacheBuilder`].
58    ///
59    /// Refer to [`moka::future::CacheBuilder`](https://docs.rs/moka/latest/moka/future/struct.CacheBuilder.html)
60    ///
61    /// # Example
62    ///
63    /// ```no_run
64    /// # use std::sync::Arc;
65    /// # use std::time::Duration;
66    /// # use log::debug;
67    /// # use moka::notification::RemovalCause;
68    /// # use opendal::services::Moka;
69    /// # use opendal::services::MokaCacheBuilder;
70    /// # use opendal::services::MokaValue;
71    /// # use opendal::Configurator;
72    /// let moka = Moka::new(
73    ///     MokaCacheBuilder::<String, MokaValue>::default()
74    ///         .name("demo")
75    ///         .max_capacity(1000)
76    ///         .time_to_live(Duration::from_secs(300))
77    ///         .weigher(|k, v| (k.len() + v.content.len()) as u32)
78    ///         .eviction_listener(|k: Arc<String>, v: MokaValue, cause: RemovalCause| {
79    ///             debug!(
80    ///                 "moka cache eviction listener, key = {}, value = {:?}, cause = {:?}",
81    ///                 k.as_str(), v.content.to_vec(), cause
82    ///             );
83    ///         })
84    /// );
85    /// ```
86    pub fn new(builder: MokaCacheBuilder<String, MokaValue>) -> Self {
87        Self {
88            builder,
89            ..Default::default()
90        }
91    }
92
93    /// Sets the name of the cache.
94    ///
95    /// Refer to [`moka::future::CacheBuilder::name`](https://docs.rs/moka/latest/moka/future/struct.CacheBuilder.html#method.name)
96    pub fn name(mut self, v: &str) -> Self {
97        if !v.is_empty() {
98            self.config.name = Some(v.to_owned());
99        }
100        self
101    }
102
103    /// Sets the max capacity of the cache.
104    ///
105    /// Refer to [`moka::future::CacheBuilder::max_capacity`](https://docs.rs/moka/latest/moka/future/struct.CacheBuilder.html#method.max_capacity)
106    pub fn max_capacity(mut self, v: u64) -> Self {
107        if v != 0 {
108            self.config.max_capacity = Some(v);
109        }
110        self
111    }
112
113    /// Sets the time to live of the cache.
114    ///
115    /// Refer to [`moka::future::CacheBuilder::time_to_live`](https://docs.rs/moka/latest/moka/future/struct.CacheBuilder.html#method.time_to_live)
116    pub fn time_to_live(mut self, v: Duration) -> Self {
117        if !v.is_zero() {
118            self.config.time_to_live = Some(format!("{}s", v.as_secs()));
119        }
120        self
121    }
122
123    /// Sets the time to idle of the cache.
124    ///
125    /// Refer to [`moka::future::CacheBuilder::time_to_idle`](https://docs.rs/moka/latest/moka/sync/struct.CacheBuilder.html#method.time_to_idle)
126    pub fn time_to_idle(mut self, v: Duration) -> Self {
127        if !v.is_zero() {
128            self.config.time_to_idle = Some(format!("{}s", v.as_secs()));
129        }
130        self
131    }
132
133    /// Set the root path of this backend
134    pub fn root(mut self, path: &str) -> Self {
135        self.config.root = if path.is_empty() {
136            None
137        } else {
138            Some(path.to_string())
139        };
140        self
141    }
142}
143
144impl Builder for MokaBuilder {
145    type Config = MokaConfig;
146
147    fn build(self) -> Result<impl Access> {
148        debug!("backend build started: {:?}", &self);
149
150        let root = normalize_root(
151            self.config
152                .root
153                .clone()
154                .unwrap_or_else(|| "/".to_string())
155                .as_str(),
156        );
157
158        let mut builder = self.builder;
159
160        if let Some(v) = &self.config.name {
161            builder = builder.name(v);
162        }
163        if let Some(v) = self.config.max_capacity {
164            builder = builder.max_capacity(v);
165        }
166        if let Some(value) = self.config.time_to_live.as_deref() {
167            let duration = signed_to_duration(value)?;
168            builder = builder.time_to_live(duration);
169        }
170        if let Some(value) = self.config.time_to_idle.as_deref() {
171            let duration = signed_to_duration(value)?;
172            builder = builder.time_to_idle(duration);
173        }
174
175        debug!("backend build finished: {:?}", self.config);
176
177        let core = MokaCore {
178            cache: builder.build(),
179        };
180
181        Ok(MokaBackend::new(core).with_normalized_root(root))
182    }
183}
184
/// MokaBackend implements Access trait directly
#[derive(Debug, Clone)]
pub struct MokaBackend {
    /// Shared handle to the core that owns the moka cache.
    core: Arc<MokaCore>,
    /// Root path that every operation path is resolved against; `/` until
    /// a normalized root is applied.
    root: String,
    /// Accessor info (scheme, name, root and native capability) returned
    /// by `info()`.
    info: Arc<AccessorInfo>,
}
192
193impl MokaBackend {
194    fn new(core: MokaCore) -> Self {
195        let info = AccessorInfo::default();
196        info.set_scheme(MOKA_SCHEME);
197        info.set_name(core.cache.name().unwrap_or("moka"));
198        info.set_root("/");
199        info.set_native_capability(Capability {
200            read: true,
201            write: true,
202            write_can_empty: true,
203            write_with_cache_control: true,
204            write_with_content_type: true,
205            write_with_content_disposition: true,
206            write_with_content_encoding: true,
207            delete: true,
208            stat: true,
209            list: true,
210            shared: false,
211            ..Default::default()
212        });
213
214        Self {
215            core: Arc::new(core),
216            root: "/".to_string(),
217            info: Arc::new(info),
218        }
219    }
220
221    fn with_normalized_root(mut self, root: String) -> Self {
222        self.info.set_root(&root);
223        self.root = root;
224        self
225    }
226}
227
228impl Access for MokaBackend {
229    type Reader = Buffer;
230    type Writer = MokaWriter;
231    type Lister = oio::HierarchyLister<MokaLister>;
232    type Deleter = oio::OneShotDeleter<MokaDeleter>;
233
234    fn info(&self) -> Arc<AccessorInfo> {
235        self.info.clone()
236    }
237
238    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
239        let p = build_abs_path(&self.root, path);
240
241        if p == build_abs_path(&self.root, "") {
242            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
243        } else {
244            // Check the exact path first
245            match self.core.get(&p).await? {
246                Some(value) => {
247                    // Use the stored metadata but override mode if necessary
248                    let mut metadata = value.metadata.clone();
249                    // If path ends with '/' but we found a file, return DIR
250                    // This is because CompleteLayer's create_dir creates empty files with '/' suffix
251                    if p.ends_with('/') && metadata.mode() != EntryMode::DIR {
252                        metadata.set_mode(EntryMode::DIR);
253                    }
254                    Ok(RpStat::new(metadata))
255                }
256                None => {
257                    // If path ends with '/', check if there are any children
258                    if p.ends_with('/') {
259                        let has_children = self
260                            .core
261                            .cache
262                            .iter()
263                            .any(|kv| kv.0.starts_with(&p) && kv.0.len() > p.len());
264
265                        if has_children {
266                            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
267                        } else {
268                            Err(Error::new(ErrorKind::NotFound, "key not found in moka"))
269                        }
270                    } else {
271                        Err(Error::new(ErrorKind::NotFound, "key not found in moka"))
272                    }
273                }
274            }
275        }
276    }
277
278    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
279        let p = build_abs_path(&self.root, path);
280
281        match self.core.get(&p).await? {
282            Some(value) => {
283                let buffer = if args.range().is_full() {
284                    value.content
285                } else {
286                    let range = args.range();
287                    let start = range.offset() as usize;
288                    let end = match range.size() {
289                        Some(size) => (range.offset() + size) as usize,
290                        None => value.content.len(),
291                    };
292                    value.content.slice(start..end.min(value.content.len()))
293                };
294                Ok((RpRead::new(), buffer))
295            }
296            None => Err(Error::new(ErrorKind::NotFound, "key not found in moka")),
297        }
298    }
299
300    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
301        let p = build_abs_path(&self.root, path);
302        Ok((RpWrite::new(), MokaWriter::new(self.core.clone(), p, args)))
303    }
304
305    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
306        Ok((
307            RpDelete::default(),
308            oio::OneShotDeleter::new(MokaDeleter::new(self.core.clone(), self.root.clone())),
309        ))
310    }
311
312    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
313        // For moka, we don't distinguish between files and directories
314        // Just return the lister to iterate through all matching keys
315        let lister = MokaLister::new(self.core.clone(), self.root.clone(), path.to_string());
316        let lister = oio::HierarchyLister::new(lister, path, args.recursive());
317        Ok((RpList::default(), lister))
318    }
319}