opendal/services/monoiofs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::io;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use monoio::fs::OpenOptions;
24
25use super::config::MonoiofsConfig;
26use super::core::BUFFER_SIZE;
27use super::core::MonoiofsCore;
28use super::deleter::MonoiofsDeleter;
29use super::reader::MonoiofsReader;
30use super::writer::MonoiofsWriter;
31use crate::raw::*;
32use crate::*;
33
34/// File system support via [`monoio`].
35#[doc = include_str!("docs.md")]
36#[derive(Debug, Default)]
37pub struct MonoiofsBuilder {
38    pub(super) config: MonoiofsConfig,
39}
40
41impl MonoiofsBuilder {
42    /// Set root of this backend.
43    ///
44    /// All operations will happen under this root.
45    pub fn root(mut self, root: &str) -> Self {
46        self.config.root = if root.is_empty() {
47            None
48        } else {
49            Some(root.to_string())
50        };
51        self
52    }
53}
54
55impl Builder for MonoiofsBuilder {
56    type Config = MonoiofsConfig;
57
58    fn build(self) -> Result<impl Access> {
59        let root = self.config.root.map(PathBuf::from).ok_or(
60            Error::new(ErrorKind::ConfigInvalid, "root is not specified")
61                .with_operation("Builder::build"),
62        )?;
63        if let Err(e) = std::fs::metadata(&root) {
64            if e.kind() == io::ErrorKind::NotFound {
65                std::fs::create_dir_all(&root).map_err(|e| {
66                    Error::new(ErrorKind::Unexpected, "create root dir failed")
67                        .with_operation("Builder::build")
68                        .with_context("root", root.to_string_lossy())
69                        .set_source(e)
70                })?;
71            }
72        }
73        let root = root.canonicalize().map_err(|e| {
74            Error::new(
75                ErrorKind::Unexpected,
76                "canonicalize of root directory failed",
77            )
78            .with_operation("Builder::build")
79            .with_context("root", root.to_string_lossy())
80            .set_source(e)
81        })?;
82        let worker_threads = 1; // TODO: test concurrency and default to available_parallelism and bind cpu
83        let io_uring_entries = 1024;
84        Ok(MonoiofsBackend {
85            core: Arc::new(MonoiofsCore::new(root, worker_threads, io_uring_entries)),
86        })
87    }
88}
89
90#[derive(Debug, Clone)]
91pub struct MonoiofsBackend {
92    core: Arc<MonoiofsCore>,
93}
94
95impl Access for MonoiofsBackend {
96    type Reader = MonoiofsReader;
97    type Writer = MonoiofsWriter;
98    type Lister = ();
99    type Deleter = oio::OneShotDeleter<MonoiofsDeleter>;
100
101    fn info(&self) -> Arc<AccessorInfo> {
102        self.core.info.clone()
103    }
104
105    async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
106        let path = self.core.prepare_path(path);
107        let meta = self
108            .core
109            .dispatch(move || monoio::fs::metadata(path))
110            .await
111            .map_err(new_std_io_error)?;
112        let mode = if meta.is_dir() {
113            EntryMode::DIR
114        } else if meta.is_file() {
115            EntryMode::FILE
116        } else {
117            EntryMode::Unknown
118        };
119        let m = Metadata::new(mode)
120            .with_content_length(meta.len())
121            .with_last_modified(Timestamp::try_from(
122                meta.modified().map_err(new_std_io_error)?,
123            )?);
124        Ok(RpStat::new(m))
125    }
126
127    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
128        let path = self.core.prepare_path(path);
129        let reader = MonoiofsReader::new(self.core.clone(), path, args.range()).await?;
130        Ok((RpRead::default(), reader))
131    }
132
133    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
134        let path = self.core.prepare_write_path(path).await?;
135        let writer = MonoiofsWriter::new(self.core.clone(), path, args.append()).await?;
136        Ok((RpWrite::default(), writer))
137    }
138
139    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
140        Ok((
141            RpDelete::default(),
142            oio::OneShotDeleter::new(MonoiofsDeleter::new(self.core.clone())),
143        ))
144    }
145
146    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
147        let from = self.core.prepare_path(from);
148        // ensure file exists
149        self.core
150            .dispatch({
151                let from = from.clone();
152                move || monoio::fs::metadata(from)
153            })
154            .await
155            .map_err(new_std_io_error)?;
156        let to = self.core.prepare_write_path(to).await?;
157        self.core
158            .dispatch(move || monoio::fs::rename(from, to))
159            .await
160            .map_err(new_std_io_error)?;
161        Ok(RpRename::default())
162    }
163
164    async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
165        let path = self.core.prepare_path(path);
166        self.core
167            .dispatch(move || monoio::fs::create_dir_all(path))
168            .await
169            .map_err(new_std_io_error)?;
170        Ok(RpCreateDir::default())
171    }
172
173    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
174        let from = self.core.prepare_path(from);
175        // ensure file exists
176        self.core
177            .dispatch({
178                let from = from.clone();
179                move || monoio::fs::metadata(from)
180            })
181            .await
182            .map_err(new_std_io_error)?;
183        let to = self.core.prepare_write_path(to).await?;
184        self.core
185            .dispatch({
186                let core = self.core.clone();
187                move || async move {
188                    let from = OpenOptions::new().read(true).open(from).await?;
189                    let to = OpenOptions::new()
190                        .write(true)
191                        .create(true)
192                        .truncate(true)
193                        .open(to)
194                        .await?;
195
196                    // AsyncReadRent and AsyncWriteRent is not implemented
197                    // for File, so we can't write this:
198                    // monoio::io::copy(&mut from, &mut to).await?;
199
200                    let mut pos = 0;
201                    // allocate and resize buffer
202                    let mut buf = core.buf_pool.get();
203                    // set capacity of buf to exact size to avoid excessive read
204                    buf.reserve(BUFFER_SIZE);
205                    let _ = buf.split_off(BUFFER_SIZE);
206
207                    loop {
208                        let result;
209                        (result, buf) = from.read_at(buf, pos).await;
210                        if result? == 0 {
211                            // EOF
212                            break;
213                        }
214                        let result;
215                        (result, buf) = to.write_all_at(buf, pos).await;
216                        result?;
217                        pos += buf.len() as u64;
218                        buf.clear();
219                    }
220                    core.buf_pool.put(buf);
221                    Ok(())
222                }
223            })
224            .await
225            .map_err(new_std_io_error)?;
226        Ok(RpCopy::default())
227    }
228}