opendal/raw/
path_cache.rs1use std::collections::VecDeque;
19
20use futures::Future;
21use moka::sync::Cache;
22use tokio::sync::Mutex;
23use tokio::sync::MutexGuard;
24
25use crate::raw::*;
26use crate::*;
27
28pub trait PathQuery {
30 fn root(&self) -> impl Future<Output = Result<String>> + MaybeSend;
32 fn query(
34 &self,
35 parent_id: &str,
36 name: &str,
37 ) -> impl Future<Output = Result<Option<String>>> + MaybeSend;
38 fn create_dir(
40 &self,
41 parent_id: &str,
42 name: &str,
43 ) -> impl Future<Output = Result<String>> + MaybeSend;
44}
45
46pub struct PathCacher<Q: PathQuery> {
58 query: Q,
59 cache: Cache<String, String>,
60
61 lock: Option<Mutex<()>>,
66}
67
68impl<Q: PathQuery> PathCacher<Q> {
69 pub fn new(query: Q) -> Self {
71 Self {
72 query,
73 cache: Cache::new(64 * 1024),
74 lock: None,
75 }
76 }
77
78 pub fn with_lock(mut self) -> Self {
80 self.lock = Some(Mutex::default());
81 self
82 }
83
84 async fn lock(&self) -> Option<MutexGuard<()>> {
85 if let Some(l) = &self.lock {
86 Some(l.lock().await)
87 } else {
88 None
89 }
90 }
91
92 pub async fn insert(&self, path: &str, id: &str) {
94 let _guard = self.lock().await;
95
96 if self.cache.contains_key(path) {
98 debug_assert!(
99 self.cache.get(path) == Some(id.to_string()),
100 "path {path} exists but it's value is inconsistent"
101 );
102 return;
103 }
104
105 self.cache.insert(path.to_string(), id.to_string());
106 }
107
108 pub async fn remove(&self, path: &str) {
110 let _guard = self.lock().await;
111
112 self.cache.invalidate(path)
113 }
114
115 pub async fn get(&self, path: &str) -> Result<Option<String>> {
117 let _guard = self.lock().await;
118
119 if let Some(id) = self.cache.get(path) {
120 return Ok(Some(id));
121 }
122
123 let mut paths = VecDeque::new();
124 let mut current_path = path;
125
126 while current_path != "/" && !current_path.is_empty() {
127 paths.push_front(current_path.to_string());
128 current_path = get_parent(current_path);
129 if let Some(id) = self.cache.get(current_path) {
130 return self.query_down(&id, paths).await;
131 }
132 }
133
134 let root_id = self.query.root().await?;
135 self.cache.insert("/".to_string(), root_id.clone());
136 self.query_down(&root_id, paths).await
137 }
138
139 async fn query_down(&self, start_id: &str, paths: VecDeque<String>) -> Result<Option<String>> {
144 let mut current_id = start_id.to_string();
145 for path in paths.into_iter() {
146 let name = get_basename(&path);
147 current_id = match self.query.query(¤t_id, name).await? {
148 Some(id) => {
149 self.cache.insert(path, id.clone());
150 id
151 }
152 None => return Ok(None),
153 };
154 }
155 Ok(Some(current_id))
156 }
157
158 pub async fn ensure_dir(&self, path: &str) -> Result<String> {
160 let _guard = self.lock().await;
161
162 let mut tmp = "".to_string();
163 let mut parents = vec![];
165 for component in path.split('/') {
166 if component.is_empty() {
167 continue;
168 }
169
170 tmp.push_str(component);
171 tmp.push('/');
172 parents.push(tmp.to_string());
173 }
174
175 let mut parent_id = match self.cache.get("/") {
176 Some(v) => v,
177 None => self.query.root().await?,
178 };
179 for parent in parents {
180 parent_id = match self.cache.get(&parent) {
181 Some(value) => value,
182 None => {
183 let value = match self.query.query(&parent_id, get_basename(&parent)).await? {
184 Some(value) => value,
185 None => {
186 self.query
187 .create_dir(&parent_id, get_basename(&parent))
188 .await?
189 }
190 };
191 self.cache.insert(parent, value.clone());
192 value
193 }
194 }
195 }
196
197 Ok(parent_id)
198 }
199}
200
#[cfg(test)]
mod tests {

    use crate::raw::PathCacher;
    use crate::raw::PathQuery;
    use crate::*;

    /// Stub backend whose ids are simply the parent id followed by the name.
    ///
    /// Any name starting with `not_exist` is reported as missing by `query`.
    struct TestQuery {}

    impl PathQuery for TestQuery {
        async fn root(&self) -> Result<String> {
            Ok("root/".to_string())
        }

        async fn query(&self, parent_id: &str, name: &str) -> Result<Option<String>> {
            if name.starts_with("not_exist") {
                return Ok(None);
            }
            Ok(Some(format!("{parent_id}{name}")))
        }

        async fn create_dir(&self, parent_id: &str, name: &str) -> Result<String> {
            Ok(format!("{parent_id}{name}"))
        }
    }

    /// `get` resolves existing paths and yields `None` for missing ones.
    #[tokio::test]
    async fn test_path_cacher_get() {
        // (case name, input path, expected id)
        let cases = [
            ("root", "/", Some("root/")),
            ("normal path", "/a", Some("root/a")),
            ("not exist normal dir", "/not_exist/a", None),
            ("not exist normal file", "/a/b/not_exist", None),
            ("nest path", "/a/b/c/d", Some("root/a/b/c/d")),
        ];

        for (name, input, expect) in cases {
            // A fresh cacher per case keeps the cases independent.
            let cache = PathCacher::new(TestQuery {});

            let actual = cache.get(input).await.unwrap();
            assert_eq!(actual.as_deref(), expect, "{name}");
        }
    }
}