opendal/services/compfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::io::Cursor;
19use std::sync::Arc;
20
21use compio::dispatcher::Dispatcher;
22use compio::fs::OpenOptions;
23
24use super::core::CompfsCore;
25use super::delete::CompfsDeleter;
26use super::lister::CompfsLister;
27use super::reader::CompfsReader;
28use super::writer::CompfsWriter;
29use crate::raw::oio::OneShotDeleter;
30use crate::raw::*;
31use crate::services::CompfsConfig;
32use crate::*;
33
34impl Configurator for CompfsConfig {
35    type Builder = CompfsBuilder;
36    fn into_builder(self) -> Self::Builder {
37        CompfsBuilder { config: self }
38    }
39}
40
41/// [`compio`]-based file system support.
42#[derive(Debug, Clone, Default)]
43pub struct CompfsBuilder {
44    config: CompfsConfig,
45}
46
47impl CompfsBuilder {
48    /// Set root for Compfs
49    pub fn root(mut self, root: &str) -> Self {
50        self.config.root = if root.is_empty() {
51            None
52        } else {
53            Some(root.to_string())
54        };
55
56        self
57    }
58}
59
60impl Builder for CompfsBuilder {
61    const SCHEME: Scheme = Scheme::Compfs;
62    type Config = CompfsConfig;
63
64    fn build(self) -> Result<impl Access> {
65        let root = match self.config.root {
66            Some(root) => Ok(root),
67            None => Err(Error::new(
68                ErrorKind::ConfigInvalid,
69                "root is not specified",
70            )),
71        }?;
72
73        // If root dir does not exist, we must create it.
74        if let Err(e) = std::fs::metadata(&root) {
75            if e.kind() == std::io::ErrorKind::NotFound {
76                std::fs::create_dir_all(&root).map_err(|e| {
77                    Error::new(ErrorKind::Unexpected, "create root dir failed")
78                        .with_operation("Builder::build")
79                        .with_context("root", root.as_str())
80                        .set_source(e)
81                })?;
82            }
83        }
84
85        let dispatcher = Dispatcher::new().map_err(|_| {
86            Error::new(
87                ErrorKind::Unexpected,
88                "failed to initiate compio dispatcher",
89            )
90        })?;
91        let core = CompfsCore {
92            info: {
93                let am = AccessorInfo::default();
94                am.set_scheme(Scheme::Compfs)
95                    .set_root(&root)
96                    .set_native_capability(Capability {
97                        stat: true,
98                        stat_has_last_modified: true,
99
100                        read: true,
101
102                        write: true,
103                        write_can_empty: true,
104                        write_can_multi: true,
105                        create_dir: true,
106                        delete: true,
107
108                        list: true,
109
110                        copy: true,
111                        rename: true,
112
113                        shared: true,
114
115                        ..Default::default()
116                    });
117
118                am.into()
119            },
120            root: root.into(),
121            dispatcher,
122            buf_pool: oio::PooledBuf::new(16),
123        };
124        Ok(CompfsBackend {
125            core: Arc::new(core),
126        })
127    }
128}
129
130#[derive(Debug)]
131pub struct CompfsBackend {
132    core: Arc<CompfsCore>,
133}
134
135impl Access for CompfsBackend {
136    type Reader = CompfsReader;
137    type Writer = CompfsWriter;
138    type Lister = Option<CompfsLister>;
139    type Deleter = OneShotDeleter<CompfsDeleter>;
140    type BlockingReader = ();
141    type BlockingWriter = ();
142    type BlockingLister = ();
143    type BlockingDeleter = ();
144
145    fn info(&self) -> Arc<AccessorInfo> {
146        self.core.info.clone()
147    }
148
149    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
150        let path = self.core.prepare_path(path);
151
152        self.core
153            .exec(move || async move { compio::fs::create_dir_all(path).await })
154            .await?;
155
156        Ok(RpCreateDir::default())
157    }
158
159    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
160        let path = self.core.prepare_path(path);
161
162        let meta = self
163            .core
164            .exec(move || async move { compio::fs::metadata(path).await })
165            .await?;
166        let ty = meta.file_type();
167        let mode = if ty.is_dir() {
168            EntryMode::DIR
169        } else if ty.is_file() {
170            EntryMode::FILE
171        } else {
172            EntryMode::Unknown
173        };
174        let last_mod = meta.modified().map_err(new_std_io_error)?.into();
175        let ret = Metadata::new(mode)
176            .with_last_modified(last_mod)
177            .with_content_length(meta.len());
178
179        Ok(RpStat::new(ret))
180    }
181
182    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
183        Ok((
184            RpDelete::default(),
185            OneShotDeleter::new(CompfsDeleter::new(self.core.clone())),
186        ))
187    }
188
189    async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
190        let from = self.core.prepare_path(from);
191        let to = self.core.prepare_path(to);
192
193        self.core
194            .exec(move || async move {
195                let from = OpenOptions::new().read(true).open(from).await?;
196                if let Some(parent) = to.parent() {
197                    compio::fs::create_dir_all(parent).await?;
198                }
199                let to = OpenOptions::new()
200                    .write(true)
201                    .create(true)
202                    .truncate(true)
203                    .open(to)
204                    .await?;
205
206                let (mut from, mut to) = (Cursor::new(from), Cursor::new(to));
207                compio::io::copy(&mut from, &mut to).await?;
208
209                Ok(())
210            })
211            .await?;
212
213        Ok(RpCopy::default())
214    }
215
216    async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
217        let from = self.core.prepare_path(from);
218        let to = self.core.prepare_path(to);
219
220        self.core
221            .exec(move || async move {
222                if let Some(parent) = to.parent() {
223                    compio::fs::create_dir_all(parent).await?;
224                }
225                compio::fs::rename(from, to).await
226            })
227            .await?;
228
229        Ok(RpRename::default())
230    }
231
232    async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
233        let path = self.core.prepare_path(path);
234
235        let file = self
236            .core
237            .exec(|| async move { compio::fs::OpenOptions::new().read(true).open(&path).await })
238            .await?;
239
240        let r = CompfsReader::new(self.core.clone(), file, op.range());
241        Ok((RpRead::new(), r))
242    }
243
244    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
245        let path = self.core.prepare_path(path);
246        let append = args.append();
247        let file = self
248            .core
249            .exec(move || async move {
250                if let Some(parent) = path.parent() {
251                    compio::fs::create_dir_all(parent).await?;
252                }
253                let file = compio::fs::OpenOptions::new()
254                    .create(true)
255                    .write(true)
256                    .truncate(!append)
257                    .open(path)
258                    .await?;
259                let mut file = Cursor::new(file);
260                if append {
261                    let len = file.get_ref().metadata().await?.len();
262                    file.set_position(len);
263                }
264                Ok(file)
265            })
266            .await?;
267
268        let w = CompfsWriter::new(self.core.clone(), file);
269        Ok((RpWrite::new(), w))
270    }
271
272    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
273        let path = self.core.prepare_path(path);
274
275        let read_dir = match self
276            .core
277            .exec_blocking({
278                let path = path.clone();
279                move || std::fs::read_dir(path)
280            })
281            .await?
282        {
283            Ok(rd) => rd,
284            Err(e) => {
285                return if e.kind() == std::io::ErrorKind::NotFound {
286                    Ok((RpList::default(), None))
287                } else {
288                    Err(new_std_io_error(e))
289                };
290            }
291        };
292
293        let lister = CompfsLister::new(self.core.clone(), &path, read_dir);
294        Ok((RpList::default(), Some(lister)))
295    }
296}