opendal/services/monoiofs/
backend.rs1use std::fmt::Debug;
19use std::io;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use chrono::DateTime;
24use monoio::fs::OpenOptions;
25
26use super::core::MonoiofsCore;
27use super::core::BUFFER_SIZE;
28use super::delete::MonoiofsDeleter;
29use super::reader::MonoiofsReader;
30use super::writer::MonoiofsWriter;
31use crate::raw::*;
32use crate::services::MonoiofsConfig;
33use crate::*;
34
35impl Configurator for MonoiofsConfig {
36 type Builder = MonoiofsBuilder;
37 fn into_builder(self) -> Self::Builder {
38 MonoiofsBuilder { config: self }
39 }
40}
41
42#[doc = include_str!("docs.md")]
44#[derive(Default, Debug)]
45pub struct MonoiofsBuilder {
46 config: MonoiofsConfig,
47}
48
49impl MonoiofsBuilder {
50 pub fn root(mut self, root: &str) -> Self {
54 self.config.root = if root.is_empty() {
55 None
56 } else {
57 Some(root.to_string())
58 };
59 self
60 }
61}
62
63impl Builder for MonoiofsBuilder {
64 const SCHEME: Scheme = Scheme::Monoiofs;
65 type Config = MonoiofsConfig;
66
67 fn build(self) -> Result<impl Access> {
68 let root = self.config.root.map(PathBuf::from).ok_or(
69 Error::new(ErrorKind::ConfigInvalid, "root is not specified")
70 .with_operation("Builder::build"),
71 )?;
72 if let Err(e) = std::fs::metadata(&root) {
73 if e.kind() == io::ErrorKind::NotFound {
74 std::fs::create_dir_all(&root).map_err(|e| {
75 Error::new(ErrorKind::Unexpected, "create root dir failed")
76 .with_operation("Builder::build")
77 .with_context("root", root.to_string_lossy())
78 .set_source(e)
79 })?;
80 }
81 }
82 let root = root.canonicalize().map_err(|e| {
83 Error::new(
84 ErrorKind::Unexpected,
85 "canonicalize of root directory failed",
86 )
87 .with_operation("Builder::build")
88 .with_context("root", root.to_string_lossy())
89 .set_source(e)
90 })?;
91 let worker_threads = 1; let io_uring_entries = 1024;
93 Ok(MonoiofsBackend {
94 core: Arc::new(MonoiofsCore::new(root, worker_threads, io_uring_entries)),
95 })
96 }
97}
98
99#[derive(Debug, Clone)]
100pub struct MonoiofsBackend {
101 core: Arc<MonoiofsCore>,
102}
103
104impl Access for MonoiofsBackend {
105 type Reader = MonoiofsReader;
106 type Writer = MonoiofsWriter;
107 type Lister = ();
108 type Deleter = oio::OneShotDeleter<MonoiofsDeleter>;
109 type BlockingReader = ();
110 type BlockingWriter = ();
111 type BlockingLister = ();
112 type BlockingDeleter = ();
113
114 fn info(&self) -> Arc<AccessorInfo> {
115 self.core.info.clone()
116 }
117
118 async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
119 let path = self.core.prepare_path(path);
120 let meta = self
121 .core
122 .dispatch(move || monoio::fs::metadata(path))
123 .await
124 .map_err(new_std_io_error)?;
125 let mode = if meta.is_dir() {
126 EntryMode::DIR
127 } else if meta.is_file() {
128 EntryMode::FILE
129 } else {
130 EntryMode::Unknown
131 };
132 let m = Metadata::new(mode)
133 .with_content_length(meta.len())
134 .with_last_modified(
135 meta.modified()
136 .map(DateTime::from)
137 .map_err(new_std_io_error)?,
138 );
139 Ok(RpStat::new(m))
140 }
141
142 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
143 let path = self.core.prepare_path(path);
144 let reader = MonoiofsReader::new(self.core.clone(), path, args.range()).await?;
145 Ok((RpRead::default(), reader))
146 }
147
148 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
149 let path = self.core.prepare_write_path(path).await?;
150 let writer = MonoiofsWriter::new(self.core.clone(), path, args.append()).await?;
151 Ok((RpWrite::default(), writer))
152 }
153
154 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
155 Ok((
156 RpDelete::default(),
157 oio::OneShotDeleter::new(MonoiofsDeleter::new(self.core.clone())),
158 ))
159 }
160
161 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
162 let from = self.core.prepare_path(from);
163 self.core
165 .dispatch({
166 let from = from.clone();
167 move || monoio::fs::metadata(from)
168 })
169 .await
170 .map_err(new_std_io_error)?;
171 let to = self.core.prepare_write_path(to).await?;
172 self.core
173 .dispatch(move || monoio::fs::rename(from, to))
174 .await
175 .map_err(new_std_io_error)?;
176 Ok(RpRename::default())
177 }
178
179 async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
180 let path = self.core.prepare_path(path);
181 self.core
182 .dispatch(move || monoio::fs::create_dir_all(path))
183 .await
184 .map_err(new_std_io_error)?;
185 Ok(RpCreateDir::default())
186 }
187
188 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
189 let from = self.core.prepare_path(from);
190 self.core
192 .dispatch({
193 let from = from.clone();
194 move || monoio::fs::metadata(from)
195 })
196 .await
197 .map_err(new_std_io_error)?;
198 let to = self.core.prepare_write_path(to).await?;
199 self.core
200 .dispatch({
201 let core = self.core.clone();
202 move || async move {
203 let from = OpenOptions::new().read(true).open(from).await?;
204 let to = OpenOptions::new()
205 .write(true)
206 .create(true)
207 .truncate(true)
208 .open(to)
209 .await?;
210
211 let mut pos = 0;
216 let mut buf = core.buf_pool.get();
218 buf.reserve(BUFFER_SIZE);
220 let _ = buf.split_off(BUFFER_SIZE);
221
222 loop {
223 let result;
224 (result, buf) = from.read_at(buf, pos).await;
225 if result? == 0 {
226 break;
228 }
229 let result;
230 (result, buf) = to.write_all_at(buf, pos).await;
231 result?;
232 pos += buf.len() as u64;
233 buf.clear();
234 }
235 core.buf_pool.put(buf);
236 Ok(())
237 }
238 })
239 .await
240 .map_err(new_std_io_error)?;
241 Ok(RpCopy::default())
242 }
243}