opendal_core/services/mini_moka/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use log::debug;
21
22use super::MINI_MOKA_SCHEME;
23use super::config::MiniMokaConfig;
24use super::core::*;
25use super::deleter::MiniMokaDeleter;
26use super::lister::MiniMokaLister;
27use super::writer::MiniMokaWriter;
28use crate::raw::*;
29use crate::*;
30
31/// [mini-moka](https://github.com/moka-rs/mini-moka) backend support.
32#[doc = include_str!("docs.md")]
33#[derive(Debug, Default)]
34pub struct MiniMokaBuilder {
35    pub(super) config: MiniMokaConfig,
36}
37
38impl MiniMokaBuilder {
39    /// Create a [`MiniMokaBuilder`] with default configuration.
40    pub fn new() -> Self {
41        Self::default()
42    }
43
44    /// Sets the max capacity of the cache.
45    ///
46    /// Refer to [`mini-moka::sync::CacheBuilder::max_capacity`](https://docs.rs/mini-moka/latest/mini_moka/sync/struct.CacheBuilder.html#method.max_capacity)
47    pub fn max_capacity(mut self, v: u64) -> Self {
48        if v != 0 {
49            self.config.max_capacity = Some(v);
50        }
51        self
52    }
53
54    /// Sets the time to live of the cache.
55    ///
56    /// Refer to [`mini-moka::sync::CacheBuilder::time_to_live`](https://docs.rs/mini-moka/latest/mini_moka/sync/struct.CacheBuilder.html#method.time_to_live)
57    pub fn time_to_live(mut self, v: Duration) -> Self {
58        if !v.is_zero() {
59            self.config.time_to_live = Some(format!("{}s", v.as_secs()));
60        }
61        self
62    }
63
64    /// Sets the time to idle of the cache.
65    ///
66    /// Refer to [`mini-moka::sync::CacheBuilder::time_to_idle`](https://docs.rs/mini-moka/latest/mini_moka/sync/struct.CacheBuilder.html#method.time_to_idle)
67    pub fn time_to_idle(mut self, v: Duration) -> Self {
68        if !v.is_zero() {
69            self.config.time_to_idle = Some(format!("{}s", v.as_secs()));
70        }
71        self
72    }
73
74    /// Set root path of this backend
75    pub fn root(mut self, path: &str) -> Self {
76        self.config.root = if path.is_empty() {
77            None
78        } else {
79            Some(path.to_string())
80        };
81
82        self
83    }
84}
85
86impl Builder for MiniMokaBuilder {
87    type Config = MiniMokaConfig;
88
89    fn build(self) -> Result<impl Access> {
90        debug!("backend build started: {:?}", &self);
91
92        let mut builder: mini_moka::sync::CacheBuilder<String, MiniMokaValue, _> =
93            mini_moka::sync::Cache::builder();
94
95        // Use entries' bytes as capacity weigher.
96        builder = builder.weigher(|k, v| (k.len() + v.content.len()) as u32);
97
98        if let Some(v) = self.config.max_capacity {
99            builder = builder.max_capacity(v);
100        }
101        if let Some(value) = self.config.time_to_live.as_deref() {
102            let duration = signed_to_duration(value)?;
103            builder = builder.time_to_live(duration);
104        }
105        if let Some(value) = self.config.time_to_idle.as_deref() {
106            let duration = signed_to_duration(value)?;
107            builder = builder.time_to_idle(duration);
108        }
109
110        let cache = builder.build();
111
112        let root = normalize_root(self.config.root.as_deref().unwrap_or("/"));
113
114        let core = Arc::new(MiniMokaCore { cache });
115
116        debug!("backend build finished: {root}");
117        Ok(MiniMokaBackend::new(core, root))
118    }
119}
120
121#[derive(Debug)]
122struct MiniMokaBackend {
123    core: Arc<MiniMokaCore>,
124    root: String,
125}
126
127impl MiniMokaBackend {
128    fn new(core: Arc<MiniMokaCore>, root: String) -> Self {
129        Self { core, root }
130    }
131}
132
133impl Access for MiniMokaBackend {
134    type Reader = Buffer;
135    type Writer = MiniMokaWriter;
136    type Lister = oio::HierarchyLister<MiniMokaLister>;
137    type Deleter = oio::OneShotDeleter<MiniMokaDeleter>;
138
139    fn info(&self) -> Arc<AccessorInfo> {
140        let info = AccessorInfo::default();
141        info.set_scheme(MINI_MOKA_SCHEME)
142            .set_root(&self.root)
143            .set_native_capability(Capability {
144                stat: true,
145                read: true,
146                write: true,
147                write_can_empty: true,
148                delete: true,
149                list: true,
150
151                ..Default::default()
152            });
153
154        Arc::new(info)
155    }
156
157    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
158        let p = build_abs_path(&self.root, path);
159
160        // Check if path exists directly in cache
161        match self.core.get(&p) {
162            Some(value) => {
163                let mut metadata = value.metadata.clone();
164                if p.ends_with('/') {
165                    metadata.set_mode(EntryMode::DIR);
166                } else {
167                    metadata.set_mode(EntryMode::FILE);
168                }
169                Ok(RpStat::new(metadata))
170            }
171            None => {
172                if p.ends_with('/') {
173                    let is_prefix = self
174                        .core
175                        .cache
176                        .iter()
177                        .any(|entry| entry.key().starts_with(&p) && entry.key() != &p);
178
179                    if is_prefix {
180                        let mut metadata = Metadata::default();
181                        metadata.set_mode(EntryMode::DIR);
182                        return Ok(RpStat::new(metadata));
183                    }
184                }
185
186                Err(Error::new(ErrorKind::NotFound, "path not found"))
187            }
188        }
189    }
190
191    async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
192        let p = build_abs_path(&self.root, path);
193
194        match self.core.get(&p) {
195            Some(value) => {
196                let range = op.range();
197
198                // If range is full, return the content buffer directly
199                if range.is_full() {
200                    return Ok((RpRead::new(), value.content));
201                }
202
203                let offset = range.offset() as usize;
204                if offset >= value.content.len() {
205                    return Err(Error::new(
206                        ErrorKind::RangeNotSatisfied,
207                        "range start offset exceeds content length",
208                    ));
209                }
210
211                let size = range.size().map(|s| s as usize);
212                let end = size.map_or(value.content.len(), |s| {
213                    (offset + s).min(value.content.len())
214                });
215                let sliced_content = value.content.slice(offset..end);
216
217                Ok((RpRead::new(), sliced_content))
218            }
219            None => Err(Error::new(ErrorKind::NotFound, "path not found")),
220        }
221    }
222
223    async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
224        let p = build_abs_path(&self.root, path);
225        let writer = MiniMokaWriter::new(self.core.clone(), p, op);
226        Ok((RpWrite::new(), writer))
227    }
228
229    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
230        let deleter =
231            oio::OneShotDeleter::new(MiniMokaDeleter::new(self.core.clone(), self.root.clone()));
232        Ok((RpDelete::default(), deleter))
233    }
234
235    async fn list(&self, path: &str, op: OpList) -> Result<(RpList, Self::Lister)> {
236        let p = build_abs_path(&self.root, path);
237
238        let mini_moka_lister = MiniMokaLister::new(self.core.clone(), self.root.clone(), p);
239        let lister = oio::HierarchyLister::new(mini_moka_lister, path, op.recursive());
240
241        Ok((RpList::default(), lister))
242    }
243}