opendal/services/compfs/
backend.rs1use std::io::Cursor;
19use std::sync::Arc;
20
21use compio::dispatcher::Dispatcher;
22use compio::fs::OpenOptions;
23
24use super::core::CompfsCore;
25use super::delete::CompfsDeleter;
26use super::lister::CompfsLister;
27use super::reader::CompfsReader;
28use super::writer::CompfsWriter;
29use super::DEFAULT_SCHEME;
30use crate::raw::oio::OneShotDeleter;
31use crate::raw::*;
32use crate::services::CompfsConfig;
33use crate::*;
34impl Configurator for CompfsConfig {
35 type Builder = CompfsBuilder;
36 fn into_builder(self) -> Self::Builder {
37 CompfsBuilder { config: self }
38 }
39}
40
41#[derive(Debug, Clone, Default)]
43pub struct CompfsBuilder {
44 config: CompfsConfig,
45}
46
47impl CompfsBuilder {
48 pub fn root(mut self, root: &str) -> Self {
50 self.config.root = if root.is_empty() {
51 None
52 } else {
53 Some(root.to_string())
54 };
55
56 self
57 }
58}
59
60impl Builder for CompfsBuilder {
61 type Config = CompfsConfig;
62
63 fn build(self) -> Result<impl Access> {
64 let root = match self.config.root {
65 Some(root) => Ok(root),
66 None => Err(Error::new(
67 ErrorKind::ConfigInvalid,
68 "root is not specified",
69 )),
70 }?;
71
72 if let Err(e) = std::fs::metadata(&root) {
74 if e.kind() == std::io::ErrorKind::NotFound {
75 std::fs::create_dir_all(&root).map_err(|e| {
76 Error::new(ErrorKind::Unexpected, "create root dir failed")
77 .with_operation("Builder::build")
78 .with_context("root", root.as_str())
79 .set_source(e)
80 })?;
81 }
82 }
83
84 let dispatcher = Dispatcher::new().map_err(|_| {
85 Error::new(
86 ErrorKind::Unexpected,
87 "failed to initiate compio dispatcher",
88 )
89 })?;
90 let core = CompfsCore {
91 info: {
92 let am = AccessorInfo::default();
93 am.set_scheme(DEFAULT_SCHEME)
94 .set_root(&root)
95 .set_native_capability(Capability {
96 stat: true,
97
98 read: true,
99
100 write: true,
101 write_can_empty: true,
102 write_can_multi: true,
103 create_dir: true,
104 delete: true,
105
106 list: true,
107
108 copy: true,
109 rename: true,
110
111 shared: true,
112
113 ..Default::default()
114 });
115
116 am.into()
117 },
118 root: root.into(),
119 dispatcher,
120 buf_pool: oio::PooledBuf::new(16),
121 };
122 Ok(CompfsBackend {
123 core: Arc::new(core),
124 })
125 }
126}
127
128#[derive(Debug)]
129pub struct CompfsBackend {
130 core: Arc<CompfsCore>,
131}
132
133impl Access for CompfsBackend {
134 type Reader = CompfsReader;
135 type Writer = CompfsWriter;
136 type Lister = Option<CompfsLister>;
137 type Deleter = OneShotDeleter<CompfsDeleter>;
138
139 fn info(&self) -> Arc<AccessorInfo> {
140 self.core.info.clone()
141 }
142
143 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
144 let path = self.core.prepare_path(path);
145
146 self.core
147 .exec(move || async move { compio::fs::create_dir_all(path).await })
148 .await?;
149
150 Ok(RpCreateDir::default())
151 }
152
153 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
154 let path = self.core.prepare_path(path);
155
156 let meta = self
157 .core
158 .exec(move || async move { compio::fs::metadata(path).await })
159 .await?;
160 let ty = meta.file_type();
161 let mode = if ty.is_dir() {
162 EntryMode::DIR
163 } else if ty.is_file() {
164 EntryMode::FILE
165 } else {
166 EntryMode::Unknown
167 };
168 let last_mod = meta.modified().map_err(new_std_io_error)?.into();
169 let ret = Metadata::new(mode)
170 .with_last_modified(last_mod)
171 .with_content_length(meta.len());
172
173 Ok(RpStat::new(ret))
174 }
175
176 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
177 Ok((
178 RpDelete::default(),
179 OneShotDeleter::new(CompfsDeleter::new(self.core.clone())),
180 ))
181 }
182
183 async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
184 let from = self.core.prepare_path(from);
185 let to = self.core.prepare_path(to);
186
187 self.core
188 .exec(move || async move {
189 let from = OpenOptions::new().read(true).open(from).await?;
190 if let Some(parent) = to.parent() {
191 compio::fs::create_dir_all(parent).await?;
192 }
193 let to = OpenOptions::new()
194 .write(true)
195 .create(true)
196 .truncate(true)
197 .open(to)
198 .await?;
199
200 let (mut from, mut to) = (Cursor::new(from), Cursor::new(to));
201 compio::io::copy(&mut from, &mut to).await?;
202
203 Ok(())
204 })
205 .await?;
206
207 Ok(RpCopy::default())
208 }
209
210 async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
211 let from = self.core.prepare_path(from);
212 let to = self.core.prepare_path(to);
213
214 self.core
215 .exec(move || async move {
216 if let Some(parent) = to.parent() {
217 compio::fs::create_dir_all(parent).await?;
218 }
219 compio::fs::rename(from, to).await
220 })
221 .await?;
222
223 Ok(RpRename::default())
224 }
225
226 async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
227 let path = self.core.prepare_path(path);
228
229 let file = self
230 .core
231 .exec(|| async move { compio::fs::OpenOptions::new().read(true).open(&path).await })
232 .await?;
233
234 let r = CompfsReader::new(self.core.clone(), file, op.range());
235 Ok((RpRead::new(), r))
236 }
237
238 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
239 let path = self.core.prepare_path(path);
240 let append = args.append();
241 let file = self
242 .core
243 .exec(move || async move {
244 if let Some(parent) = path.parent() {
245 compio::fs::create_dir_all(parent).await?;
246 }
247 let file = compio::fs::OpenOptions::new()
248 .create(true)
249 .write(true)
250 .truncate(!append)
251 .open(path)
252 .await?;
253 let mut file = Cursor::new(file);
254 if append {
255 let len = file.get_ref().metadata().await?.len();
256 file.set_position(len);
257 }
258 Ok(file)
259 })
260 .await?;
261
262 let w = CompfsWriter::new(self.core.clone(), file);
263 Ok((RpWrite::new(), w))
264 }
265
266 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
267 let path = self.core.prepare_path(path);
268
269 let read_dir = match self
270 .core
271 .exec_blocking({
272 let path = path.clone();
273 move || std::fs::read_dir(path)
274 })
275 .await?
276 {
277 Ok(rd) => rd,
278 Err(e) => {
279 return if e.kind() == std::io::ErrorKind::NotFound {
280 Ok((RpList::default(), None))
281 } else {
282 Err(new_std_io_error(e))
283 };
284 }
285 };
286
287 let lister = CompfsLister::new(self.core.clone(), &path, read_dir);
288 Ok((RpList::default(), Some(lister)))
289 }
290}