opendal/services/monoiofs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::io;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use chrono::DateTime;
24use monoio::fs::OpenOptions;
25
26use super::core::MonoiofsCore;
27use super::core::BUFFER_SIZE;
28use super::delete::MonoiofsDeleter;
29use super::reader::MonoiofsReader;
30use super::writer::MonoiofsWriter;
31use crate::raw::*;
32use crate::services::MonoiofsConfig;
33use crate::*;
34
35impl Configurator for MonoiofsConfig {
36    type Builder = MonoiofsBuilder;
37    fn into_builder(self) -> Self::Builder {
38        MonoiofsBuilder { config: self }
39    }
40}
41
42/// File system support via [`monoio`].
43#[doc = include_str!("docs.md")]
44#[derive(Default, Debug)]
45pub struct MonoiofsBuilder {
46    config: MonoiofsConfig,
47}
48
49impl MonoiofsBuilder {
50    /// Set root of this backend.
51    ///
52    /// All operations will happen under this root.
53    pub fn root(mut self, root: &str) -> Self {
54        self.config.root = if root.is_empty() {
55            None
56        } else {
57            Some(root.to_string())
58        };
59        self
60    }
61}
62
63impl Builder for MonoiofsBuilder {
64    const SCHEME: Scheme = Scheme::Monoiofs;
65    type Config = MonoiofsConfig;
66
67    fn build(self) -> Result<impl Access> {
68        let root = self.config.root.map(PathBuf::from).ok_or(
69            Error::new(ErrorKind::ConfigInvalid, "root is not specified")
70                .with_operation("Builder::build"),
71        )?;
72        if let Err(e) = std::fs::metadata(&root) {
73            if e.kind() == io::ErrorKind::NotFound {
74                std::fs::create_dir_all(&root).map_err(|e| {
75                    Error::new(ErrorKind::Unexpected, "create root dir failed")
76                        .with_operation("Builder::build")
77                        .with_context("root", root.to_string_lossy())
78                        .set_source(e)
79                })?;
80            }
81        }
82        let root = root.canonicalize().map_err(|e| {
83            Error::new(
84                ErrorKind::Unexpected,
85                "canonicalize of root directory failed",
86            )
87            .with_operation("Builder::build")
88            .with_context("root", root.to_string_lossy())
89            .set_source(e)
90        })?;
91        let worker_threads = 1; // TODO: test concurrency and default to available_parallelism and bind cpu
92        let io_uring_entries = 1024;
93        Ok(MonoiofsBackend {
94            core: Arc::new(MonoiofsCore::new(root, worker_threads, io_uring_entries)),
95        })
96    }
97}
98
99#[derive(Debug, Clone)]
100pub struct MonoiofsBackend {
101    core: Arc<MonoiofsCore>,
102}
103
104impl Access for MonoiofsBackend {
105    type Reader = MonoiofsReader;
106    type Writer = MonoiofsWriter;
107    type Lister = ();
108    type Deleter = oio::OneShotDeleter<MonoiofsDeleter>;
109    type BlockingReader = ();
110    type BlockingWriter = ();
111    type BlockingLister = ();
112    type BlockingDeleter = ();
113
114    fn info(&self) -> Arc<AccessorInfo> {
115        self.core.info.clone()
116    }
117
118    async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
119        let path = self.core.prepare_path(path);
120        let meta = self
121            .core
122            .dispatch(move || monoio::fs::metadata(path))
123            .await
124            .map_err(new_std_io_error)?;
125        let mode = if meta.is_dir() {
126            EntryMode::DIR
127        } else if meta.is_file() {
128            EntryMode::FILE
129        } else {
130            EntryMode::Unknown
131        };
132        let m = Metadata::new(mode)
133            .with_content_length(meta.len())
134            .with_last_modified(
135                meta.modified()
136                    .map(DateTime::from)
137                    .map_err(new_std_io_error)?,
138            );
139        Ok(RpStat::new(m))
140    }
141
142    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
143        let path = self.core.prepare_path(path);
144        let reader = MonoiofsReader::new(self.core.clone(), path, args.range()).await?;
145        Ok((RpRead::default(), reader))
146    }
147
148    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
149        let path = self.core.prepare_write_path(path).await?;
150        let writer = MonoiofsWriter::new(self.core.clone(), path, args.append()).await?;
151        Ok((RpWrite::default(), writer))
152    }
153
154    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
155        Ok((
156            RpDelete::default(),
157            oio::OneShotDeleter::new(MonoiofsDeleter::new(self.core.clone())),
158        ))
159    }
160
161    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
162        let from = self.core.prepare_path(from);
163        // ensure file exists
164        self.core
165            .dispatch({
166                let from = from.clone();
167                move || monoio::fs::metadata(from)
168            })
169            .await
170            .map_err(new_std_io_error)?;
171        let to = self.core.prepare_write_path(to).await?;
172        self.core
173            .dispatch(move || monoio::fs::rename(from, to))
174            .await
175            .map_err(new_std_io_error)?;
176        Ok(RpRename::default())
177    }
178
179    async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
180        let path = self.core.prepare_path(path);
181        self.core
182            .dispatch(move || monoio::fs::create_dir_all(path))
183            .await
184            .map_err(new_std_io_error)?;
185        Ok(RpCreateDir::default())
186    }
187
188    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
189        let from = self.core.prepare_path(from);
190        // ensure file exists
191        self.core
192            .dispatch({
193                let from = from.clone();
194                move || monoio::fs::metadata(from)
195            })
196            .await
197            .map_err(new_std_io_error)?;
198        let to = self.core.prepare_write_path(to).await?;
199        self.core
200            .dispatch({
201                let core = self.core.clone();
202                move || async move {
203                    let from = OpenOptions::new().read(true).open(from).await?;
204                    let to = OpenOptions::new()
205                        .write(true)
206                        .create(true)
207                        .truncate(true)
208                        .open(to)
209                        .await?;
210
211                    // AsyncReadRent and AsyncWriteRent is not implemented
212                    // for File, so we can't write this:
213                    // monoio::io::copy(&mut from, &mut to).await?;
214
215                    let mut pos = 0;
216                    // allocate and resize buffer
217                    let mut buf = core.buf_pool.get();
218                    // set capacity of buf to exact size to avoid excessive read
219                    buf.reserve(BUFFER_SIZE);
220                    let _ = buf.split_off(BUFFER_SIZE);
221
222                    loop {
223                        let result;
224                        (result, buf) = from.read_at(buf, pos).await;
225                        if result? == 0 {
226                            // EOF
227                            break;
228                        }
229                        let result;
230                        (result, buf) = to.write_all_at(buf, pos).await;
231                        result?;
232                        pos += buf.len() as u64;
233                        buf.clear();
234                    }
235                    core.buf_pool.put(buf);
236                    Ok(())
237                }
238            })
239            .await
240            .map_err(new_std_io_error)?;
241        Ok(RpCopy::default())
242    }
243}