opendal/services/compfs/
backend.rs1use std::io::Cursor;
19use std::sync::Arc;
20
21use compio::dispatcher::Dispatcher;
22use compio::fs::OpenOptions;
23
24use super::COMPFS_SCHEME;
25use super::config::CompfsConfig;
26use super::core::CompfsCore;
27use super::deleter::CompfsDeleter;
28use super::lister::CompfsLister;
29use super::reader::CompfsReader;
30use super::writer::CompfsWriter;
31use crate::raw::*;
32use crate::*;
33
34#[derive(Debug, Default)]
36pub struct CompfsBuilder {
37 pub(super) config: CompfsConfig,
38}
39
40impl CompfsBuilder {
41 pub fn root(mut self, root: &str) -> Self {
43 self.config.root = if root.is_empty() {
44 None
45 } else {
46 Some(root.to_string())
47 };
48
49 self
50 }
51}
52
53impl Builder for CompfsBuilder {
54 type Config = CompfsConfig;
55
56 fn build(self) -> Result<impl Access> {
57 let root = match self.config.root {
58 Some(root) => Ok(root),
59 None => Err(Error::new(
60 ErrorKind::ConfigInvalid,
61 "root is not specified",
62 )),
63 }?;
64
65 if let Err(e) = std::fs::metadata(&root) {
67 if e.kind() == std::io::ErrorKind::NotFound {
68 std::fs::create_dir_all(&root).map_err(|e| {
69 Error::new(ErrorKind::Unexpected, "create root dir failed")
70 .with_operation("Builder::build")
71 .with_context("root", root.as_str())
72 .set_source(e)
73 })?;
74 }
75 }
76
77 let dispatcher = Dispatcher::new().map_err(|_| {
78 Error::new(
79 ErrorKind::Unexpected,
80 "failed to initiate compio dispatcher",
81 )
82 })?;
83 let core = CompfsCore {
84 info: {
85 let am = AccessorInfo::default();
86 am.set_scheme(COMPFS_SCHEME)
87 .set_root(&root)
88 .set_native_capability(Capability {
89 stat: true,
90
91 read: true,
92
93 write: true,
94 write_can_empty: true,
95 write_can_multi: true,
96 create_dir: true,
97 delete: true,
98
99 list: true,
100
101 copy: true,
102 rename: true,
103
104 shared: true,
105
106 ..Default::default()
107 });
108
109 am.into()
110 },
111 root: root.into(),
112 dispatcher,
113 buf_pool: oio::PooledBuf::new(16),
114 };
115 Ok(CompfsBackend {
116 core: Arc::new(core),
117 })
118 }
119}
120
121#[derive(Clone, Debug)]
122pub struct CompfsBackend {
123 core: Arc<CompfsCore>,
124}
125
126impl Access for CompfsBackend {
127 type Reader = CompfsReader;
128 type Writer = CompfsWriter;
129 type Lister = Option<CompfsLister>;
130 type Deleter = oio::OneShotDeleter<CompfsDeleter>;
131
132 fn info(&self) -> Arc<AccessorInfo> {
133 self.core.info.clone()
134 }
135
136 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
137 let path = self.core.prepare_path(path);
138
139 self.core
140 .exec(move || async move { compio::fs::create_dir_all(path).await })
141 .await?;
142
143 Ok(RpCreateDir::default())
144 }
145
146 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
147 let path = self.core.prepare_path(path);
148 let meta = self
149 .core
150 .exec(move || async move { compio::fs::metadata(path).await })
151 .await?;
152 let ty = meta.file_type();
153 let mode = if ty.is_dir() {
154 EntryMode::DIR
155 } else if ty.is_file() {
156 EntryMode::FILE
157 } else {
158 EntryMode::Unknown
159 };
160 let last_mod = Timestamp::try_from(meta.modified().map_err(new_std_io_error)?)?;
161 let ret = Metadata::new(mode)
162 .with_last_modified(last_mod)
163 .with_content_length(meta.len());
164 Ok(RpStat::new(ret))
165 }
166
167 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
168 Ok((
169 RpDelete::default(),
170 oio::OneShotDeleter::new(CompfsDeleter::new(self.core.clone())),
171 ))
172 }
173
174 async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
175 let from = self.core.prepare_path(from);
176 let to = self.core.prepare_path(to);
177
178 self.core
179 .exec(move || async move {
180 let from = OpenOptions::new().read(true).open(from).await?;
181 if let Some(parent) = to.parent() {
182 compio::fs::create_dir_all(parent).await?;
183 }
184 let to = OpenOptions::new()
185 .write(true)
186 .create(true)
187 .truncate(true)
188 .open(to)
189 .await?;
190
191 let (mut from, mut to) = (Cursor::new(from), Cursor::new(to));
192 compio::io::copy(&mut from, &mut to).await?;
193
194 Ok(())
195 })
196 .await?;
197
198 Ok(RpCopy::default())
199 }
200
201 async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
202 let from = self.core.prepare_path(from);
203 let to = self.core.prepare_path(to);
204
205 self.core
206 .exec(move || async move {
207 if let Some(parent) = to.parent() {
208 compio::fs::create_dir_all(parent).await?;
209 }
210 compio::fs::rename(from, to).await
211 })
212 .await?;
213
214 Ok(RpRename::default())
215 }
216
217 async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
218 let path = self.core.prepare_path(path);
219
220 let file = self
221 .core
222 .exec(|| async move { compio::fs::OpenOptions::new().read(true).open(&path).await })
223 .await?;
224
225 let r = CompfsReader::new(self.core.clone(), file, op.range());
226 Ok((RpRead::new(), r))
227 }
228
229 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
230 let path = self.core.prepare_path(path);
231 let append = args.append();
232 let file = self
233 .core
234 .exec(move || async move {
235 if let Some(parent) = path.parent() {
236 compio::fs::create_dir_all(parent).await?;
237 }
238 let file = compio::fs::OpenOptions::new()
239 .create(true)
240 .write(true)
241 .truncate(!append)
242 .open(path)
243 .await?;
244 let mut file = Cursor::new(file);
245 if append {
246 let len = file.get_ref().metadata().await?.len();
247 file.set_position(len);
248 }
249 Ok(file)
250 })
251 .await?;
252
253 let w = CompfsWriter::new(self.core.clone(), file);
254 Ok((RpWrite::new(), w))
255 }
256
257 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
258 let path = self.core.prepare_path(path);
259
260 let read_dir = match self
261 .core
262 .exec_blocking({
263 let path = path.clone();
264 move || std::fs::read_dir(path)
265 })
266 .await?
267 {
268 Ok(rd) => rd,
269 Err(e) => {
270 return if e.kind() == std::io::ErrorKind::NotFound {
271 Ok((RpList::default(), None))
272 } else {
273 Err(new_std_io_error(e))
274 };
275 }
276 };
277
278 let lister = CompfsLister::new(self.core.clone(), &path, read_dir);
279 Ok((RpList::default(), Some(lister)))
280 }
281}