opendal/services/compfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::io::Cursor;
19use std::sync::Arc;
20
21use compio::dispatcher::Dispatcher;
22use compio::fs::OpenOptions;
23
24use super::core::CompfsCore;
25use super::delete::CompfsDeleter;
26use super::lister::CompfsLister;
27use super::reader::CompfsReader;
28use super::writer::CompfsWriter;
29use crate::raw::oio::OneShotDeleter;
30use crate::raw::*;
31use crate::services::CompfsConfig;
32use crate::*;
33
34impl Configurator for CompfsConfig {
35    type Builder = CompfsBuilder;
36    fn into_builder(self) -> Self::Builder {
37        CompfsBuilder { config: self }
38    }
39}
40
41/// [`compio`]-based file system support.
42#[derive(Debug, Clone, Default)]
43pub struct CompfsBuilder {
44    config: CompfsConfig,
45}
46
47impl CompfsBuilder {
48    /// Set root for Compfs
49    pub fn root(mut self, root: &str) -> Self {
50        self.config.root = if root.is_empty() {
51            None
52        } else {
53            Some(root.to_string())
54        };
55
56        self
57    }
58}
59
60impl Builder for CompfsBuilder {
61    const SCHEME: Scheme = Scheme::Compfs;
62    type Config = CompfsConfig;
63
64    fn build(self) -> Result<impl Access> {
65        let root = match self.config.root {
66            Some(root) => Ok(root),
67            None => Err(Error::new(
68                ErrorKind::ConfigInvalid,
69                "root is not specified",
70            )),
71        }?;
72
73        // If root dir does not exist, we must create it.
74        if let Err(e) = std::fs::metadata(&root) {
75            if e.kind() == std::io::ErrorKind::NotFound {
76                std::fs::create_dir_all(&root).map_err(|e| {
77                    Error::new(ErrorKind::Unexpected, "create root dir failed")
78                        .with_operation("Builder::build")
79                        .with_context("root", root.as_str())
80                        .set_source(e)
81                })?;
82            }
83        }
84
85        let dispatcher = Dispatcher::new().map_err(|_| {
86            Error::new(
87                ErrorKind::Unexpected,
88                "failed to initiate compio dispatcher",
89            )
90        })?;
91        let core = CompfsCore {
92            info: {
93                let am = AccessorInfo::default();
94                am.set_scheme(Scheme::Compfs)
95                    .set_root(&root)
96                    .set_native_capability(Capability {
97                        stat: true,
98                        stat_has_last_modified: true,
99
100                        read: true,
101
102                        write: true,
103                        write_can_empty: true,
104                        write_can_multi: true,
105                        create_dir: true,
106                        delete: true,
107
108                        list: true,
109
110                        copy: true,
111                        rename: true,
112
113                        shared: true,
114
115                        ..Default::default()
116                    });
117
118                am.into()
119            },
120            root: root.into(),
121            dispatcher,
122            buf_pool: oio::PooledBuf::new(16),
123        };
124        Ok(CompfsBackend {
125            core: Arc::new(core),
126        })
127    }
128}
129
130#[derive(Debug)]
131pub struct CompfsBackend {
132    core: Arc<CompfsCore>,
133}
134
135impl Access for CompfsBackend {
136    type Reader = CompfsReader;
137    type Writer = CompfsWriter;
138    type Lister = Option<CompfsLister>;
139    type Deleter = OneShotDeleter<CompfsDeleter>;
140
141    fn info(&self) -> Arc<AccessorInfo> {
142        self.core.info.clone()
143    }
144
145    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
146        let path = self.core.prepare_path(path);
147
148        self.core
149            .exec(move || async move { compio::fs::create_dir_all(path).await })
150            .await?;
151
152        Ok(RpCreateDir::default())
153    }
154
155    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
156        let path = self.core.prepare_path(path);
157
158        let meta = self
159            .core
160            .exec(move || async move { compio::fs::metadata(path).await })
161            .await?;
162        let ty = meta.file_type();
163        let mode = if ty.is_dir() {
164            EntryMode::DIR
165        } else if ty.is_file() {
166            EntryMode::FILE
167        } else {
168            EntryMode::Unknown
169        };
170        let last_mod = meta.modified().map_err(new_std_io_error)?.into();
171        let ret = Metadata::new(mode)
172            .with_last_modified(last_mod)
173            .with_content_length(meta.len());
174
175        Ok(RpStat::new(ret))
176    }
177
178    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
179        Ok((
180            RpDelete::default(),
181            OneShotDeleter::new(CompfsDeleter::new(self.core.clone())),
182        ))
183    }
184
185    async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
186        let from = self.core.prepare_path(from);
187        let to = self.core.prepare_path(to);
188
189        self.core
190            .exec(move || async move {
191                let from = OpenOptions::new().read(true).open(from).await?;
192                if let Some(parent) = to.parent() {
193                    compio::fs::create_dir_all(parent).await?;
194                }
195                let to = OpenOptions::new()
196                    .write(true)
197                    .create(true)
198                    .truncate(true)
199                    .open(to)
200                    .await?;
201
202                let (mut from, mut to) = (Cursor::new(from), Cursor::new(to));
203                compio::io::copy(&mut from, &mut to).await?;
204
205                Ok(())
206            })
207            .await?;
208
209        Ok(RpCopy::default())
210    }
211
212    async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
213        let from = self.core.prepare_path(from);
214        let to = self.core.prepare_path(to);
215
216        self.core
217            .exec(move || async move {
218                if let Some(parent) = to.parent() {
219                    compio::fs::create_dir_all(parent).await?;
220                }
221                compio::fs::rename(from, to).await
222            })
223            .await?;
224
225        Ok(RpRename::default())
226    }
227
228    async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
229        let path = self.core.prepare_path(path);
230
231        let file = self
232            .core
233            .exec(|| async move { compio::fs::OpenOptions::new().read(true).open(&path).await })
234            .await?;
235
236        let r = CompfsReader::new(self.core.clone(), file, op.range());
237        Ok((RpRead::new(), r))
238    }
239
240    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
241        let path = self.core.prepare_path(path);
242        let append = args.append();
243        let file = self
244            .core
245            .exec(move || async move {
246                if let Some(parent) = path.parent() {
247                    compio::fs::create_dir_all(parent).await?;
248                }
249                let file = compio::fs::OpenOptions::new()
250                    .create(true)
251                    .write(true)
252                    .truncate(!append)
253                    .open(path)
254                    .await?;
255                let mut file = Cursor::new(file);
256                if append {
257                    let len = file.get_ref().metadata().await?.len();
258                    file.set_position(len);
259                }
260                Ok(file)
261            })
262            .await?;
263
264        let w = CompfsWriter::new(self.core.clone(), file);
265        Ok((RpWrite::new(), w))
266    }
267
268    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
269        let path = self.core.prepare_path(path);
270
271        let read_dir = match self
272            .core
273            .exec_blocking({
274                let path = path.clone();
275                move || std::fs::read_dir(path)
276            })
277            .await?
278        {
279            Ok(rd) => rd,
280            Err(e) => {
281                return if e.kind() == std::io::ErrorKind::NotFound {
282                    Ok((RpList::default(), None))
283                } else {
284                    Err(new_std_io_error(e))
285                };
286            }
287        };
288
289        let lister = CompfsLister::new(self.core.clone(), &path, read_dir);
290        Ok((RpList::default(), Some(lister)))
291    }
292}