1use std::io::SeekFrom;
19use std::path::Path;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use chrono::DateTime;
24use log::debug;
25
26use super::core::*;
27use super::delete::FsDeleter;
28use super::lister::FsLister;
29use super::reader::FsReader;
30use super::writer::FsWriter;
31use super::writer::FsWriters;
32use crate::raw::*;
33use crate::services::FsConfig;
34use crate::*;
35
36impl Configurator for FsConfig {
37 type Builder = FsBuilder;
38 fn into_builder(self) -> Self::Builder {
39 FsBuilder { config: self }
40 }
41}
42
43#[doc = include_str!("docs.md")]
45#[derive(Default, Debug)]
46pub struct FsBuilder {
47 config: FsConfig,
48}
49
50impl FsBuilder {
51 pub fn root(mut self, root: &str) -> Self {
53 self.config.root = if root.is_empty() {
54 None
55 } else {
56 Some(root.to_string())
57 };
58
59 self
60 }
61
62 pub fn atomic_write_dir(mut self, dir: &str) -> Self {
69 if !dir.is_empty() {
70 self.config.atomic_write_dir = Some(dir.to_string());
71 }
72
73 self
74 }
75}
76
77impl Builder for FsBuilder {
78 const SCHEME: Scheme = Scheme::Fs;
79 type Config = FsConfig;
80
81 fn build(self) -> Result<impl Access> {
82 debug!("backend build started: {:?}", &self);
83
84 let root = match self.config.root.map(PathBuf::from) {
85 Some(root) => Ok(root),
86 None => Err(Error::new(
87 ErrorKind::ConfigInvalid,
88 "root is not specified",
89 )),
90 }?;
91 debug!("backend use root {}", root.to_string_lossy());
92
93 if let Err(e) = std::fs::metadata(&root) {
95 if e.kind() == std::io::ErrorKind::NotFound {
96 std::fs::create_dir_all(&root).map_err(|e| {
97 Error::new(ErrorKind::Unexpected, "create root dir failed")
98 .with_operation("Builder::build")
99 .with_context("root", root.to_string_lossy())
100 .set_source(e)
101 })?;
102 }
103 }
104
105 let atomic_write_dir = self.config.atomic_write_dir.map(PathBuf::from);
106
107 if let Some(d) = &atomic_write_dir {
109 if let Err(e) = std::fs::metadata(d) {
110 if e.kind() == std::io::ErrorKind::NotFound {
111 std::fs::create_dir_all(d).map_err(|e| {
112 Error::new(ErrorKind::Unexpected, "create atomic write dir failed")
113 .with_operation("Builder::build")
114 .with_context("atomic_write_dir", d.to_string_lossy())
115 .set_source(e)
116 })?;
117 }
118 }
119 }
120
121 let root = root.canonicalize().map_err(|e| {
124 Error::new(
125 ErrorKind::Unexpected,
126 "canonicalize of root directory failed",
127 )
128 .with_operation("Builder::build")
129 .with_context("root", root.to_string_lossy())
130 .set_source(e)
131 })?;
132
133 let atomic_write_dir = atomic_write_dir
136 .map(|p| {
137 p.canonicalize().map(Some).map_err(|e| {
138 Error::new(
139 ErrorKind::Unexpected,
140 "canonicalize of atomic_write_dir directory failed",
141 )
142 .with_operation("Builder::build")
143 .with_context("root", root.to_string_lossy())
144 .set_source(e)
145 })
146 })
147 .unwrap_or(Ok(None))?;
148
149 Ok(FsBackend {
150 core: Arc::new(FsCore {
151 info: {
152 let am = AccessorInfo::default();
153 am.set_scheme(Scheme::Fs)
154 .set_root(&root.to_string_lossy())
155 .set_native_capability(Capability {
156 stat: true,
157 stat_has_content_length: true,
158 stat_has_last_modified: true,
159
160 read: true,
161
162 write: true,
163 write_can_empty: true,
164 write_can_append: true,
165 write_can_multi: true,
166 write_with_if_not_exists: true,
167
168 create_dir: true,
169 delete: true,
170
171 list: true,
172
173 copy: true,
174 rename: true,
175 blocking: true,
176
177 shared: true,
178
179 ..Default::default()
180 });
181
182 am.into()
183 },
184 root,
185 atomic_write_dir,
186 buf_pool: oio::PooledBuf::new(16).with_initial_capacity(256 * 1024),
187 }),
188 })
189 }
190}
191
192#[derive(Debug, Clone)]
194pub struct FsBackend {
195 core: Arc<FsCore>,
196}
197
198impl Access for FsBackend {
199 type Reader = FsReader<tokio::fs::File>;
200 type Writer = FsWriters;
201 type Lister = Option<FsLister<tokio::fs::ReadDir>>;
202 type Deleter = oio::OneShotDeleter<FsDeleter>;
203 type BlockingReader = FsReader<std::fs::File>;
204 type BlockingWriter = FsWriter<std::fs::File>;
205 type BlockingLister = Option<FsLister<std::fs::ReadDir>>;
206 type BlockingDeleter = oio::OneShotDeleter<FsDeleter>;
207
208 fn info(&self) -> Arc<AccessorInfo> {
209 self.core.info.clone()
210 }
211
212 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
213 let p = self.core.root.join(path.trim_end_matches('/'));
214
215 tokio::fs::create_dir_all(&p)
216 .await
217 .map_err(new_std_io_error)?;
218
219 Ok(RpCreateDir::default())
220 }
221
222 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
223 let p = self.core.root.join(path.trim_end_matches('/'));
224
225 let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?;
226
227 let mode = if meta.is_dir() {
228 EntryMode::DIR
229 } else if meta.is_file() {
230 EntryMode::FILE
231 } else {
232 EntryMode::Unknown
233 };
234 let m = Metadata::new(mode)
235 .with_content_length(meta.len())
236 .with_last_modified(
237 meta.modified()
238 .map(DateTime::from)
239 .map_err(new_std_io_error)?,
240 );
241
242 Ok(RpStat::new(m))
243 }
244
245 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
255 let p = self.core.root.join(path.trim_end_matches('/'));
256
257 let mut f = tokio::fs::OpenOptions::new()
258 .read(true)
259 .open(&p)
260 .await
261 .map_err(new_std_io_error)?;
262
263 if args.range().offset() != 0 {
264 use tokio::io::AsyncSeekExt;
265
266 f.seek(SeekFrom::Start(args.range().offset()))
267 .await
268 .map_err(new_std_io_error)?;
269 }
270
271 let r = FsReader::new(
272 self.core.clone(),
273 f,
274 args.range().size().unwrap_or(u64::MAX) as _,
275 );
276 Ok((RpRead::new(), r))
277 }
278
279 async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
280 let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.core.atomic_write_dir {
281 let target_path = self
282 .core
283 .ensure_write_abs_path(&self.core.root, path)
284 .await?;
285 let tmp_path = self
286 .core
287 .ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))
288 .await?;
289
290 if op.append()
292 && tokio::fs::try_exists(&target_path)
293 .await
294 .map_err(new_std_io_error)?
295 {
296 (target_path, None)
297 } else {
298 (target_path, Some(tmp_path))
299 }
300 } else {
301 let p = self
302 .core
303 .ensure_write_abs_path(&self.core.root, path)
304 .await?;
305
306 (p, None)
307 };
308
309 let mut open_options = tokio::fs::OpenOptions::new();
310 if op.if_not_exists() {
311 open_options.create_new(true);
312 } else {
313 open_options.create(true);
314 }
315
316 open_options.write(true);
317
318 if op.append() {
319 open_options.append(true);
320 } else {
321 open_options.truncate(true);
322 }
323
324 let f = open_options
325 .open(tmp_path.as_ref().unwrap_or(&target_path))
326 .await
327 .map_err(|e| {
328 match e.kind() {
329 std::io::ErrorKind::AlreadyExists => {
330 Error::new(
332 ErrorKind::ConditionNotMatch,
333 "The file already exists in the filesystem",
334 )
335 .set_source(e)
336 }
337 _ => new_std_io_error(e),
338 }
339 })?;
340
341 let w = FsWriter::new(target_path, tmp_path, f);
342
343 let w = if op.append() {
344 FsWriters::One(w)
345 } else {
346 FsWriters::Two(oio::PositionWriter::new(
347 self.info().clone(),
348 w,
349 op.concurrent(),
350 ))
351 };
352
353 Ok((RpWrite::default(), w))
354 }
355
356 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
357 Ok((
358 RpDelete::default(),
359 oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
360 ))
361 }
362
363 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
364 let p = self.core.root.join(path.trim_end_matches('/'));
365
366 let f = match tokio::fs::read_dir(&p).await {
367 Ok(rd) => rd,
368 Err(e) => {
369 return if e.kind() == std::io::ErrorKind::NotFound {
370 Ok((RpList::default(), None))
371 } else {
372 Err(new_std_io_error(e))
373 };
374 }
375 };
376
377 let rd = FsLister::new(&self.core.root, path, f);
378 Ok((RpList::default(), Some(rd)))
379 }
380
381 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
382 let from = self.core.root.join(from.trim_end_matches('/'));
383
384 tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
386
387 let to = self
388 .core
389 .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
390 .await?;
391
392 tokio::fs::copy(from, to).await.map_err(new_std_io_error)?;
393
394 Ok(RpCopy::default())
395 }
396
397 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
398 let from = self.core.root.join(from.trim_end_matches('/'));
399
400 tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
402
403 let to = self
404 .core
405 .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
406 .await?;
407
408 tokio::fs::rename(from, to)
409 .await
410 .map_err(new_std_io_error)?;
411
412 Ok(RpRename::default())
413 }
414
415 fn blocking_create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
416 let p = self.core.root.join(path.trim_end_matches('/'));
417
418 std::fs::create_dir_all(p).map_err(new_std_io_error)?;
419
420 Ok(RpCreateDir::default())
421 }
422
423 fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
424 let p = self.core.root.join(path.trim_end_matches('/'));
425
426 let meta = std::fs::metadata(p).map_err(new_std_io_error)?;
427
428 let mode = if meta.is_dir() {
429 EntryMode::DIR
430 } else if meta.is_file() {
431 EntryMode::FILE
432 } else {
433 EntryMode::Unknown
434 };
435 let m = Metadata::new(mode)
436 .with_content_length(meta.len())
437 .with_last_modified(
438 meta.modified()
439 .map(DateTime::from)
440 .map_err(new_std_io_error)?,
441 );
442
443 Ok(RpStat::new(m))
444 }
445
446 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
447 let p = self.core.root.join(path.trim_end_matches('/'));
448
449 let mut f = std::fs::OpenOptions::new()
450 .read(true)
451 .open(p)
452 .map_err(new_std_io_error)?;
453
454 if args.range().offset() != 0 {
455 use std::io::Seek;
456
457 f.seek(SeekFrom::Start(args.range().offset()))
458 .map_err(new_std_io_error)?;
459 }
460
461 let r = FsReader::new(
462 self.core.clone(),
463 f,
464 args.range().size().unwrap_or(u64::MAX) as _,
465 );
466 Ok((RpRead::new(), r))
467 }
468
469 fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
470 let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.core.atomic_write_dir {
471 let target_path = self
472 .core
473 .blocking_ensure_write_abs_path(&self.core.root, path)?;
474 let tmp_path = self
475 .core
476 .blocking_ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))?;
477
478 if op.append()
480 && Path::new(&target_path)
481 .try_exists()
482 .map_err(new_std_io_error)?
483 {
484 (target_path, None)
485 } else {
486 (target_path, Some(tmp_path))
487 }
488 } else {
489 let p = self
490 .core
491 .blocking_ensure_write_abs_path(&self.core.root, path)?;
492
493 (p, None)
494 };
495
496 let mut f = std::fs::OpenOptions::new();
497
498 if op.if_not_exists() {
499 f.create_new(true);
500 } else {
501 f.create(true);
502 }
503
504 f.write(true);
505
506 if op.append() {
507 f.append(true);
508 } else {
509 f.truncate(true);
510 }
511
512 let f = f
513 .open(tmp_path.as_ref().unwrap_or(&target_path))
514 .map_err(|e| {
515 match e.kind() {
516 std::io::ErrorKind::AlreadyExists => {
517 Error::new(
519 ErrorKind::ConditionNotMatch,
520 "The file already exists in the filesystem",
521 )
522 .set_source(e)
523 }
524 _ => new_std_io_error(e),
525 }
526 })?;
527
528 Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
529 }
530
531 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
532 Ok((
533 RpDelete::default(),
534 oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
535 ))
536 }
537
538 fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, Self::BlockingLister)> {
539 let p = self.core.root.join(path.trim_end_matches('/'));
540
541 let f = match std::fs::read_dir(p) {
542 Ok(rd) => rd,
543 Err(e) => {
544 return if e.kind() == std::io::ErrorKind::NotFound {
545 Ok((RpList::default(), None))
546 } else {
547 Err(new_std_io_error(e))
548 };
549 }
550 };
551
552 let rd = FsLister::new(&self.core.root, path, f);
553 Ok((RpList::default(), Some(rd)))
554 }
555
556 fn blocking_copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
557 let from = self.core.root.join(from.trim_end_matches('/'));
558
559 std::fs::metadata(&from).map_err(new_std_io_error)?;
561
562 let to = self
563 .core
564 .blocking_ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))?;
565
566 std::fs::copy(from, to).map_err(new_std_io_error)?;
567
568 Ok(RpCopy::default())
569 }
570
571 fn blocking_rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
572 let from = self.core.root.join(from.trim_end_matches('/'));
573
574 std::fs::metadata(&from).map_err(new_std_io_error)?;
576
577 let to = self
578 .core
579 .blocking_ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))?;
580
581 std::fs::rename(from, to).map_err(new_std_io_error)?;
582
583 Ok(RpRename::default())
584 }
585}
586
#[cfg(test)]
mod tests {
    use super::*;

    /// The generated temporary name must start with the file name of the
    /// input path and be strictly longer than it.
    #[test]
    fn test_tmp_file_of() {
        let cases = [
            ("hello.txt", "hello.txt"),
            ("/tmp/opendal.log", "opendal.log"),
            ("/abc/def/hello.parquet", "hello.parquet"),
        ];

        for (input, prefix) in cases {
            let generated = tmp_file_of(input);
            assert!(generated.len() > prefix.len());
            assert!(generated.starts_with(prefix));
        }
    }
}