opendal/services/moka/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::time::Duration;
21
22use log::debug;
23use moka::sync::CacheBuilder;
24use moka::sync::SegmentedCache;
25
26use crate::raw::adapters::typed_kv;
27use crate::raw::*;
28use crate::services::MokaConfig;
29use crate::*;
30
31impl Configurator for MokaConfig {
32    type Builder = MokaBuilder;
33    fn into_builder(self) -> Self::Builder {
34        MokaBuilder { config: self }
35    }
36}
37
38/// [moka](https://github.com/moka-rs/moka) backend support.
39#[doc = include_str!("docs.md")]
40#[derive(Default, Debug)]
41pub struct MokaBuilder {
42    config: MokaConfig,
43}
44
45impl MokaBuilder {
46    /// Name for this cache instance.
47    pub fn name(mut self, v: &str) -> Self {
48        if !v.is_empty() {
49            self.config.name = Some(v.to_owned());
50        }
51        self
52    }
53
54    /// Sets the max capacity of the cache.
55    ///
56    /// Refer to [`moka::sync::CacheBuilder::max_capacity`](https://docs.rs/moka/latest/moka/sync/struct.CacheBuilder.html#method.max_capacity)
57    pub fn max_capacity(mut self, v: u64) -> Self {
58        if v != 0 {
59            self.config.max_capacity = Some(v);
60        }
61        self
62    }
63
64    /// Sets the time to live of the cache.
65    ///
66    /// Refer to [`moka::sync::CacheBuilder::time_to_live`](https://docs.rs/moka/latest/moka/sync/struct.CacheBuilder.html#method.time_to_live)
67    pub fn time_to_live(mut self, v: Duration) -> Self {
68        if !v.is_zero() {
69            self.config.time_to_live = Some(v);
70        }
71        self
72    }
73
74    /// Sets the time to idle of the cache.
75    ///
76    /// Refer to [`moka::sync::CacheBuilder::time_to_idle`](https://docs.rs/moka/latest/moka/sync/struct.CacheBuilder.html#method.time_to_idle)
77    pub fn time_to_idle(mut self, v: Duration) -> Self {
78        if !v.is_zero() {
79            self.config.time_to_idle = Some(v);
80        }
81        self
82    }
83
84    /// Sets the segments number of the cache.
85    ///
86    /// Refer to [`moka::sync::CacheBuilder::segments`](https://docs.rs/moka/latest/moka/sync/struct.CacheBuilder.html#method.segments)
87    pub fn segments(mut self, v: usize) -> Self {
88        assert!(v != 0);
89        self.config.num_segments = Some(v);
90        self
91    }
92
93    /// Set root path of this backend
94    pub fn root(mut self, path: &str) -> Self {
95        self.config.root = if path.is_empty() {
96            None
97        } else {
98            Some(path.to_string())
99        };
100
101        self
102    }
103}
104
105impl Builder for MokaBuilder {
106    const SCHEME: Scheme = Scheme::Moka;
107    type Config = MokaConfig;
108
109    fn build(self) -> Result<impl Access> {
110        debug!("backend build started: {:?}", &self);
111
112        let mut builder: CacheBuilder<String, typed_kv::Value, _> =
113            SegmentedCache::builder(self.config.num_segments.unwrap_or(1));
114        // Use entries' bytes as capacity weigher.
115        builder = builder.weigher(|k, v| (k.len() + v.size()) as u32);
116        if let Some(v) = &self.config.name {
117            builder = builder.name(v);
118        }
119        if let Some(v) = self.config.max_capacity {
120            builder = builder.max_capacity(v)
121        }
122        if let Some(v) = self.config.time_to_live {
123            builder = builder.time_to_live(v)
124        }
125        if let Some(v) = self.config.time_to_idle {
126            builder = builder.time_to_idle(v)
127        }
128
129        debug!("backend build finished: {:?}", &self);
130
131        let mut backend = MokaBackend::new(Adapter {
132            inner: builder.build(),
133        });
134        if let Some(v) = self.config.root {
135            backend = backend.with_root(&v);
136        }
137
138        Ok(backend)
139    }
140}
141
142/// Backend is used to serve `Accessor` support in moka.
143pub type MokaBackend = typed_kv::Backend<Adapter>;
144
145#[derive(Clone)]
146pub struct Adapter {
147    inner: SegmentedCache<String, typed_kv::Value>,
148}
149
150impl Debug for Adapter {
151    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
152        f.debug_struct("Adapter")
153            .field("size", &self.inner.weighted_size())
154            .field("count", &self.inner.entry_count())
155            .finish()
156    }
157}
158
159impl typed_kv::Adapter for Adapter {
160    fn info(&self) -> typed_kv::Info {
161        typed_kv::Info::new(
162            Scheme::Moka,
163            self.inner.name().unwrap_or("moka"),
164            typed_kv::Capability {
165                get: true,
166                set: true,
167                delete: true,
168                scan: true,
169                shared: false,
170            },
171        )
172    }
173
174    async fn get(&self, path: &str) -> Result<Option<typed_kv::Value>> {
175        self.blocking_get(path)
176    }
177
178    fn blocking_get(&self, path: &str) -> Result<Option<typed_kv::Value>> {
179        match self.inner.get(path) {
180            None => Ok(None),
181            Some(bs) => Ok(Some(bs)),
182        }
183    }
184
185    async fn set(&self, path: &str, value: typed_kv::Value) -> Result<()> {
186        self.blocking_set(path, value)
187    }
188
189    fn blocking_set(&self, path: &str, value: typed_kv::Value) -> Result<()> {
190        self.inner.insert(path.to_string(), value);
191
192        Ok(())
193    }
194
195    async fn delete(&self, path: &str) -> Result<()> {
196        self.blocking_delete(path)
197    }
198
199    fn blocking_delete(&self, path: &str) -> Result<()> {
200        self.inner.invalidate(path);
201
202        Ok(())
203    }
204
205    async fn scan(&self, path: &str) -> Result<Vec<String>> {
206        self.blocking_scan(path)
207    }
208
209    fn blocking_scan(&self, path: &str) -> Result<Vec<String>> {
210        let keys = self.inner.iter().map(|kv| kv.0.to_string());
211        if path.is_empty() {
212            Ok(keys.collect())
213        } else {
214            Ok(keys.filter(|k| k.starts_with(path)).collect())
215        }
216    }
217}