opendal/services/moka/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use log::debug;
24
25use super::core::*;
26use super::delete::MokaDeleter;
27use super::lister::MokaLister;
28use super::writer::MokaWriter;
29use super::DEFAULT_SCHEME;
30use crate::raw::oio;
31use crate::raw::*;
32use crate::services::MokaConfig;
33use crate::*;
34impl Configurator for MokaConfig {
35    type Builder = MokaBuilder;
36    fn into_builder(self) -> Self::Builder {
37        MokaBuilder {
38            config: self,
39            ..Default::default()
40        }
41    }
42}
43
44/// Type alias of [`moka::future::Cache`](https://docs.rs/moka/latest/moka/future/struct.Cache.html)
45pub type MokaCache<K, V> = moka::future::Cache<K, V>;
46/// Type alias of [`moka::future::CacheBuilder`](https://docs.rs/moka/latest/moka/future/struct.CacheBuilder.html)
47pub type MokaCacheBuilder<K, V> = moka::future::CacheBuilder<K, V, MokaCache<K, V>>;
48
/// [moka](https://github.com/moka-rs/moka) backend support.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct MokaBuilder {
    /// Options collected through the builder setters (`name`, `max_capacity`,
    /// `time_to_live`, `time_to_idle`, `root`) or supplied via `MokaConfig`.
    config: MokaConfig,
    /// Underlying moka cache builder. Options present in `config` are applied
    /// on top of it when the backend is built.
    builder: MokaCacheBuilder<String, MokaValue>,
}
56
57impl Debug for MokaBuilder {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("MokaBuilder")
60            .field("config", &self.config)
61            .finish()
62    }
63}
64
65impl MokaBuilder {
66    /// Create a [`MokaBuilder`] with the given [`moka::future::CacheBuilder`].
67    ///
68    /// Refer to [`moka::future::CacheBuilder`](https://docs.rs/moka/latest/moka/future/struct.CacheBuilder.html)
69    ///
70    /// # Example
71    ///
72    /// ```no_run
73    /// # use std::sync::Arc;
74    /// # use std::time::Duration;
75    /// # use log::debug;
76    /// # use moka::notification::RemovalCause;
77    /// # use opendal::services::Moka;
78    /// # use opendal::services::MokaCacheBuilder;
79    /// # use opendal::services::MokaValue;
80    /// # use opendal::Configurator;
81    /// let moka = Moka::new(
82    ///     MokaCacheBuilder::<String, MokaValue>::default()
83    ///         .name("demo")
84    ///         .max_capacity(1000)
85    ///         .time_to_live(Duration::from_secs(300))
86    ///         .weigher(|k, v| (k.len() + v.content.len()) as u32)
87    ///         .eviction_listener(|k: Arc<String>, v: MokaValue, cause: RemovalCause| {
88    ///             debug!(
89    ///                 "moka cache eviction listener, key = {}, value = {:?}, cause = {:?}",
90    ///                 k.as_str(), v.content.to_vec(), cause
91    ///             );
92    ///         })
93    /// );
94    /// ```
95    pub fn new(builder: MokaCacheBuilder<String, MokaValue>) -> Self {
96        Self {
97            builder,
98            ..Default::default()
99        }
100    }
101
102    /// Sets the name of the cache.
103    ///
104    /// Refer to [`moka::future::CacheBuilder::name`](https://docs.rs/moka/latest/moka/future/struct.CacheBuilder.html#method.name)
105    pub fn name(mut self, v: &str) -> Self {
106        if !v.is_empty() {
107            self.config.name = Some(v.to_owned());
108        }
109        self
110    }
111
112    /// Sets the max capacity of the cache.
113    ///
114    /// Refer to [`moka::future::CacheBuilder::max_capacity`](https://docs.rs/moka/latest/moka/future/struct.CacheBuilder.html#method.max_capacity)
115    pub fn max_capacity(mut self, v: u64) -> Self {
116        if v != 0 {
117            self.config.max_capacity = Some(v);
118        }
119        self
120    }
121
122    /// Sets the time to live of the cache.
123    ///
124    /// Refer to [`moka::future::CacheBuilder::time_to_live`](https://docs.rs/moka/latest/moka/future/struct.CacheBuilder.html#method.time_to_live)
125    pub fn time_to_live(mut self, v: Duration) -> Self {
126        if !v.is_zero() {
127            self.config.time_to_live = Some(v);
128        }
129        self
130    }
131
132    /// Sets the time to idle of the cache.
133    ///
134    /// Refer to [`moka::future::CacheBuilder::time_to_idle`](https://docs.rs/moka/latest/moka/sync/struct.CacheBuilder.html#method.time_to_idle)
135    pub fn time_to_idle(mut self, v: Duration) -> Self {
136        if !v.is_zero() {
137            self.config.time_to_idle = Some(v);
138        }
139        self
140    }
141
142    /// Set the root path of this backend
143    pub fn root(mut self, path: &str) -> Self {
144        self.config.root = if path.is_empty() {
145            None
146        } else {
147            Some(path.to_string())
148        };
149        self
150    }
151}
152
153impl Builder for MokaBuilder {
154    type Config = MokaConfig;
155
156    fn build(self) -> Result<impl Access> {
157        debug!("backend build started: {:?}", &self);
158
159        let root = normalize_root(
160            self.config
161                .root
162                .clone()
163                .unwrap_or_else(|| "/".to_string())
164                .as_str(),
165        );
166
167        let mut builder = self.builder;
168
169        if let Some(v) = &self.config.name {
170            builder = builder.name(v);
171        }
172        if let Some(v) = self.config.max_capacity {
173            builder = builder.max_capacity(v);
174        }
175        if let Some(v) = self.config.time_to_live {
176            builder = builder.time_to_live(v);
177        }
178        if let Some(v) = self.config.time_to_idle {
179            builder = builder.time_to_idle(v);
180        }
181
182        debug!("backend build finished: {:?}", self.config);
183
184        let core = MokaCore {
185            cache: builder.build(),
186        };
187
188        Ok(MokaAccessor::new(core).with_normalized_root(root))
189    }
190}
191
/// MokaAccessor implements the `Access` trait directly on top of a moka cache.
///
/// Cache keys are absolute paths built from `root` and the path given by the caller.
#[derive(Debug, Clone)]
pub struct MokaAccessor {
    /// Shared core that owns the moka cache.
    core: Arc<MokaCore>,
    /// Root that relative paths are resolved against; initialised to `/` and
    /// replaced by `with_normalized_root`.
    root: String,
    /// Accessor metadata (scheme, name, root, native capability) returned by `info()`.
    info: Arc<AccessorInfo>,
}
199
200impl MokaAccessor {
201    fn new(core: MokaCore) -> Self {
202        let info = AccessorInfo::default();
203        info.set_scheme(DEFAULT_SCHEME);
204        info.set_name(core.cache.name().unwrap_or("moka"));
205        info.set_root("/");
206        info.set_native_capability(Capability {
207            read: true,
208            write: true,
209            write_can_empty: true,
210            write_with_cache_control: true,
211            write_with_content_type: true,
212            write_with_content_disposition: true,
213            write_with_content_encoding: true,
214            delete: true,
215            stat: true,
216            list: true,
217            shared: false,
218            ..Default::default()
219        });
220
221        Self {
222            core: Arc::new(core),
223            root: "/".to_string(),
224            info: Arc::new(info),
225        }
226    }
227
228    fn with_normalized_root(mut self, root: String) -> Self {
229        self.info.set_root(&root);
230        self.root = root;
231        self
232    }
233}
234
235impl Access for MokaAccessor {
236    type Reader = Buffer;
237    type Writer = MokaWriter;
238    type Lister = oio::HierarchyLister<MokaLister>;
239    type Deleter = oio::OneShotDeleter<MokaDeleter>;
240
241    fn info(&self) -> Arc<AccessorInfo> {
242        self.info.clone()
243    }
244
245    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
246        let p = build_abs_path(&self.root, path);
247
248        if p == build_abs_path(&self.root, "") {
249            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
250        } else {
251            // Check the exact path first
252            match self.core.get(&p).await? {
253                Some(value) => {
254                    // Use the stored metadata but override mode if necessary
255                    let mut metadata = value.metadata.clone();
256                    // If path ends with '/' but we found a file, return DIR
257                    // This is because CompleteLayer's create_dir creates empty files with '/' suffix
258                    if p.ends_with('/') && metadata.mode() != EntryMode::DIR {
259                        metadata.set_mode(EntryMode::DIR);
260                    }
261                    Ok(RpStat::new(metadata))
262                }
263                None => {
264                    // If path ends with '/', check if there are any children
265                    if p.ends_with('/') {
266                        let has_children = self
267                            .core
268                            .cache
269                            .iter()
270                            .any(|kv| kv.0.starts_with(&p) && kv.0.len() > p.len());
271
272                        if has_children {
273                            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
274                        } else {
275                            Err(Error::new(ErrorKind::NotFound, "key not found in moka"))
276                        }
277                    } else {
278                        Err(Error::new(ErrorKind::NotFound, "key not found in moka"))
279                    }
280                }
281            }
282        }
283    }
284
285    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
286        let p = build_abs_path(&self.root, path);
287
288        match self.core.get(&p).await? {
289            Some(value) => {
290                let buffer = if args.range().is_full() {
291                    value.content
292                } else {
293                    let range = args.range();
294                    let start = range.offset() as usize;
295                    let end = match range.size() {
296                        Some(size) => (range.offset() + size) as usize,
297                        None => value.content.len(),
298                    };
299                    value.content.slice(start..end.min(value.content.len()))
300                };
301                Ok((RpRead::new(), buffer))
302            }
303            None => Err(Error::new(ErrorKind::NotFound, "key not found in moka")),
304        }
305    }
306
307    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
308        let p = build_abs_path(&self.root, path);
309        Ok((RpWrite::new(), MokaWriter::new(self.core.clone(), p, args)))
310    }
311
312    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
313        Ok((
314            RpDelete::default(),
315            oio::OneShotDeleter::new(MokaDeleter::new(self.core.clone(), self.root.clone())),
316        ))
317    }
318
319    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
320        // For moka, we don't distinguish between files and directories
321        // Just return the lister to iterate through all matching keys
322        let lister = MokaLister::new(self.core.clone(), self.root.clone(), path.to_string());
323        let lister = oio::HierarchyLister::new(lister, path, args.recursive());
324        Ok((RpList::default(), lister))
325    }
326}