opendal/services/compfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::io::Cursor;
19use std::sync::Arc;
20
21use compio::dispatcher::Dispatcher;
22use compio::fs::OpenOptions;
23
24use super::COMPFS_SCHEME;
25use super::config::CompfsConfig;
26use super::core::CompfsCore;
27use super::deleter::CompfsDeleter;
28use super::lister::CompfsLister;
29use super::reader::CompfsReader;
30use super::writer::CompfsWriter;
31use crate::raw::*;
32use crate::*;
33
34/// [`compio`]-based file system support.
35#[derive(Debug, Default)]
36pub struct CompfsBuilder {
37    pub(super) config: CompfsConfig,
38}
39
40impl CompfsBuilder {
41    /// Set root for Compfs
42    pub fn root(mut self, root: &str) -> Self {
43        self.config.root = if root.is_empty() {
44            None
45        } else {
46            Some(root.to_string())
47        };
48
49        self
50    }
51}
52
53impl Builder for CompfsBuilder {
54    type Config = CompfsConfig;
55
56    fn build(self) -> Result<impl Access> {
57        let root = match self.config.root {
58            Some(root) => Ok(root),
59            None => Err(Error::new(
60                ErrorKind::ConfigInvalid,
61                "root is not specified",
62            )),
63        }?;
64
65        // If root dir does not exist, we must create it.
66        if let Err(e) = std::fs::metadata(&root) {
67            if e.kind() == std::io::ErrorKind::NotFound {
68                std::fs::create_dir_all(&root).map_err(|e| {
69                    Error::new(ErrorKind::Unexpected, "create root dir failed")
70                        .with_operation("Builder::build")
71                        .with_context("root", root.as_str())
72                        .set_source(e)
73                })?;
74            }
75        }
76
77        let dispatcher = Dispatcher::new().map_err(|_| {
78            Error::new(
79                ErrorKind::Unexpected,
80                "failed to initiate compio dispatcher",
81            )
82        })?;
83        let core = CompfsCore {
84            info: {
85                let am = AccessorInfo::default();
86                am.set_scheme(COMPFS_SCHEME)
87                    .set_root(&root)
88                    .set_native_capability(Capability {
89                        stat: true,
90
91                        read: true,
92
93                        write: true,
94                        write_can_empty: true,
95                        write_can_multi: true,
96                        create_dir: true,
97                        delete: true,
98
99                        list: true,
100
101                        copy: true,
102                        rename: true,
103
104                        shared: true,
105
106                        ..Default::default()
107                    });
108
109                am.into()
110            },
111            root: root.into(),
112            dispatcher,
113            buf_pool: oio::PooledBuf::new(16),
114        };
115        Ok(CompfsBackend {
116            core: Arc::new(core),
117        })
118    }
119}
120
121#[derive(Clone, Debug)]
122pub struct CompfsBackend {
123    core: Arc<CompfsCore>,
124}
125
126impl Access for CompfsBackend {
127    type Reader = CompfsReader;
128    type Writer = CompfsWriter;
129    type Lister = Option<CompfsLister>;
130    type Deleter = oio::OneShotDeleter<CompfsDeleter>;
131
132    fn info(&self) -> Arc<AccessorInfo> {
133        self.core.info.clone()
134    }
135
136    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
137        let path = self.core.prepare_path(path);
138
139        self.core
140            .exec(move || async move { compio::fs::create_dir_all(path).await })
141            .await?;
142
143        Ok(RpCreateDir::default())
144    }
145
146    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
147        let path = self.core.prepare_path(path);
148        let meta = self
149            .core
150            .exec(move || async move { compio::fs::metadata(path).await })
151            .await?;
152        let ty = meta.file_type();
153        let mode = if ty.is_dir() {
154            EntryMode::DIR
155        } else if ty.is_file() {
156            EntryMode::FILE
157        } else {
158            EntryMode::Unknown
159        };
160        let last_mod = Timestamp::try_from(meta.modified().map_err(new_std_io_error)?)?;
161        let ret = Metadata::new(mode)
162            .with_last_modified(last_mod)
163            .with_content_length(meta.len());
164        Ok(RpStat::new(ret))
165    }
166
167    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
168        Ok((
169            RpDelete::default(),
170            oio::OneShotDeleter::new(CompfsDeleter::new(self.core.clone())),
171        ))
172    }
173
174    async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
175        let from = self.core.prepare_path(from);
176        let to = self.core.prepare_path(to);
177
178        self.core
179            .exec(move || async move {
180                let from = OpenOptions::new().read(true).open(from).await?;
181                if let Some(parent) = to.parent() {
182                    compio::fs::create_dir_all(parent).await?;
183                }
184                let to = OpenOptions::new()
185                    .write(true)
186                    .create(true)
187                    .truncate(true)
188                    .open(to)
189                    .await?;
190
191                let (mut from, mut to) = (Cursor::new(from), Cursor::new(to));
192                compio::io::copy(&mut from, &mut to).await?;
193
194                Ok(())
195            })
196            .await?;
197
198        Ok(RpCopy::default())
199    }
200
201    async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
202        let from = self.core.prepare_path(from);
203        let to = self.core.prepare_path(to);
204
205        self.core
206            .exec(move || async move {
207                if let Some(parent) = to.parent() {
208                    compio::fs::create_dir_all(parent).await?;
209                }
210                compio::fs::rename(from, to).await
211            })
212            .await?;
213
214        Ok(RpRename::default())
215    }
216
217    async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
218        let path = self.core.prepare_path(path);
219
220        let file = self
221            .core
222            .exec(|| async move { compio::fs::OpenOptions::new().read(true).open(&path).await })
223            .await?;
224
225        let r = CompfsReader::new(self.core.clone(), file, op.range());
226        Ok((RpRead::new(), r))
227    }
228
229    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
230        let path = self.core.prepare_path(path);
231        let append = args.append();
232        let file = self
233            .core
234            .exec(move || async move {
235                if let Some(parent) = path.parent() {
236                    compio::fs::create_dir_all(parent).await?;
237                }
238                let file = compio::fs::OpenOptions::new()
239                    .create(true)
240                    .write(true)
241                    .truncate(!append)
242                    .open(path)
243                    .await?;
244                let mut file = Cursor::new(file);
245                if append {
246                    let len = file.get_ref().metadata().await?.len();
247                    file.set_position(len);
248                }
249                Ok(file)
250            })
251            .await?;
252
253        let w = CompfsWriter::new(self.core.clone(), file);
254        Ok((RpWrite::new(), w))
255    }
256
257    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
258        let path = self.core.prepare_path(path);
259
260        let read_dir = match self
261            .core
262            .exec_blocking({
263                let path = path.clone();
264                move || std::fs::read_dir(path)
265            })
266            .await?
267        {
268            Ok(rd) => rd,
269            Err(e) => {
270                return if e.kind() == std::io::ErrorKind::NotFound {
271                    Ok((RpList::default(), None))
272                } else {
273                    Err(new_std_io_error(e))
274                };
275            }
276        };
277
278        let lister = CompfsLister::new(self.core.clone(), &path, read_dir);
279        Ok((RpList::default(), Some(lister)))
280    }
281}