opendal/services/compfs/
backend.rs1use std::io::Cursor;
19use std::sync::Arc;
20
21use compio::dispatcher::Dispatcher;
22use compio::fs::OpenOptions;
23
24use super::core::CompfsCore;
25use super::delete::CompfsDeleter;
26use super::lister::CompfsLister;
27use super::reader::CompfsReader;
28use super::writer::CompfsWriter;
29use crate::raw::oio::OneShotDeleter;
30use crate::raw::*;
31use crate::services::CompfsConfig;
32use crate::*;
33
34impl Configurator for CompfsConfig {
35 type Builder = CompfsBuilder;
36 fn into_builder(self) -> Self::Builder {
37 CompfsBuilder { config: self }
38 }
39}
40
41#[derive(Debug, Clone, Default)]
43pub struct CompfsBuilder {
44 config: CompfsConfig,
45}
46
47impl CompfsBuilder {
48 pub fn root(mut self, root: &str) -> Self {
50 self.config.root = if root.is_empty() {
51 None
52 } else {
53 Some(root.to_string())
54 };
55
56 self
57 }
58}
59
60impl Builder for CompfsBuilder {
61 const SCHEME: Scheme = Scheme::Compfs;
62 type Config = CompfsConfig;
63
64 fn build(self) -> Result<impl Access> {
65 let root = match self.config.root {
66 Some(root) => Ok(root),
67 None => Err(Error::new(
68 ErrorKind::ConfigInvalid,
69 "root is not specified",
70 )),
71 }?;
72
73 if let Err(e) = std::fs::metadata(&root) {
75 if e.kind() == std::io::ErrorKind::NotFound {
76 std::fs::create_dir_all(&root).map_err(|e| {
77 Error::new(ErrorKind::Unexpected, "create root dir failed")
78 .with_operation("Builder::build")
79 .with_context("root", root.as_str())
80 .set_source(e)
81 })?;
82 }
83 }
84
85 let dispatcher = Dispatcher::new().map_err(|_| {
86 Error::new(
87 ErrorKind::Unexpected,
88 "failed to initiate compio dispatcher",
89 )
90 })?;
91 let core = CompfsCore {
92 info: {
93 let am = AccessorInfo::default();
94 am.set_scheme(Scheme::Compfs)
95 .set_root(&root)
96 .set_native_capability(Capability {
97 stat: true,
98 stat_has_last_modified: true,
99
100 read: true,
101
102 write: true,
103 write_can_empty: true,
104 write_can_multi: true,
105 create_dir: true,
106 delete: true,
107
108 list: true,
109
110 copy: true,
111 rename: true,
112
113 shared: true,
114
115 ..Default::default()
116 });
117
118 am.into()
119 },
120 root: root.into(),
121 dispatcher,
122 buf_pool: oio::PooledBuf::new(16),
123 };
124 Ok(CompfsBackend {
125 core: Arc::new(core),
126 })
127 }
128}
129
130#[derive(Debug)]
131pub struct CompfsBackend {
132 core: Arc<CompfsCore>,
133}
134
135impl Access for CompfsBackend {
136 type Reader = CompfsReader;
137 type Writer = CompfsWriter;
138 type Lister = Option<CompfsLister>;
139 type Deleter = OneShotDeleter<CompfsDeleter>;
140
141 fn info(&self) -> Arc<AccessorInfo> {
142 self.core.info.clone()
143 }
144
145 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
146 let path = self.core.prepare_path(path);
147
148 self.core
149 .exec(move || async move { compio::fs::create_dir_all(path).await })
150 .await?;
151
152 Ok(RpCreateDir::default())
153 }
154
155 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
156 let path = self.core.prepare_path(path);
157
158 let meta = self
159 .core
160 .exec(move || async move { compio::fs::metadata(path).await })
161 .await?;
162 let ty = meta.file_type();
163 let mode = if ty.is_dir() {
164 EntryMode::DIR
165 } else if ty.is_file() {
166 EntryMode::FILE
167 } else {
168 EntryMode::Unknown
169 };
170 let last_mod = meta.modified().map_err(new_std_io_error)?.into();
171 let ret = Metadata::new(mode)
172 .with_last_modified(last_mod)
173 .with_content_length(meta.len());
174
175 Ok(RpStat::new(ret))
176 }
177
178 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
179 Ok((
180 RpDelete::default(),
181 OneShotDeleter::new(CompfsDeleter::new(self.core.clone())),
182 ))
183 }
184
185 async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
186 let from = self.core.prepare_path(from);
187 let to = self.core.prepare_path(to);
188
189 self.core
190 .exec(move || async move {
191 let from = OpenOptions::new().read(true).open(from).await?;
192 if let Some(parent) = to.parent() {
193 compio::fs::create_dir_all(parent).await?;
194 }
195 let to = OpenOptions::new()
196 .write(true)
197 .create(true)
198 .truncate(true)
199 .open(to)
200 .await?;
201
202 let (mut from, mut to) = (Cursor::new(from), Cursor::new(to));
203 compio::io::copy(&mut from, &mut to).await?;
204
205 Ok(())
206 })
207 .await?;
208
209 Ok(RpCopy::default())
210 }
211
212 async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
213 let from = self.core.prepare_path(from);
214 let to = self.core.prepare_path(to);
215
216 self.core
217 .exec(move || async move {
218 if let Some(parent) = to.parent() {
219 compio::fs::create_dir_all(parent).await?;
220 }
221 compio::fs::rename(from, to).await
222 })
223 .await?;
224
225 Ok(RpRename::default())
226 }
227
228 async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
229 let path = self.core.prepare_path(path);
230
231 let file = self
232 .core
233 .exec(|| async move { compio::fs::OpenOptions::new().read(true).open(&path).await })
234 .await?;
235
236 let r = CompfsReader::new(self.core.clone(), file, op.range());
237 Ok((RpRead::new(), r))
238 }
239
240 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
241 let path = self.core.prepare_path(path);
242 let append = args.append();
243 let file = self
244 .core
245 .exec(move || async move {
246 if let Some(parent) = path.parent() {
247 compio::fs::create_dir_all(parent).await?;
248 }
249 let file = compio::fs::OpenOptions::new()
250 .create(true)
251 .write(true)
252 .truncate(!append)
253 .open(path)
254 .await?;
255 let mut file = Cursor::new(file);
256 if append {
257 let len = file.get_ref().metadata().await?.len();
258 file.set_position(len);
259 }
260 Ok(file)
261 })
262 .await?;
263
264 let w = CompfsWriter::new(self.core.clone(), file);
265 Ok((RpWrite::new(), w))
266 }
267
268 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
269 let path = self.core.prepare_path(path);
270
271 let read_dir = match self
272 .core
273 .exec_blocking({
274 let path = path.clone();
275 move || std::fs::read_dir(path)
276 })
277 .await?
278 {
279 Ok(rd) => rd,
280 Err(e) => {
281 return if e.kind() == std::io::ErrorKind::NotFound {
282 Ok((RpList::default(), None))
283 } else {
284 Err(new_std_io_error(e))
285 };
286 }
287 };
288
289 let lister = CompfsLister::new(self.core.clone(), &path, read_dir);
290 Ok((RpList::default(), Some(lister)))
291 }
292}