1use std::collections::HashMap;
19use std::path::PathBuf;
20use std::sync::Arc;
21
22use log::debug;
23
24use http::Uri;
25use percent_encoding::percent_decode_str;
26
27use super::core::*;
28use super::delete::FsDeleter;
29use super::lister::FsLister;
30use super::reader::FsReader;
31use super::writer::FsWriter;
32use super::writer::FsWriters;
33use super::FS_SCHEME;
34use crate::raw::*;
35use crate::services::FsConfig;
36use crate::*;
37impl Configurator for FsConfig {
38 type Builder = FsBuilder;
39
40 fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
41 let mut map = options.clone();
42
43 if !map.contains_key("root") {
44 let path = percent_decode_str(uri.path()).decode_utf8_lossy();
45 if path.is_empty() || path == "/" {
46 return Err(Error::new(
47 ErrorKind::ConfigInvalid,
48 "fs uri requires absolute path",
49 ));
50 }
51 if !path.starts_with('/') {
52 return Err(Error::new(
53 ErrorKind::ConfigInvalid,
54 "fs uri root must be absolute",
55 ));
56 }
57 map.insert("root".to_string(), path.to_string());
58 }
59
60 Self::from_iter(map)
61 }
62
63 fn into_builder(self) -> Self::Builder {
64 FsBuilder { config: self }
65 }
66}
67
/// Builder for the local filesystem (`fs`) service.
///
/// Holds an [`FsConfig`] that is filled in through the builder's setter
/// methods and consumed when the backend is built.
#[doc = include_str!("docs.md")]
#[derive(Default, Debug)]
pub struct FsBuilder {
    // Configuration accumulated so far; starts out as `FsConfig::default()`.
    config: FsConfig,
}
74
75impl FsBuilder {
76 pub fn root(mut self, root: &str) -> Self {
78 self.config.root = if root.is_empty() {
79 None
80 } else {
81 Some(root.to_string())
82 };
83
84 self
85 }
86
87 pub fn atomic_write_dir(mut self, dir: &str) -> Self {
94 if !dir.is_empty() {
95 self.config.atomic_write_dir = Some(dir.to_string());
96 }
97
98 self
99 }
100}
101
102impl Builder for FsBuilder {
103 type Config = FsConfig;
104
105 fn build(self) -> Result<impl Access> {
106 debug!("backend build started: {:?}", &self);
107
108 let root = match self.config.root.map(PathBuf::from) {
109 Some(root) => Ok(root),
110 None => Err(Error::new(
111 ErrorKind::ConfigInvalid,
112 "root is not specified",
113 )),
114 }?;
115 debug!("backend use root {}", root.to_string_lossy());
116
117 if let Err(e) = std::fs::metadata(&root) {
119 if e.kind() == std::io::ErrorKind::NotFound {
120 std::fs::create_dir_all(&root).map_err(|e| {
121 Error::new(ErrorKind::Unexpected, "create root dir failed")
122 .with_operation("Builder::build")
123 .with_context("root", root.to_string_lossy())
124 .set_source(e)
125 })?;
126 }
127 }
128
129 let atomic_write_dir = self.config.atomic_write_dir.map(PathBuf::from);
130
131 if let Some(d) = &atomic_write_dir {
133 if let Err(e) = std::fs::metadata(d) {
134 if e.kind() == std::io::ErrorKind::NotFound {
135 std::fs::create_dir_all(d).map_err(|e| {
136 Error::new(ErrorKind::Unexpected, "create atomic write dir failed")
137 .with_operation("Builder::build")
138 .with_context("atomic_write_dir", d.to_string_lossy())
139 .set_source(e)
140 })?;
141 }
142 }
143 }
144
145 let root = root.canonicalize().map_err(|e| {
148 Error::new(
149 ErrorKind::Unexpected,
150 "canonicalize of root directory failed",
151 )
152 .set_source(e)
153 })?;
154
155 let atomic_write_dir = atomic_write_dir
158 .map(|p| {
159 p.canonicalize().map(Some).map_err(|e| {
160 Error::new(
161 ErrorKind::Unexpected,
162 "canonicalize of atomic_write_dir directory failed",
163 )
164 .with_operation("Builder::build")
165 .with_context("root", root.to_string_lossy())
166 .set_source(e)
167 })
168 })
169 .unwrap_or(Ok(None))?;
170
171 Ok(FsBackend {
172 core: Arc::new(FsCore {
173 info: {
174 let am = AccessorInfo::default();
175 am.set_scheme(FS_SCHEME)
176 .set_root(&root.to_string_lossy())
177 .set_native_capability(Capability {
178 stat: true,
179
180 read: true,
181
182 write: true,
183 write_can_empty: true,
184 write_can_append: true,
185 write_can_multi: true,
186 write_with_if_not_exists: true,
187
188 create_dir: true,
189 delete: true,
190
191 list: true,
192
193 copy: true,
194 rename: true,
195
196 shared: true,
197
198 ..Default::default()
199 });
200
201 am.into()
202 },
203 root,
204 atomic_write_dir,
205 buf_pool: oio::PooledBuf::new(16).with_initial_capacity(256 * 1024),
206 }),
207 })
208 }
209}
210
/// Backend that serves the `Access` operations for the local filesystem.
///
/// Cloning is cheap: clones share the same [`FsCore`] through an [`Arc`].
#[derive(Debug, Clone)]
pub struct FsBackend {
    // Shared core state; every operation is delegated to it.
    core: Arc<FsCore>,
}
216
217impl Access for FsBackend {
218 type Reader = FsReader<tokio::fs::File>;
219 type Writer = FsWriters;
220 type Lister = Option<FsLister<tokio::fs::ReadDir>>;
221 type Deleter = oio::OneShotDeleter<FsDeleter>;
222
223 fn info(&self) -> Arc<AccessorInfo> {
224 self.core.info.clone()
225 }
226
227 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
228 self.core.fs_create_dir(path).await?;
229 Ok(RpCreateDir::default())
230 }
231
232 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
233 let m = self.core.fs_stat(path).await?;
234 Ok(RpStat::new(m))
235 }
236
237 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
247 let f = self.core.fs_read(path, &args).await?;
248 let r = FsReader::new(
249 self.core.clone(),
250 f,
251 args.range().size().unwrap_or(u64::MAX) as _,
252 );
253 Ok((RpRead::new(), r))
254 }
255
256 async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
257 let is_append = op.append();
258 let concurrent = op.concurrent();
259
260 let writer = FsWriter::create(self.core.clone(), path, op).await?;
261
262 let writer = if is_append {
263 FsWriters::One(writer)
264 } else {
265 FsWriters::Two(oio::PositionWriter::new(
266 self.info().clone(),
267 writer,
268 concurrent,
269 ))
270 };
271
272 Ok((RpWrite::default(), writer))
273 }
274
275 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
276 Ok((
277 RpDelete::default(),
278 oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
279 ))
280 }
281
282 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
283 match self.core.fs_list(path).await? {
284 Some(f) => {
285 let rd = FsLister::new(&self.core.root, path, f);
286 Ok((RpList::default(), Some(rd)))
287 }
288 None => Ok((RpList::default(), None)),
289 }
290 }
291
292 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
293 self.core.fs_copy(from, to).await?;
294 Ok(RpCopy::default())
295 }
296
297 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
298 self.core.fs_rename(from, to).await?;
299 Ok(RpRename::default())
300 }
301}