opendal/services/monoiofs/
backend.rs1use std::fmt::Debug;
19use std::io;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use chrono::DateTime;
24use monoio::fs::OpenOptions;
25
26use super::core::MonoiofsCore;
27use super::core::BUFFER_SIZE;
28use super::delete::MonoiofsDeleter;
29use super::reader::MonoiofsReader;
30use super::writer::MonoiofsWriter;
31use crate::raw::*;
32use crate::services::MonoiofsConfig;
33use crate::*;
34
35impl Configurator for MonoiofsConfig {
36 type Builder = MonoiofsBuilder;
37 fn into_builder(self) -> Self::Builder {
38 MonoiofsBuilder { config: self }
39 }
40}
41
42#[doc = include_str!("docs.md")]
44#[derive(Default, Debug)]
45pub struct MonoiofsBuilder {
46 config: MonoiofsConfig,
47}
48
49impl MonoiofsBuilder {
50 pub fn root(mut self, root: &str) -> Self {
54 self.config.root = if root.is_empty() {
55 None
56 } else {
57 Some(root.to_string())
58 };
59 self
60 }
61}
62
63impl Builder for MonoiofsBuilder {
64 type Config = MonoiofsConfig;
65
66 fn build(self) -> Result<impl Access> {
67 let root = self.config.root.map(PathBuf::from).ok_or(
68 Error::new(ErrorKind::ConfigInvalid, "root is not specified")
69 .with_operation("Builder::build"),
70 )?;
71 if let Err(e) = std::fs::metadata(&root) {
72 if e.kind() == io::ErrorKind::NotFound {
73 std::fs::create_dir_all(&root).map_err(|e| {
74 Error::new(ErrorKind::Unexpected, "create root dir failed")
75 .with_operation("Builder::build")
76 .with_context("root", root.to_string_lossy())
77 .set_source(e)
78 })?;
79 }
80 }
81 let root = root.canonicalize().map_err(|e| {
82 Error::new(
83 ErrorKind::Unexpected,
84 "canonicalize of root directory failed",
85 )
86 .with_operation("Builder::build")
87 .with_context("root", root.to_string_lossy())
88 .set_source(e)
89 })?;
90 let worker_threads = 1; let io_uring_entries = 1024;
92 Ok(MonoiofsBackend {
93 core: Arc::new(MonoiofsCore::new(root, worker_threads, io_uring_entries)),
94 })
95 }
96}
97
98#[derive(Debug, Clone)]
99pub struct MonoiofsBackend {
100 core: Arc<MonoiofsCore>,
101}
102
103impl Access for MonoiofsBackend {
104 type Reader = MonoiofsReader;
105 type Writer = MonoiofsWriter;
106 type Lister = ();
107 type Deleter = oio::OneShotDeleter<MonoiofsDeleter>;
108
109 fn info(&self) -> Arc<AccessorInfo> {
110 self.core.info.clone()
111 }
112
113 async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
114 let path = self.core.prepare_path(path);
115 let meta = self
116 .core
117 .dispatch(move || monoio::fs::metadata(path))
118 .await
119 .map_err(new_std_io_error)?;
120 let mode = if meta.is_dir() {
121 EntryMode::DIR
122 } else if meta.is_file() {
123 EntryMode::FILE
124 } else {
125 EntryMode::Unknown
126 };
127 let m = Metadata::new(mode)
128 .with_content_length(meta.len())
129 .with_last_modified(
130 meta.modified()
131 .map(DateTime::from)
132 .map_err(new_std_io_error)?,
133 );
134 Ok(RpStat::new(m))
135 }
136
137 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
138 let path = self.core.prepare_path(path);
139 let reader = MonoiofsReader::new(self.core.clone(), path, args.range()).await?;
140 Ok((RpRead::default(), reader))
141 }
142
143 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
144 let path = self.core.prepare_write_path(path).await?;
145 let writer = MonoiofsWriter::new(self.core.clone(), path, args.append()).await?;
146 Ok((RpWrite::default(), writer))
147 }
148
149 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
150 Ok((
151 RpDelete::default(),
152 oio::OneShotDeleter::new(MonoiofsDeleter::new(self.core.clone())),
153 ))
154 }
155
156 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
157 let from = self.core.prepare_path(from);
158 self.core
160 .dispatch({
161 let from = from.clone();
162 move || monoio::fs::metadata(from)
163 })
164 .await
165 .map_err(new_std_io_error)?;
166 let to = self.core.prepare_write_path(to).await?;
167 self.core
168 .dispatch(move || monoio::fs::rename(from, to))
169 .await
170 .map_err(new_std_io_error)?;
171 Ok(RpRename::default())
172 }
173
174 async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
175 let path = self.core.prepare_path(path);
176 self.core
177 .dispatch(move || monoio::fs::create_dir_all(path))
178 .await
179 .map_err(new_std_io_error)?;
180 Ok(RpCreateDir::default())
181 }
182
183 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
184 let from = self.core.prepare_path(from);
185 self.core
187 .dispatch({
188 let from = from.clone();
189 move || monoio::fs::metadata(from)
190 })
191 .await
192 .map_err(new_std_io_error)?;
193 let to = self.core.prepare_write_path(to).await?;
194 self.core
195 .dispatch({
196 let core = self.core.clone();
197 move || async move {
198 let from = OpenOptions::new().read(true).open(from).await?;
199 let to = OpenOptions::new()
200 .write(true)
201 .create(true)
202 .truncate(true)
203 .open(to)
204 .await?;
205
206 let mut pos = 0;
211 let mut buf = core.buf_pool.get();
213 buf.reserve(BUFFER_SIZE);
215 let _ = buf.split_off(BUFFER_SIZE);
216
217 loop {
218 let result;
219 (result, buf) = from.read_at(buf, pos).await;
220 if result? == 0 {
221 break;
223 }
224 let result;
225 (result, buf) = to.write_all_at(buf, pos).await;
226 result?;
227 pos += buf.len() as u64;
228 buf.clear();
229 }
230 core.buf_pool.put(buf);
231 Ok(())
232 }
233 })
234 .await
235 .map_err(new_std_io_error)?;
236 Ok(RpCopy::default())
237 }
238}