opendal/services/monoiofs/
backend.rs1use monoio::fs::OpenOptions;
19use std::fmt::Debug;
20use std::io;
21use std::path::PathBuf;
22use std::sync::Arc;
23
24use super::core::BUFFER_SIZE;
25use super::core::MonoiofsCore;
26use super::delete::MonoiofsDeleter;
27use super::reader::MonoiofsReader;
28use super::writer::MonoiofsWriter;
29use crate::raw::*;
30use crate::services::MonoiofsConfig;
31use crate::*;
32
33#[doc = include_str!("docs.md")]
35#[derive(Default, Debug)]
36pub struct MonoiofsBuilder {
37 pub(super) config: MonoiofsConfig,
38}
39
40impl MonoiofsBuilder {
41 pub fn root(mut self, root: &str) -> Self {
45 self.config.root = if root.is_empty() {
46 None
47 } else {
48 Some(root.to_string())
49 };
50 self
51 }
52}
53
54impl Builder for MonoiofsBuilder {
55 type Config = MonoiofsConfig;
56
57 fn build(self) -> Result<impl Access> {
58 let root = self.config.root.map(PathBuf::from).ok_or(
59 Error::new(ErrorKind::ConfigInvalid, "root is not specified")
60 .with_operation("Builder::build"),
61 )?;
62 if let Err(e) = std::fs::metadata(&root) {
63 if e.kind() == io::ErrorKind::NotFound {
64 std::fs::create_dir_all(&root).map_err(|e| {
65 Error::new(ErrorKind::Unexpected, "create root dir failed")
66 .with_operation("Builder::build")
67 .with_context("root", root.to_string_lossy())
68 .set_source(e)
69 })?;
70 }
71 }
72 let root = root.canonicalize().map_err(|e| {
73 Error::new(
74 ErrorKind::Unexpected,
75 "canonicalize of root directory failed",
76 )
77 .with_operation("Builder::build")
78 .with_context("root", root.to_string_lossy())
79 .set_source(e)
80 })?;
81 let worker_threads = 1; let io_uring_entries = 1024;
83 Ok(MonoiofsBackend {
84 core: Arc::new(MonoiofsCore::new(root, worker_threads, io_uring_entries)),
85 })
86 }
87}
88
89#[derive(Debug, Clone)]
90pub struct MonoiofsBackend {
91 core: Arc<MonoiofsCore>,
92}
93
94impl Access for MonoiofsBackend {
95 type Reader = MonoiofsReader;
96 type Writer = MonoiofsWriter;
97 type Lister = ();
98 type Deleter = oio::OneShotDeleter<MonoiofsDeleter>;
99
100 fn info(&self) -> Arc<AccessorInfo> {
101 self.core.info.clone()
102 }
103
104 async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
105 let path = self.core.prepare_path(path);
106 let meta = self
107 .core
108 .dispatch(move || monoio::fs::metadata(path))
109 .await
110 .map_err(new_std_io_error)?;
111 let mode = if meta.is_dir() {
112 EntryMode::DIR
113 } else if meta.is_file() {
114 EntryMode::FILE
115 } else {
116 EntryMode::Unknown
117 };
118 let m = Metadata::new(mode)
119 .with_content_length(meta.len())
120 .with_last_modified(Timestamp::try_from(
121 meta.modified().map_err(new_std_io_error)?,
122 )?);
123 Ok(RpStat::new(m))
124 }
125
126 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
127 let path = self.core.prepare_path(path);
128 let reader = MonoiofsReader::new(self.core.clone(), path, args.range()).await?;
129 Ok((RpRead::default(), reader))
130 }
131
132 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
133 let path = self.core.prepare_write_path(path).await?;
134 let writer = MonoiofsWriter::new(self.core.clone(), path, args.append()).await?;
135 Ok((RpWrite::default(), writer))
136 }
137
138 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
139 Ok((
140 RpDelete::default(),
141 oio::OneShotDeleter::new(MonoiofsDeleter::new(self.core.clone())),
142 ))
143 }
144
145 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
146 let from = self.core.prepare_path(from);
147 self.core
149 .dispatch({
150 let from = from.clone();
151 move || monoio::fs::metadata(from)
152 })
153 .await
154 .map_err(new_std_io_error)?;
155 let to = self.core.prepare_write_path(to).await?;
156 self.core
157 .dispatch(move || monoio::fs::rename(from, to))
158 .await
159 .map_err(new_std_io_error)?;
160 Ok(RpRename::default())
161 }
162
163 async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
164 let path = self.core.prepare_path(path);
165 self.core
166 .dispatch(move || monoio::fs::create_dir_all(path))
167 .await
168 .map_err(new_std_io_error)?;
169 Ok(RpCreateDir::default())
170 }
171
172 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
173 let from = self.core.prepare_path(from);
174 self.core
176 .dispatch({
177 let from = from.clone();
178 move || monoio::fs::metadata(from)
179 })
180 .await
181 .map_err(new_std_io_error)?;
182 let to = self.core.prepare_write_path(to).await?;
183 self.core
184 .dispatch({
185 let core = self.core.clone();
186 move || async move {
187 let from = OpenOptions::new().read(true).open(from).await?;
188 let to = OpenOptions::new()
189 .write(true)
190 .create(true)
191 .truncate(true)
192 .open(to)
193 .await?;
194
195 let mut pos = 0;
200 let mut buf = core.buf_pool.get();
202 buf.reserve(BUFFER_SIZE);
204 let _ = buf.split_off(BUFFER_SIZE);
205
206 loop {
207 let result;
208 (result, buf) = from.read_at(buf, pos).await;
209 if result? == 0 {
210 break;
212 }
213 let result;
214 (result, buf) = to.write_all_at(buf, pos).await;
215 result?;
216 pos += buf.len() as u64;
217 buf.clear();
218 }
219 core.buf_pool.put(buf);
220 Ok(())
221 }
222 })
223 .await
224 .map_err(new_std_io_error)?;
225 Ok(RpCopy::default())
226 }
227}