opendal/services/compfs/
backend.rs1use std::io::Cursor;
19use std::sync::Arc;
20
21use compio::dispatcher::Dispatcher;
22use compio::fs::OpenOptions;
23
24use super::core::CompfsCore;
25use super::delete::CompfsDeleter;
26use super::lister::CompfsLister;
27use super::reader::CompfsReader;
28use super::writer::CompfsWriter;
29use crate::raw::oio::OneShotDeleter;
30use crate::raw::*;
31use crate::services::CompfsConfig;
32use crate::*;
33
34impl Configurator for CompfsConfig {
35 type Builder = CompfsBuilder;
36 fn into_builder(self) -> Self::Builder {
37 CompfsBuilder { config: self }
38 }
39}
40
41#[derive(Debug, Clone, Default)]
43pub struct CompfsBuilder {
44 config: CompfsConfig,
45}
46
47impl CompfsBuilder {
48 pub fn root(mut self, root: &str) -> Self {
50 self.config.root = if root.is_empty() {
51 None
52 } else {
53 Some(root.to_string())
54 };
55
56 self
57 }
58}
59
60impl Builder for CompfsBuilder {
61 const SCHEME: Scheme = Scheme::Compfs;
62 type Config = CompfsConfig;
63
64 fn build(self) -> Result<impl Access> {
65 let root = match self.config.root {
66 Some(root) => Ok(root),
67 None => Err(Error::new(
68 ErrorKind::ConfigInvalid,
69 "root is not specified",
70 )),
71 }?;
72
73 if let Err(e) = std::fs::metadata(&root) {
75 if e.kind() == std::io::ErrorKind::NotFound {
76 std::fs::create_dir_all(&root).map_err(|e| {
77 Error::new(ErrorKind::Unexpected, "create root dir failed")
78 .with_operation("Builder::build")
79 .with_context("root", root.as_str())
80 .set_source(e)
81 })?;
82 }
83 }
84
85 let dispatcher = Dispatcher::new().map_err(|_| {
86 Error::new(
87 ErrorKind::Unexpected,
88 "failed to initiate compio dispatcher",
89 )
90 })?;
91 let core = CompfsCore {
92 info: {
93 let am = AccessorInfo::default();
94 am.set_scheme(Scheme::Compfs)
95 .set_root(&root)
96 .set_native_capability(Capability {
97 stat: true,
98 stat_has_last_modified: true,
99
100 read: true,
101
102 write: true,
103 write_can_empty: true,
104 write_can_multi: true,
105 create_dir: true,
106 delete: true,
107
108 list: true,
109
110 copy: true,
111 rename: true,
112
113 shared: true,
114
115 ..Default::default()
116 });
117
118 am.into()
119 },
120 root: root.into(),
121 dispatcher,
122 buf_pool: oio::PooledBuf::new(16),
123 };
124 Ok(CompfsBackend {
125 core: Arc::new(core),
126 })
127 }
128}
129
130#[derive(Debug)]
131pub struct CompfsBackend {
132 core: Arc<CompfsCore>,
133}
134
135impl Access for CompfsBackend {
136 type Reader = CompfsReader;
137 type Writer = CompfsWriter;
138 type Lister = Option<CompfsLister>;
139 type Deleter = OneShotDeleter<CompfsDeleter>;
140 type BlockingReader = ();
141 type BlockingWriter = ();
142 type BlockingLister = ();
143 type BlockingDeleter = ();
144
145 fn info(&self) -> Arc<AccessorInfo> {
146 self.core.info.clone()
147 }
148
149 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
150 let path = self.core.prepare_path(path);
151
152 self.core
153 .exec(move || async move { compio::fs::create_dir_all(path).await })
154 .await?;
155
156 Ok(RpCreateDir::default())
157 }
158
159 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
160 let path = self.core.prepare_path(path);
161
162 let meta = self
163 .core
164 .exec(move || async move { compio::fs::metadata(path).await })
165 .await?;
166 let ty = meta.file_type();
167 let mode = if ty.is_dir() {
168 EntryMode::DIR
169 } else if ty.is_file() {
170 EntryMode::FILE
171 } else {
172 EntryMode::Unknown
173 };
174 let last_mod = meta.modified().map_err(new_std_io_error)?.into();
175 let ret = Metadata::new(mode)
176 .with_last_modified(last_mod)
177 .with_content_length(meta.len());
178
179 Ok(RpStat::new(ret))
180 }
181
182 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
183 Ok((
184 RpDelete::default(),
185 OneShotDeleter::new(CompfsDeleter::new(self.core.clone())),
186 ))
187 }
188
189 async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
190 let from = self.core.prepare_path(from);
191 let to = self.core.prepare_path(to);
192
193 self.core
194 .exec(move || async move {
195 let from = OpenOptions::new().read(true).open(from).await?;
196 if let Some(parent) = to.parent() {
197 compio::fs::create_dir_all(parent).await?;
198 }
199 let to = OpenOptions::new()
200 .write(true)
201 .create(true)
202 .truncate(true)
203 .open(to)
204 .await?;
205
206 let (mut from, mut to) = (Cursor::new(from), Cursor::new(to));
207 compio::io::copy(&mut from, &mut to).await?;
208
209 Ok(())
210 })
211 .await?;
212
213 Ok(RpCopy::default())
214 }
215
216 async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
217 let from = self.core.prepare_path(from);
218 let to = self.core.prepare_path(to);
219
220 self.core
221 .exec(move || async move {
222 if let Some(parent) = to.parent() {
223 compio::fs::create_dir_all(parent).await?;
224 }
225 compio::fs::rename(from, to).await
226 })
227 .await?;
228
229 Ok(RpRename::default())
230 }
231
232 async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
233 let path = self.core.prepare_path(path);
234
235 let file = self
236 .core
237 .exec(|| async move { compio::fs::OpenOptions::new().read(true).open(&path).await })
238 .await?;
239
240 let r = CompfsReader::new(self.core.clone(), file, op.range());
241 Ok((RpRead::new(), r))
242 }
243
244 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
245 let path = self.core.prepare_path(path);
246 let append = args.append();
247 let file = self
248 .core
249 .exec(move || async move {
250 if let Some(parent) = path.parent() {
251 compio::fs::create_dir_all(parent).await?;
252 }
253 let file = compio::fs::OpenOptions::new()
254 .create(true)
255 .write(true)
256 .truncate(!append)
257 .open(path)
258 .await?;
259 let mut file = Cursor::new(file);
260 if append {
261 let len = file.get_ref().metadata().await?.len();
262 file.set_position(len);
263 }
264 Ok(file)
265 })
266 .await?;
267
268 let w = CompfsWriter::new(self.core.clone(), file);
269 Ok((RpWrite::new(), w))
270 }
271
272 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
273 let path = self.core.prepare_path(path);
274
275 let read_dir = match self
276 .core
277 .exec_blocking({
278 let path = path.clone();
279 move || std::fs::read_dir(path)
280 })
281 .await?
282 {
283 Ok(rd) => rd,
284 Err(e) => {
285 return if e.kind() == std::io::ErrorKind::NotFound {
286 Ok((RpList::default(), None))
287 } else {
288 Err(new_std_io_error(e))
289 };
290 }
291 };
292
293 let lister = CompfsLister::new(self.core.clone(), &path, read_dir);
294 Ok((RpList::default(), Some(lister)))
295 }
296}