opendal/services/monoiofs/
backend.rs1use std::fmt::Debug;
19use std::io;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use monoio::fs::OpenOptions;
24
25use super::config::MonoiofsConfig;
26use super::core::BUFFER_SIZE;
27use super::core::MonoiofsCore;
28use super::deleter::MonoiofsDeleter;
29use super::reader::MonoiofsReader;
30use super::writer::MonoiofsWriter;
31use crate::raw::*;
32use crate::*;
33
34#[doc = include_str!("docs.md")]
36#[derive(Debug, Default)]
37pub struct MonoiofsBuilder {
38 pub(super) config: MonoiofsConfig,
39}
40
41impl MonoiofsBuilder {
42 pub fn root(mut self, root: &str) -> Self {
46 self.config.root = if root.is_empty() {
47 None
48 } else {
49 Some(root.to_string())
50 };
51 self
52 }
53}
54
55impl Builder for MonoiofsBuilder {
56 type Config = MonoiofsConfig;
57
58 fn build(self) -> Result<impl Access> {
59 let root = self.config.root.map(PathBuf::from).ok_or(
60 Error::new(ErrorKind::ConfigInvalid, "root is not specified")
61 .with_operation("Builder::build"),
62 )?;
63 if let Err(e) = std::fs::metadata(&root) {
64 if e.kind() == io::ErrorKind::NotFound {
65 std::fs::create_dir_all(&root).map_err(|e| {
66 Error::new(ErrorKind::Unexpected, "create root dir failed")
67 .with_operation("Builder::build")
68 .with_context("root", root.to_string_lossy())
69 .set_source(e)
70 })?;
71 }
72 }
73 let root = root.canonicalize().map_err(|e| {
74 Error::new(
75 ErrorKind::Unexpected,
76 "canonicalize of root directory failed",
77 )
78 .with_operation("Builder::build")
79 .with_context("root", root.to_string_lossy())
80 .set_source(e)
81 })?;
82 let worker_threads = 1; let io_uring_entries = 1024;
84 Ok(MonoiofsBackend {
85 core: Arc::new(MonoiofsCore::new(root, worker_threads, io_uring_entries)),
86 })
87 }
88}
89
90#[derive(Debug, Clone)]
91pub struct MonoiofsBackend {
92 core: Arc<MonoiofsCore>,
93}
94
95impl Access for MonoiofsBackend {
96 type Reader = MonoiofsReader;
97 type Writer = MonoiofsWriter;
98 type Lister = ();
99 type Deleter = oio::OneShotDeleter<MonoiofsDeleter>;
100
101 fn info(&self) -> Arc<AccessorInfo> {
102 self.core.info.clone()
103 }
104
105 async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
106 let path = self.core.prepare_path(path);
107 let meta = self
108 .core
109 .dispatch(move || monoio::fs::metadata(path))
110 .await
111 .map_err(new_std_io_error)?;
112 let mode = if meta.is_dir() {
113 EntryMode::DIR
114 } else if meta.is_file() {
115 EntryMode::FILE
116 } else {
117 EntryMode::Unknown
118 };
119 let m = Metadata::new(mode)
120 .with_content_length(meta.len())
121 .with_last_modified(Timestamp::try_from(
122 meta.modified().map_err(new_std_io_error)?,
123 )?);
124 Ok(RpStat::new(m))
125 }
126
127 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
128 let path = self.core.prepare_path(path);
129 let reader = MonoiofsReader::new(self.core.clone(), path, args.range()).await?;
130 Ok((RpRead::default(), reader))
131 }
132
133 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
134 let path = self.core.prepare_write_path(path).await?;
135 let writer = MonoiofsWriter::new(self.core.clone(), path, args.append()).await?;
136 Ok((RpWrite::default(), writer))
137 }
138
139 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
140 Ok((
141 RpDelete::default(),
142 oio::OneShotDeleter::new(MonoiofsDeleter::new(self.core.clone())),
143 ))
144 }
145
146 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
147 let from = self.core.prepare_path(from);
148 self.core
150 .dispatch({
151 let from = from.clone();
152 move || monoio::fs::metadata(from)
153 })
154 .await
155 .map_err(new_std_io_error)?;
156 let to = self.core.prepare_write_path(to).await?;
157 self.core
158 .dispatch(move || monoio::fs::rename(from, to))
159 .await
160 .map_err(new_std_io_error)?;
161 Ok(RpRename::default())
162 }
163
164 async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
165 let path = self.core.prepare_path(path);
166 self.core
167 .dispatch(move || monoio::fs::create_dir_all(path))
168 .await
169 .map_err(new_std_io_error)?;
170 Ok(RpCreateDir::default())
171 }
172
173 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
174 let from = self.core.prepare_path(from);
175 self.core
177 .dispatch({
178 let from = from.clone();
179 move || monoio::fs::metadata(from)
180 })
181 .await
182 .map_err(new_std_io_error)?;
183 let to = self.core.prepare_write_path(to).await?;
184 self.core
185 .dispatch({
186 let core = self.core.clone();
187 move || async move {
188 let from = OpenOptions::new().read(true).open(from).await?;
189 let to = OpenOptions::new()
190 .write(true)
191 .create(true)
192 .truncate(true)
193 .open(to)
194 .await?;
195
196 let mut pos = 0;
201 let mut buf = core.buf_pool.get();
203 buf.reserve(BUFFER_SIZE);
205 let _ = buf.split_off(BUFFER_SIZE);
206
207 loop {
208 let result;
209 (result, buf) = from.read_at(buf, pos).await;
210 if result? == 0 {
211 break;
213 }
214 let result;
215 (result, buf) = to.write_all_at(buf, pos).await;
216 result?;
217 pos += buf.len() as u64;
218 buf.clear();
219 }
220 core.buf_pool.put(buf);
221 Ok(())
222 }
223 })
224 .await
225 .map_err(new_std_io_error)?;
226 Ok(RpCopy::default())
227 }
228}