opendal/services/compfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::io::Cursor;
19use std::sync::Arc;
20
21use compio::dispatcher::Dispatcher;
22use compio::fs::OpenOptions;
23
24use super::core::CompfsCore;
25use super::delete::CompfsDeleter;
26use super::lister::CompfsLister;
27use super::reader::CompfsReader;
28use super::writer::CompfsWriter;
29use super::DEFAULT_SCHEME;
30use crate::raw::oio::OneShotDeleter;
31use crate::raw::*;
32use crate::services::CompfsConfig;
33use crate::*;
34impl Configurator for CompfsConfig {
35    type Builder = CompfsBuilder;
36    fn into_builder(self) -> Self::Builder {
37        CompfsBuilder { config: self }
38    }
39}
40
41/// [`compio`]-based file system support.
42#[derive(Debug, Clone, Default)]
43pub struct CompfsBuilder {
44    config: CompfsConfig,
45}
46
47impl CompfsBuilder {
48    /// Set root for Compfs
49    pub fn root(mut self, root: &str) -> Self {
50        self.config.root = if root.is_empty() {
51            None
52        } else {
53            Some(root.to_string())
54        };
55
56        self
57    }
58}
59
60impl Builder for CompfsBuilder {
61    type Config = CompfsConfig;
62
63    fn build(self) -> Result<impl Access> {
64        let root = match self.config.root {
65            Some(root) => Ok(root),
66            None => Err(Error::new(
67                ErrorKind::ConfigInvalid,
68                "root is not specified",
69            )),
70        }?;
71
72        // If root dir does not exist, we must create it.
73        if let Err(e) = std::fs::metadata(&root) {
74            if e.kind() == std::io::ErrorKind::NotFound {
75                std::fs::create_dir_all(&root).map_err(|e| {
76                    Error::new(ErrorKind::Unexpected, "create root dir failed")
77                        .with_operation("Builder::build")
78                        .with_context("root", root.as_str())
79                        .set_source(e)
80                })?;
81            }
82        }
83
84        let dispatcher = Dispatcher::new().map_err(|_| {
85            Error::new(
86                ErrorKind::Unexpected,
87                "failed to initiate compio dispatcher",
88            )
89        })?;
90        let core = CompfsCore {
91            info: {
92                let am = AccessorInfo::default();
93                am.set_scheme(DEFAULT_SCHEME)
94                    .set_root(&root)
95                    .set_native_capability(Capability {
96                        stat: true,
97
98                        read: true,
99
100                        write: true,
101                        write_can_empty: true,
102                        write_can_multi: true,
103                        create_dir: true,
104                        delete: true,
105
106                        list: true,
107
108                        copy: true,
109                        rename: true,
110
111                        shared: true,
112
113                        ..Default::default()
114                    });
115
116                am.into()
117            },
118            root: root.into(),
119            dispatcher,
120            buf_pool: oio::PooledBuf::new(16),
121        };
122        Ok(CompfsBackend {
123            core: Arc::new(core),
124        })
125    }
126}
127
128#[derive(Debug)]
129pub struct CompfsBackend {
130    core: Arc<CompfsCore>,
131}
132
133impl Access for CompfsBackend {
134    type Reader = CompfsReader;
135    type Writer = CompfsWriter;
136    type Lister = Option<CompfsLister>;
137    type Deleter = OneShotDeleter<CompfsDeleter>;
138
139    fn info(&self) -> Arc<AccessorInfo> {
140        self.core.info.clone()
141    }
142
143    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
144        let path = self.core.prepare_path(path);
145
146        self.core
147            .exec(move || async move { compio::fs::create_dir_all(path).await })
148            .await?;
149
150        Ok(RpCreateDir::default())
151    }
152
153    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
154        let path = self.core.prepare_path(path);
155
156        let meta = self
157            .core
158            .exec(move || async move { compio::fs::metadata(path).await })
159            .await?;
160        let ty = meta.file_type();
161        let mode = if ty.is_dir() {
162            EntryMode::DIR
163        } else if ty.is_file() {
164            EntryMode::FILE
165        } else {
166            EntryMode::Unknown
167        };
168        let last_mod = meta.modified().map_err(new_std_io_error)?.into();
169        let ret = Metadata::new(mode)
170            .with_last_modified(last_mod)
171            .with_content_length(meta.len());
172
173        Ok(RpStat::new(ret))
174    }
175
176    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
177        Ok((
178            RpDelete::default(),
179            OneShotDeleter::new(CompfsDeleter::new(self.core.clone())),
180        ))
181    }
182
183    async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
184        let from = self.core.prepare_path(from);
185        let to = self.core.prepare_path(to);
186
187        self.core
188            .exec(move || async move {
189                let from = OpenOptions::new().read(true).open(from).await?;
190                if let Some(parent) = to.parent() {
191                    compio::fs::create_dir_all(parent).await?;
192                }
193                let to = OpenOptions::new()
194                    .write(true)
195                    .create(true)
196                    .truncate(true)
197                    .open(to)
198                    .await?;
199
200                let (mut from, mut to) = (Cursor::new(from), Cursor::new(to));
201                compio::io::copy(&mut from, &mut to).await?;
202
203                Ok(())
204            })
205            .await?;
206
207        Ok(RpCopy::default())
208    }
209
210    async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
211        let from = self.core.prepare_path(from);
212        let to = self.core.prepare_path(to);
213
214        self.core
215            .exec(move || async move {
216                if let Some(parent) = to.parent() {
217                    compio::fs::create_dir_all(parent).await?;
218                }
219                compio::fs::rename(from, to).await
220            })
221            .await?;
222
223        Ok(RpRename::default())
224    }
225
226    async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
227        let path = self.core.prepare_path(path);
228
229        let file = self
230            .core
231            .exec(|| async move { compio::fs::OpenOptions::new().read(true).open(&path).await })
232            .await?;
233
234        let r = CompfsReader::new(self.core.clone(), file, op.range());
235        Ok((RpRead::new(), r))
236    }
237
238    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
239        let path = self.core.prepare_path(path);
240        let append = args.append();
241        let file = self
242            .core
243            .exec(move || async move {
244                if let Some(parent) = path.parent() {
245                    compio::fs::create_dir_all(parent).await?;
246                }
247                let file = compio::fs::OpenOptions::new()
248                    .create(true)
249                    .write(true)
250                    .truncate(!append)
251                    .open(path)
252                    .await?;
253                let mut file = Cursor::new(file);
254                if append {
255                    let len = file.get_ref().metadata().await?.len();
256                    file.set_position(len);
257                }
258                Ok(file)
259            })
260            .await?;
261
262        let w = CompfsWriter::new(self.core.clone(), file);
263        Ok((RpWrite::new(), w))
264    }
265
266    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
267        let path = self.core.prepare_path(path);
268
269        let read_dir = match self
270            .core
271            .exec_blocking({
272                let path = path.clone();
273                move || std::fs::read_dir(path)
274            })
275            .await?
276        {
277            Ok(rd) => rd,
278            Err(e) => {
279                return if e.kind() == std::io::ErrorKind::NotFound {
280                    Ok((RpList::default(), None))
281                } else {
282                    Err(new_std_io_error(e))
283                };
284            }
285        };
286
287        let lister = CompfsLister::new(self.core.clone(), &path, read_dir);
288        Ok((RpList::default(), Some(lister)))
289    }
290}