1use std::io::SeekFrom;
19use std::path::PathBuf;
20use std::sync::Arc;
21
22use chrono::DateTime;
23use log::debug;
24
25use super::core::*;
26use super::delete::FsDeleter;
27use super::lister::FsLister;
28use super::reader::FsReader;
29use super::writer::FsWriter;
30use super::writer::FsWriters;
31use crate::raw::*;
32use crate::services::FsConfig;
33use crate::*;
34
35impl Configurator for FsConfig {
36 type Builder = FsBuilder;
37 fn into_builder(self) -> Self::Builder {
38 FsBuilder { config: self }
39 }
40}
41
42#[doc = include_str!("docs.md")]
44#[derive(Default, Debug)]
45pub struct FsBuilder {
46 config: FsConfig,
47}
48
49impl FsBuilder {
50 pub fn root(mut self, root: &str) -> Self {
52 self.config.root = if root.is_empty() {
53 None
54 } else {
55 Some(root.to_string())
56 };
57
58 self
59 }
60
61 pub fn atomic_write_dir(mut self, dir: &str) -> Self {
68 if !dir.is_empty() {
69 self.config.atomic_write_dir = Some(dir.to_string());
70 }
71
72 self
73 }
74}
75
76impl Builder for FsBuilder {
77 const SCHEME: Scheme = Scheme::Fs;
78 type Config = FsConfig;
79
80 fn build(self) -> Result<impl Access> {
81 debug!("backend build started: {:?}", &self);
82
83 let root = match self.config.root.map(PathBuf::from) {
84 Some(root) => Ok(root),
85 None => Err(Error::new(
86 ErrorKind::ConfigInvalid,
87 "root is not specified",
88 )),
89 }?;
90 debug!("backend use root {}", root.to_string_lossy());
91
92 if let Err(e) = std::fs::metadata(&root) {
94 if e.kind() == std::io::ErrorKind::NotFound {
95 std::fs::create_dir_all(&root).map_err(|e| {
96 Error::new(ErrorKind::Unexpected, "create root dir failed")
97 .with_operation("Builder::build")
98 .with_context("root", root.to_string_lossy())
99 .set_source(e)
100 })?;
101 }
102 }
103
104 let atomic_write_dir = self.config.atomic_write_dir.map(PathBuf::from);
105
106 if let Some(d) = &atomic_write_dir {
108 if let Err(e) = std::fs::metadata(d) {
109 if e.kind() == std::io::ErrorKind::NotFound {
110 std::fs::create_dir_all(d).map_err(|e| {
111 Error::new(ErrorKind::Unexpected, "create atomic write dir failed")
112 .with_operation("Builder::build")
113 .with_context("atomic_write_dir", d.to_string_lossy())
114 .set_source(e)
115 })?;
116 }
117 }
118 }
119
120 let root = root.canonicalize().map_err(|e| {
123 Error::new(
124 ErrorKind::Unexpected,
125 "canonicalize of root directory failed",
126 )
127 .with_operation("Builder::build")
128 .with_context("root", root.to_string_lossy())
129 .set_source(e)
130 })?;
131
132 let atomic_write_dir = atomic_write_dir
135 .map(|p| {
136 p.canonicalize().map(Some).map_err(|e| {
137 Error::new(
138 ErrorKind::Unexpected,
139 "canonicalize of atomic_write_dir directory failed",
140 )
141 .with_operation("Builder::build")
142 .with_context("root", root.to_string_lossy())
143 .set_source(e)
144 })
145 })
146 .unwrap_or(Ok(None))?;
147
148 Ok(FsBackend {
149 core: Arc::new(FsCore {
150 info: {
151 let am = AccessorInfo::default();
152 am.set_scheme(Scheme::Fs)
153 .set_root(&root.to_string_lossy())
154 .set_native_capability(Capability {
155 stat: true,
156 stat_has_content_length: true,
157 stat_has_last_modified: true,
158
159 read: true,
160
161 write: true,
162 write_can_empty: true,
163 write_can_append: true,
164 write_can_multi: true,
165 write_with_if_not_exists: true,
166
167 create_dir: true,
168 delete: true,
169
170 list: true,
171
172 copy: true,
173 rename: true,
174
175 shared: true,
176
177 ..Default::default()
178 });
179
180 am.into()
181 },
182 root,
183 atomic_write_dir,
184 buf_pool: oio::PooledBuf::new(16).with_initial_capacity(256 * 1024),
185 }),
186 })
187 }
188}
189
190#[derive(Debug, Clone)]
192pub struct FsBackend {
193 core: Arc<FsCore>,
194}
195
196impl Access for FsBackend {
197 type Reader = FsReader<tokio::fs::File>;
198 type Writer = FsWriters;
199 type Lister = Option<FsLister<tokio::fs::ReadDir>>;
200 type Deleter = oio::OneShotDeleter<FsDeleter>;
201
202 fn info(&self) -> Arc<AccessorInfo> {
203 self.core.info.clone()
204 }
205
206 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
207 let p = self.core.root.join(path.trim_end_matches('/'));
208
209 tokio::fs::create_dir_all(&p)
210 .await
211 .map_err(new_std_io_error)?;
212
213 Ok(RpCreateDir::default())
214 }
215
216 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
217 let p = self.core.root.join(path.trim_end_matches('/'));
218
219 let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?;
220
221 let mode = if meta.is_dir() {
222 EntryMode::DIR
223 } else if meta.is_file() {
224 EntryMode::FILE
225 } else {
226 EntryMode::Unknown
227 };
228 let m = Metadata::new(mode)
229 .with_content_length(meta.len())
230 .with_last_modified(
231 meta.modified()
232 .map(DateTime::from)
233 .map_err(new_std_io_error)?,
234 );
235
236 Ok(RpStat::new(m))
237 }
238
239 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
249 let p = self.core.root.join(path.trim_end_matches('/'));
250
251 let mut f = tokio::fs::OpenOptions::new()
252 .read(true)
253 .open(&p)
254 .await
255 .map_err(new_std_io_error)?;
256
257 if args.range().offset() != 0 {
258 use tokio::io::AsyncSeekExt;
259
260 f.seek(SeekFrom::Start(args.range().offset()))
261 .await
262 .map_err(new_std_io_error)?;
263 }
264
265 let r = FsReader::new(
266 self.core.clone(),
267 f,
268 args.range().size().unwrap_or(u64::MAX) as _,
269 );
270 Ok((RpRead::new(), r))
271 }
272
273 async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
274 let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.core.atomic_write_dir {
275 let target_path = self
276 .core
277 .ensure_write_abs_path(&self.core.root, path)
278 .await?;
279 let tmp_path = self
280 .core
281 .ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))
282 .await?;
283
284 if op.append()
286 && tokio::fs::try_exists(&target_path)
287 .await
288 .map_err(new_std_io_error)?
289 {
290 (target_path, None)
291 } else {
292 (target_path, Some(tmp_path))
293 }
294 } else {
295 let p = self
296 .core
297 .ensure_write_abs_path(&self.core.root, path)
298 .await?;
299
300 (p, None)
301 };
302
303 let mut open_options = tokio::fs::OpenOptions::new();
304 if op.if_not_exists() {
305 open_options.create_new(true);
306 } else {
307 open_options.create(true);
308 }
309
310 open_options.write(true);
311
312 if op.append() {
313 open_options.append(true);
314 } else {
315 open_options.truncate(true);
316 }
317
318 let f = open_options
319 .open(tmp_path.as_ref().unwrap_or(&target_path))
320 .await
321 .map_err(|e| {
322 match e.kind() {
323 std::io::ErrorKind::AlreadyExists => {
324 Error::new(
326 ErrorKind::ConditionNotMatch,
327 "The file already exists in the filesystem",
328 )
329 .set_source(e)
330 }
331 _ => new_std_io_error(e),
332 }
333 })?;
334
335 let w = FsWriter::new(target_path, tmp_path, f);
336
337 let w = if op.append() {
338 FsWriters::One(w)
339 } else {
340 FsWriters::Two(oio::PositionWriter::new(
341 self.info().clone(),
342 w,
343 op.concurrent(),
344 ))
345 };
346
347 Ok((RpWrite::default(), w))
348 }
349
350 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
351 Ok((
352 RpDelete::default(),
353 oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
354 ))
355 }
356
357 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
358 let p = self.core.root.join(path.trim_end_matches('/'));
359
360 let f = match tokio::fs::read_dir(&p).await {
361 Ok(rd) => rd,
362 Err(e) => {
363 return match e.kind() {
364 std::io::ErrorKind::NotFound => Ok((RpList::default(), None)),
366 _ => {
376 #[cfg(unix)]
378 if e.raw_os_error() == Some(20) {
379 return Ok((RpList::default(), None));
381 }
382 #[cfg(windows)]
383 if e.raw_os_error() == Some(267) {
384 return Ok((RpList::default(), None));
386 }
387
388 Err(new_std_io_error(e))
389 }
390 };
391 }
392 };
393
394 let rd = FsLister::new(&self.core.root, path, f);
395 Ok((RpList::default(), Some(rd)))
396 }
397
398 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
399 let from = self.core.root.join(from.trim_end_matches('/'));
400
401 tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
403
404 let to = self
405 .core
406 .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
407 .await?;
408
409 tokio::fs::copy(from, to).await.map_err(new_std_io_error)?;
410
411 Ok(RpCopy::default())
412 }
413
414 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
415 let from = self.core.root.join(from.trim_end_matches('/'));
416
417 tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
419
420 let to = self
421 .core
422 .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
423 .await?;
424
425 tokio::fs::rename(from, to)
426 .await
427 .map_err(new_std_io_error)?;
428
429 Ok(RpRename::default())
430 }
431}
432
#[cfg(test)]
mod tests {
    use super::*;

    /// The temporary file name must start with the final path component of
    /// the input and be strictly longer than it.
    #[test]
    fn test_tmp_file_of() {
        let cases = [
            ("hello.txt", "hello.txt"),
            ("/tmp/opendal.log", "opendal.log"),
            ("/abc/def/hello.parquet", "hello.parquet"),
        ];

        for (input, prefix) in cases {
            let generated = tmp_file_of(input);
            assert!(generated.len() > prefix.len());
            assert!(generated.starts_with(prefix));
        }
    }
}