1use std::io::SeekFrom;
19use std::path::Path;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use chrono::DateTime;
24use log::debug;
25
26use super::core::*;
27use super::delete::FsDeleter;
28use super::lister::FsLister;
29use super::reader::FsReader;
30use super::writer::FsWriter;
31use super::writer::FsWriters;
32use crate::raw::*;
33use crate::services::FsConfig;
34use crate::*;
35
36impl Configurator for FsConfig {
37 type Builder = FsBuilder;
38 fn into_builder(self) -> Self::Builder {
39 FsBuilder { config: self }
40 }
41}
42
43#[doc = include_str!("docs.md")]
45#[derive(Default, Debug)]
46pub struct FsBuilder {
47 config: FsConfig,
48}
49
50impl FsBuilder {
51 pub fn root(mut self, root: &str) -> Self {
53 self.config.root = if root.is_empty() {
54 None
55 } else {
56 Some(root.to_string())
57 };
58
59 self
60 }
61
62 pub fn atomic_write_dir(mut self, dir: &str) -> Self {
69 if !dir.is_empty() {
70 self.config.atomic_write_dir = Some(dir.to_string());
71 }
72
73 self
74 }
75}
76
77impl Builder for FsBuilder {
78 const SCHEME: Scheme = Scheme::Fs;
79 type Config = FsConfig;
80
81 fn build(self) -> Result<impl Access> {
82 debug!("backend build started: {:?}", &self);
83
84 let root = match self.config.root.map(PathBuf::from) {
85 Some(root) => Ok(root),
86 None => Err(Error::new(
87 ErrorKind::ConfigInvalid,
88 "root is not specified",
89 )),
90 }?;
91 debug!("backend use root {}", root.to_string_lossy());
92
93 if let Err(e) = std::fs::metadata(&root) {
95 if e.kind() == std::io::ErrorKind::NotFound {
96 std::fs::create_dir_all(&root).map_err(|e| {
97 Error::new(ErrorKind::Unexpected, "create root dir failed")
98 .with_operation("Builder::build")
99 .with_context("root", root.to_string_lossy())
100 .set_source(e)
101 })?;
102 }
103 }
104
105 let atomic_write_dir = self.config.atomic_write_dir.map(PathBuf::from);
106
107 if let Some(d) = &atomic_write_dir {
109 if let Err(e) = std::fs::metadata(d) {
110 if e.kind() == std::io::ErrorKind::NotFound {
111 std::fs::create_dir_all(d).map_err(|e| {
112 Error::new(ErrorKind::Unexpected, "create atomic write dir failed")
113 .with_operation("Builder::build")
114 .with_context("atomic_write_dir", d.to_string_lossy())
115 .set_source(e)
116 })?;
117 }
118 }
119 }
120
121 let root = root.canonicalize().map_err(|e| {
124 Error::new(
125 ErrorKind::Unexpected,
126 "canonicalize of root directory failed",
127 )
128 .with_operation("Builder::build")
129 .with_context("root", root.to_string_lossy())
130 .set_source(e)
131 })?;
132
133 let atomic_write_dir = atomic_write_dir
136 .map(|p| {
137 p.canonicalize().map(Some).map_err(|e| {
138 Error::new(
139 ErrorKind::Unexpected,
140 "canonicalize of atomic_write_dir directory failed",
141 )
142 .with_operation("Builder::build")
143 .with_context("root", root.to_string_lossy())
144 .set_source(e)
145 })
146 })
147 .unwrap_or(Ok(None))?;
148
149 Ok(FsBackend {
150 core: Arc::new(FsCore {
151 info: {
152 let am = AccessorInfo::default();
153 am.set_scheme(Scheme::Fs)
154 .set_root(&root.to_string_lossy())
155 .set_native_capability(Capability {
156 stat: true,
157 stat_has_content_length: true,
158 stat_has_last_modified: true,
159
160 read: true,
161
162 write: true,
163 write_can_empty: true,
164 write_can_append: true,
165 write_can_multi: true,
166 write_with_if_not_exists: true,
167
168 create_dir: true,
169 delete: true,
170
171 list: true,
172
173 copy: true,
174 rename: true,
175 blocking: true,
176
177 shared: true,
178
179 ..Default::default()
180 });
181
182 am.into()
183 },
184 root,
185 atomic_write_dir,
186 buf_pool: oio::PooledBuf::new(16).with_initial_capacity(256 * 1024),
187 }),
188 })
189 }
190}
191
192#[derive(Debug, Clone)]
194pub struct FsBackend {
195 core: Arc<FsCore>,
196}
197
198impl Access for FsBackend {
199 type Reader = FsReader<tokio::fs::File>;
200 type Writer = FsWriters;
201 type Lister = Option<FsLister<tokio::fs::ReadDir>>;
202 type Deleter = oio::OneShotDeleter<FsDeleter>;
203 type BlockingReader = FsReader<std::fs::File>;
204 type BlockingWriter = FsWriter<std::fs::File>;
205 type BlockingLister = Option<FsLister<std::fs::ReadDir>>;
206 type BlockingDeleter = oio::OneShotDeleter<FsDeleter>;
207
208 fn info(&self) -> Arc<AccessorInfo> {
209 self.core.info.clone()
210 }
211
212 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
213 let p = self.core.root.join(path.trim_end_matches('/'));
214
215 tokio::fs::create_dir_all(&p)
216 .await
217 .map_err(new_std_io_error)?;
218
219 Ok(RpCreateDir::default())
220 }
221
222 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
223 let p = self.core.root.join(path.trim_end_matches('/'));
224
225 let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?;
226
227 let mode = if meta.is_dir() {
228 EntryMode::DIR
229 } else if meta.is_file() {
230 EntryMode::FILE
231 } else {
232 EntryMode::Unknown
233 };
234 let m = Metadata::new(mode)
235 .with_content_length(meta.len())
236 .with_last_modified(
237 meta.modified()
238 .map(DateTime::from)
239 .map_err(new_std_io_error)?,
240 );
241
242 Ok(RpStat::new(m))
243 }
244
245 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
255 let p = self.core.root.join(path.trim_end_matches('/'));
256
257 let mut f = tokio::fs::OpenOptions::new()
258 .read(true)
259 .open(&p)
260 .await
261 .map_err(new_std_io_error)?;
262
263 if args.range().offset() != 0 {
264 use tokio::io::AsyncSeekExt;
265
266 f.seek(SeekFrom::Start(args.range().offset()))
267 .await
268 .map_err(new_std_io_error)?;
269 }
270
271 let r = FsReader::new(
272 self.core.clone(),
273 f,
274 args.range().size().unwrap_or(u64::MAX) as _,
275 );
276 Ok((RpRead::new(), r))
277 }
278
279 async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
280 let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.core.atomic_write_dir {
281 let target_path = self
282 .core
283 .ensure_write_abs_path(&self.core.root, path)
284 .await?;
285 let tmp_path = self
286 .core
287 .ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))
288 .await?;
289
290 if op.append()
292 && tokio::fs::try_exists(&target_path)
293 .await
294 .map_err(new_std_io_error)?
295 {
296 (target_path, None)
297 } else {
298 (target_path, Some(tmp_path))
299 }
300 } else {
301 let p = self
302 .core
303 .ensure_write_abs_path(&self.core.root, path)
304 .await?;
305
306 (p, None)
307 };
308
309 let mut open_options = tokio::fs::OpenOptions::new();
310 if op.if_not_exists() {
311 open_options.create_new(true);
312 } else {
313 open_options.create(true);
314 }
315
316 open_options.write(true);
317
318 if op.append() {
319 open_options.append(true);
320 } else {
321 open_options.truncate(true);
322 }
323
324 let f = open_options
325 .open(tmp_path.as_ref().unwrap_or(&target_path))
326 .await
327 .map_err(|e| {
328 match e.kind() {
329 std::io::ErrorKind::AlreadyExists => {
330 Error::new(
332 ErrorKind::ConditionNotMatch,
333 "The file already exists in the filesystem",
334 )
335 .set_source(e)
336 }
337 _ => new_std_io_error(e),
338 }
339 })?;
340
341 let w = FsWriter::new(target_path, tmp_path, f);
342
343 let w = if op.append() {
344 FsWriters::One(w)
345 } else {
346 FsWriters::Two(oio::PositionWriter::new(
347 self.info().clone(),
348 w,
349 op.concurrent(),
350 ))
351 };
352
353 Ok((RpWrite::default(), w))
354 }
355
356 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
357 Ok((
358 RpDelete::default(),
359 oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
360 ))
361 }
362
363 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
364 let p = self.core.root.join(path.trim_end_matches('/'));
365
366 let f = match tokio::fs::read_dir(&p).await {
367 Ok(rd) => rd,
368 Err(e) => {
369 return match e.kind() {
370 std::io::ErrorKind::NotFound => Ok((RpList::default(), None)),
372 _ => {
382 #[cfg(unix)]
384 if e.raw_os_error() == Some(20) {
385 return Ok((RpList::default(), None));
387 }
388 #[cfg(windows)]
389 if e.raw_os_error() == Some(267) {
390 return Ok((RpList::default(), None));
392 }
393
394 Err(new_std_io_error(e))
395 }
396 };
397 }
398 };
399
400 let rd = FsLister::new(&self.core.root, path, f);
401 Ok((RpList::default(), Some(rd)))
402 }
403
404 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
405 let from = self.core.root.join(from.trim_end_matches('/'));
406
407 tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
409
410 let to = self
411 .core
412 .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
413 .await?;
414
415 tokio::fs::copy(from, to).await.map_err(new_std_io_error)?;
416
417 Ok(RpCopy::default())
418 }
419
420 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
421 let from = self.core.root.join(from.trim_end_matches('/'));
422
423 tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
425
426 let to = self
427 .core
428 .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
429 .await?;
430
431 tokio::fs::rename(from, to)
432 .await
433 .map_err(new_std_io_error)?;
434
435 Ok(RpRename::default())
436 }
437
438 fn blocking_create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
439 let p = self.core.root.join(path.trim_end_matches('/'));
440
441 std::fs::create_dir_all(p).map_err(new_std_io_error)?;
442
443 Ok(RpCreateDir::default())
444 }
445
446 fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
447 let p = self.core.root.join(path.trim_end_matches('/'));
448
449 let meta = std::fs::metadata(p).map_err(new_std_io_error)?;
450
451 let mode = if meta.is_dir() {
452 EntryMode::DIR
453 } else if meta.is_file() {
454 EntryMode::FILE
455 } else {
456 EntryMode::Unknown
457 };
458 let m = Metadata::new(mode)
459 .with_content_length(meta.len())
460 .with_last_modified(
461 meta.modified()
462 .map(DateTime::from)
463 .map_err(new_std_io_error)?,
464 );
465
466 Ok(RpStat::new(m))
467 }
468
469 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
470 let p = self.core.root.join(path.trim_end_matches('/'));
471
472 let mut f = std::fs::OpenOptions::new()
473 .read(true)
474 .open(p)
475 .map_err(new_std_io_error)?;
476
477 if args.range().offset() != 0 {
478 use std::io::Seek;
479
480 f.seek(SeekFrom::Start(args.range().offset()))
481 .map_err(new_std_io_error)?;
482 }
483
484 let r = FsReader::new(
485 self.core.clone(),
486 f,
487 args.range().size().unwrap_or(u64::MAX) as _,
488 );
489 Ok((RpRead::new(), r))
490 }
491
492 fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
493 let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.core.atomic_write_dir {
494 let target_path = self
495 .core
496 .blocking_ensure_write_abs_path(&self.core.root, path)?;
497 let tmp_path = self
498 .core
499 .blocking_ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))?;
500
501 if op.append()
503 && Path::new(&target_path)
504 .try_exists()
505 .map_err(new_std_io_error)?
506 {
507 (target_path, None)
508 } else {
509 (target_path, Some(tmp_path))
510 }
511 } else {
512 let p = self
513 .core
514 .blocking_ensure_write_abs_path(&self.core.root, path)?;
515
516 (p, None)
517 };
518
519 let mut f = std::fs::OpenOptions::new();
520
521 if op.if_not_exists() {
522 f.create_new(true);
523 } else {
524 f.create(true);
525 }
526
527 f.write(true);
528
529 if op.append() {
530 f.append(true);
531 } else {
532 f.truncate(true);
533 }
534
535 let f = f
536 .open(tmp_path.as_ref().unwrap_or(&target_path))
537 .map_err(|e| {
538 match e.kind() {
539 std::io::ErrorKind::AlreadyExists => {
540 Error::new(
542 ErrorKind::ConditionNotMatch,
543 "The file already exists in the filesystem",
544 )
545 .set_source(e)
546 }
547 _ => new_std_io_error(e),
548 }
549 })?;
550
551 Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
552 }
553
554 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
555 Ok((
556 RpDelete::default(),
557 oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
558 ))
559 }
560
561 fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, Self::BlockingLister)> {
562 let p = self.core.root.join(path.trim_end_matches('/'));
563
564 let f = match std::fs::read_dir(p) {
565 Ok(rd) => rd,
566 Err(e) => {
567 return match e.kind() {
568 std::io::ErrorKind::NotFound => Ok((RpList::default(), None)),
570 _ => {
580 #[cfg(unix)]
582 if e.raw_os_error() == Some(20) {
583 return Ok((RpList::default(), None));
585 }
586 #[cfg(windows)]
587 if e.raw_os_error() == Some(267) {
588 return Ok((RpList::default(), None));
590 }
591 Err(new_std_io_error(e))
592 }
593 };
594 }
595 };
596
597 let rd = FsLister::new(&self.core.root, path, f);
598 Ok((RpList::default(), Some(rd)))
599 }
600
601 fn blocking_copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
602 let from = self.core.root.join(from.trim_end_matches('/'));
603
604 std::fs::metadata(&from).map_err(new_std_io_error)?;
606
607 let to = self
608 .core
609 .blocking_ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))?;
610
611 std::fs::copy(from, to).map_err(new_std_io_error)?;
612
613 Ok(RpCopy::default())
614 }
615
616 fn blocking_rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
617 let from = self.core.root.join(from.trim_end_matches('/'));
618
619 std::fs::metadata(&from).map_err(new_std_io_error)?;
621
622 let to = self
623 .core
624 .blocking_ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))?;
625
626 std::fs::rename(from, to).map_err(new_std_io_error)?;
627
628 Ok(RpRename::default())
629 }
630}
631
#[cfg(test)]
mod tests {
    use super::*;

    /// The temporary name produced by `tmp_file_of` must start with the file
    /// name of the input path and be strictly longer than it.
    #[test]
    fn test_tmp_file_of() {
        let cases = [
            ("hello.txt", "hello.txt"),
            ("/tmp/opendal.log", "opendal.log"),
            ("/abc/def/hello.parquet", "hello.parquet"),
        ];

        cases.iter().for_each(|&(path, expected_prefix)| {
            let tmp_file = tmp_file_of(path);
            assert!(tmp_file.len() > expected_prefix.len());
            assert!(tmp_file.starts_with(expected_prefix));
        });
    }
}