opendal/services/moka/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use log::debug;
24
25use super::core::*;
26use super::delete::MokaDeleter;
27use super::lister::MokaLister;
28use super::writer::MokaWriter;
29use crate::raw::oio;
30use crate::raw::*;
31use crate::services::MokaConfig;
32use crate::*;
33
34impl Configurator for MokaConfig {
35    type Builder = MokaBuilder;
36    fn into_builder(self) -> Self::Builder {
37        MokaBuilder {
38            config: self,
39            ..Default::default()
40        }
41    }
42}
43
44/// Type alias of [`moka::future::Cache`](https://docs.rs/moka/latest/moka/future/struct.Cache.html)
45pub type MokaCache<K, V> = moka::future::Cache<K, V>;
46/// Type alias of [`moka::future::CacheBuilder`](https://docs.rs/moka/latest/moka/future/struct.CacheBuilder.html)
47pub type MokaCacheBuilder<K, V> = moka::future::CacheBuilder<K, V, MokaCache<K, V>>;
48
49/// [moka](https://github.com/moka-rs/moka) backend support.
50#[doc = include_str!("docs.md")]
51#[derive(Default)]
52pub struct MokaBuilder {
53    config: MokaConfig,
54    builder: MokaCacheBuilder<String, MokaValue>,
55}
56
57impl Debug for MokaBuilder {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("MokaBuilder")
60            .field("config", &self.config)
61            .finish()
62    }
63}
64
65impl MokaBuilder {
66    /// Create a [`MokaBuilder`] with the given [`moka::future::CacheBuilder`].
67    ///
68    /// Refer to [`moka::future::CacheBuilder`](https://docs.rs/moka/latest/moka/future/struct.CacheBuilder.html)
69    ///
70    /// # Example
71    ///
72    /// ```no_run
73    /// # use std::sync::Arc;
74    /// # use std::time::Duration;
75    /// # use log::debug;
76    /// # use moka::notification::RemovalCause;
77    /// # use opendal::services::Moka;
78    /// # use opendal::services::MokaCacheBuilder;
79    /// # use opendal::services::MokaValue;
80    /// # use opendal::Configurator;
81    /// let moka = Moka::new(
82    ///     MokaCacheBuilder::<String, MokaValue>::default()
83    ///         .name("demo")
84    ///         .max_capacity(1000)
85    ///         .time_to_live(Duration::from_secs(300))
86    ///         .weigher(|k, v| (k.len() + v.content.len()) as u32)
87    ///         .eviction_listener(|k: Arc<String>, v: MokaValue, cause: RemovalCause| {
88    ///             debug!(
89    ///                 "moka cache eviction listener, key = {}, value = {:?}, cause = {:?}",
90    ///                 k.as_str(), v.content.to_vec(), cause
91    ///             );
92    ///         })
93    /// );
94    /// ```
95    pub fn new(builder: MokaCacheBuilder<String, MokaValue>) -> Self {
96        Self {
97            builder,
98            ..Default::default()
99        }
100    }
101
102    /// Sets the name of the cache.
103    ///
104    /// Refer to [`moka::future::CacheBuilder::name`](https://docs.rs/moka/latest/moka/future/struct.CacheBuilder.html#method.name)
105    pub fn name(mut self, v: &str) -> Self {
106        if !v.is_empty() {
107            self.config.name = Some(v.to_owned());
108        }
109        self
110    }
111
112    /// Sets the max capacity of the cache.
113    ///
114    /// Refer to [`moka::future::CacheBuilder::max_capacity`](https://docs.rs/moka/latest/moka/future/struct.CacheBuilder.html#method.max_capacity)
115    pub fn max_capacity(mut self, v: u64) -> Self {
116        if v != 0 {
117            self.config.max_capacity = Some(v);
118        }
119        self
120    }
121
122    /// Sets the time to live of the cache.
123    ///
124    /// Refer to [`moka::future::CacheBuilder::time_to_live`](https://docs.rs/moka/latest/moka/future/struct.CacheBuilder.html#method.time_to_live)
125    pub fn time_to_live(mut self, v: Duration) -> Self {
126        if !v.is_zero() {
127            self.config.time_to_live = Some(v);
128        }
129        self
130    }
131
132    /// Sets the time to idle of the cache.
133    ///
134    /// Refer to [`moka::future::CacheBuilder::time_to_idle`](https://docs.rs/moka/latest/moka/sync/struct.CacheBuilder.html#method.time_to_idle)
135    pub fn time_to_idle(mut self, v: Duration) -> Self {
136        if !v.is_zero() {
137            self.config.time_to_idle = Some(v);
138        }
139        self
140    }
141
142    /// Set the root path of this backend
143    pub fn root(mut self, path: &str) -> Self {
144        self.config.root = if path.is_empty() {
145            None
146        } else {
147            Some(path.to_string())
148        };
149        self
150    }
151}
152
153impl Builder for MokaBuilder {
154    const SCHEME: Scheme = Scheme::Moka;
155    type Config = MokaConfig;
156
157    fn build(self) -> Result<impl Access> {
158        debug!("backend build started: {:?}", &self);
159
160        let root = normalize_root(
161            self.config
162                .root
163                .clone()
164                .unwrap_or_else(|| "/".to_string())
165                .as_str(),
166        );
167
168        let mut builder = self.builder;
169
170        // Use entries' bytes as capacity weigher.
171        builder = builder.weigher(|k, v| (k.len() + v.content.len()) as u32);
172        if let Some(v) = &self.config.name {
173            builder = builder.name(v);
174        }
175        if let Some(v) = self.config.max_capacity {
176            builder = builder.max_capacity(v);
177        }
178        if let Some(v) = self.config.time_to_live {
179            builder = builder.time_to_live(v);
180        }
181        if let Some(v) = self.config.time_to_idle {
182            builder = builder.time_to_idle(v);
183        }
184
185        debug!("backend build finished: {:?}", self.config);
186
187        let core = MokaCore {
188            cache: builder.build(),
189        };
190
191        Ok(MokaAccessor::new(core).with_normalized_root(root))
192    }
193}
194
195/// MokaAccessor implements Access trait directly
196#[derive(Debug, Clone)]
197pub struct MokaAccessor {
198    core: Arc<MokaCore>,
199    root: String,
200    info: Arc<AccessorInfo>,
201}
202
203impl MokaAccessor {
204    fn new(core: MokaCore) -> Self {
205        let info = AccessorInfo::default();
206        info.set_scheme(Scheme::Moka);
207        info.set_name(core.cache.name().unwrap_or("moka"));
208        info.set_root("/");
209        info.set_native_capability(Capability {
210            read: true,
211            write: true,
212            write_can_empty: true,
213            write_with_cache_control: true,
214            write_with_content_type: true,
215            write_with_content_disposition: true,
216            write_with_content_encoding: true,
217            delete: true,
218            stat: true,
219            list: true,
220            shared: false,
221            ..Default::default()
222        });
223
224        Self {
225            core: Arc::new(core),
226            root: "/".to_string(),
227            info: Arc::new(info),
228        }
229    }
230
231    fn with_normalized_root(mut self, root: String) -> Self {
232        self.info.set_root(&root);
233        self.root = root;
234        self
235    }
236}
237
238impl Access for MokaAccessor {
239    type Reader = Buffer;
240    type Writer = MokaWriter;
241    type Lister = oio::HierarchyLister<MokaLister>;
242    type Deleter = oio::OneShotDeleter<MokaDeleter>;
243
244    fn info(&self) -> Arc<AccessorInfo> {
245        self.info.clone()
246    }
247
248    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
249        let p = build_abs_path(&self.root, path);
250
251        if p == build_abs_path(&self.root, "") {
252            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
253        } else {
254            // Check the exact path first
255            match self.core.get(&p).await? {
256                Some(value) => {
257                    // Use the stored metadata but override mode if necessary
258                    let mut metadata = value.metadata.clone();
259                    // If path ends with '/' but we found a file, return DIR
260                    // This is because CompleteLayer's create_dir creates empty files with '/' suffix
261                    if p.ends_with('/') && metadata.mode() != EntryMode::DIR {
262                        metadata.set_mode(EntryMode::DIR);
263                    }
264                    Ok(RpStat::new(metadata))
265                }
266                None => {
267                    // If path ends with '/', check if there are any children
268                    if p.ends_with('/') {
269                        let has_children = self
270                            .core
271                            .cache
272                            .iter()
273                            .any(|kv| kv.0.starts_with(&p) && kv.0.len() > p.len());
274
275                        if has_children {
276                            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
277                        } else {
278                            Err(Error::new(ErrorKind::NotFound, "key not found in moka"))
279                        }
280                    } else {
281                        Err(Error::new(ErrorKind::NotFound, "key not found in moka"))
282                    }
283                }
284            }
285        }
286    }
287
288    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
289        let p = build_abs_path(&self.root, path);
290
291        match self.core.get(&p).await? {
292            Some(value) => {
293                let buffer = if args.range().is_full() {
294                    value.content
295                } else {
296                    let range = args.range();
297                    let start = range.offset() as usize;
298                    let end = match range.size() {
299                        Some(size) => (range.offset() + size) as usize,
300                        None => value.content.len(),
301                    };
302                    value.content.slice(start..end.min(value.content.len()))
303                };
304                Ok((RpRead::new(), buffer))
305            }
306            None => Err(Error::new(ErrorKind::NotFound, "key not found in moka")),
307        }
308    }
309
310    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
311        let p = build_abs_path(&self.root, path);
312        Ok((RpWrite::new(), MokaWriter::new(self.core.clone(), p, args)))
313    }
314
315    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
316        Ok((
317            RpDelete::default(),
318            oio::OneShotDeleter::new(MokaDeleter::new(self.core.clone(), self.root.clone())),
319        ))
320    }
321
322    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
323        // For moka, we don't distinguish between files and directories
324        // Just return the lister to iterate through all matching keys
325        let lister = MokaLister::new(self.core.clone(), self.root.clone(), path.to_string());
326        let lister = oio::HierarchyLister::new(lister, path, args.recursive());
327        Ok((RpList::default(), lister))
328    }
329}