opendal/services/fs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::io::SeekFrom;
19use std::path::Path;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use chrono::DateTime;
24use log::debug;
25
26use super::core::*;
27use super::delete::FsDeleter;
28use super::lister::FsLister;
29use super::reader::FsReader;
30use super::writer::FsWriter;
31use super::writer::FsWriters;
32use crate::raw::*;
33use crate::services::FsConfig;
34use crate::*;
35
36impl Configurator for FsConfig {
37    type Builder = FsBuilder;
38    fn into_builder(self) -> Self::Builder {
39        FsBuilder { config: self }
40    }
41}
42
43/// POSIX file system support.
44#[doc = include_str!("docs.md")]
45#[derive(Default, Debug)]
46pub struct FsBuilder {
47    config: FsConfig,
48}
49
50impl FsBuilder {
51    /// Set root for backend.
52    pub fn root(mut self, root: &str) -> Self {
53        self.config.root = if root.is_empty() {
54            None
55        } else {
56            Some(root.to_string())
57        };
58
59        self
60    }
61
62    /// Set temp dir for atomic write.
63    ///
64    /// # Notes
65    ///
66    /// - When append is enabled, we will not use atomic write
67    ///   to avoid data loss and performance issue.
68    pub fn atomic_write_dir(mut self, dir: &str) -> Self {
69        if !dir.is_empty() {
70            self.config.atomic_write_dir = Some(dir.to_string());
71        }
72
73        self
74    }
75}
76
77impl Builder for FsBuilder {
78    const SCHEME: Scheme = Scheme::Fs;
79    type Config = FsConfig;
80
81    fn build(self) -> Result<impl Access> {
82        debug!("backend build started: {:?}", &self);
83
84        let root = match self.config.root.map(PathBuf::from) {
85            Some(root) => Ok(root),
86            None => Err(Error::new(
87                ErrorKind::ConfigInvalid,
88                "root is not specified",
89            )),
90        }?;
91        debug!("backend use root {}", root.to_string_lossy());
92
93        // If root dir is not exist, we must create it.
94        if let Err(e) = std::fs::metadata(&root) {
95            if e.kind() == std::io::ErrorKind::NotFound {
96                std::fs::create_dir_all(&root).map_err(|e| {
97                    Error::new(ErrorKind::Unexpected, "create root dir failed")
98                        .with_operation("Builder::build")
99                        .with_context("root", root.to_string_lossy())
100                        .set_source(e)
101                })?;
102            }
103        }
104
105        let atomic_write_dir = self.config.atomic_write_dir.map(PathBuf::from);
106
107        // If atomic write dir is not exist, we must create it.
108        if let Some(d) = &atomic_write_dir {
109            if let Err(e) = std::fs::metadata(d) {
110                if e.kind() == std::io::ErrorKind::NotFound {
111                    std::fs::create_dir_all(d).map_err(|e| {
112                        Error::new(ErrorKind::Unexpected, "create atomic write dir failed")
113                            .with_operation("Builder::build")
114                            .with_context("atomic_write_dir", d.to_string_lossy())
115                            .set_source(e)
116                    })?;
117                }
118            }
119        }
120
121        // Canonicalize the root directory. This should work since we already know that we can
122        // get the metadata of the path.
123        let root = root.canonicalize().map_err(|e| {
124            Error::new(
125                ErrorKind::Unexpected,
126                "canonicalize of root directory failed",
127            )
128            .with_operation("Builder::build")
129            .with_context("root", root.to_string_lossy())
130            .set_source(e)
131        })?;
132
133        // Canonicalize the atomic_write_dir directory. This should work since we already know that
134        // we can get the metadata of the path.
135        let atomic_write_dir = atomic_write_dir
136            .map(|p| {
137                p.canonicalize().map(Some).map_err(|e| {
138                    Error::new(
139                        ErrorKind::Unexpected,
140                        "canonicalize of atomic_write_dir directory failed",
141                    )
142                    .with_operation("Builder::build")
143                    .with_context("root", root.to_string_lossy())
144                    .set_source(e)
145                })
146            })
147            .unwrap_or(Ok(None))?;
148
149        Ok(FsBackend {
150            core: Arc::new(FsCore {
151                info: {
152                    let am = AccessorInfo::default();
153                    am.set_scheme(Scheme::Fs)
154                        .set_root(&root.to_string_lossy())
155                        .set_native_capability(Capability {
156                            stat: true,
157                            stat_has_content_length: true,
158                            stat_has_last_modified: true,
159
160                            read: true,
161
162                            write: true,
163                            write_can_empty: true,
164                            write_can_append: true,
165                            write_can_multi: true,
166                            write_with_if_not_exists: true,
167
168                            create_dir: true,
169                            delete: true,
170
171                            list: true,
172
173                            copy: true,
174                            rename: true,
175                            blocking: true,
176
177                            shared: true,
178
179                            ..Default::default()
180                        });
181
182                    am.into()
183                },
184                root,
185                atomic_write_dir,
186                buf_pool: oio::PooledBuf::new(16).with_initial_capacity(256 * 1024),
187            }),
188        })
189    }
190}
191
192/// Backend is used to serve `Accessor` support for posix-like fs.
193#[derive(Debug, Clone)]
194pub struct FsBackend {
195    core: Arc<FsCore>,
196}
197
198impl Access for FsBackend {
199    type Reader = FsReader<tokio::fs::File>;
200    type Writer = FsWriters;
201    type Lister = Option<FsLister<tokio::fs::ReadDir>>;
202    type Deleter = oio::OneShotDeleter<FsDeleter>;
203    type BlockingReader = FsReader<std::fs::File>;
204    type BlockingWriter = FsWriter<std::fs::File>;
205    type BlockingLister = Option<FsLister<std::fs::ReadDir>>;
206    type BlockingDeleter = oio::OneShotDeleter<FsDeleter>;
207
208    fn info(&self) -> Arc<AccessorInfo> {
209        self.core.info.clone()
210    }
211
212    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
213        let p = self.core.root.join(path.trim_end_matches('/'));
214
215        tokio::fs::create_dir_all(&p)
216            .await
217            .map_err(new_std_io_error)?;
218
219        Ok(RpCreateDir::default())
220    }
221
222    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
223        let p = self.core.root.join(path.trim_end_matches('/'));
224
225        let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?;
226
227        let mode = if meta.is_dir() {
228            EntryMode::DIR
229        } else if meta.is_file() {
230            EntryMode::FILE
231        } else {
232            EntryMode::Unknown
233        };
234        let m = Metadata::new(mode)
235            .with_content_length(meta.len())
236            .with_last_modified(
237                meta.modified()
238                    .map(DateTime::from)
239                    .map_err(new_std_io_error)?,
240            );
241
242        Ok(RpStat::new(m))
243    }
244
245    /// # Notes
246    ///
247    /// There are three ways to get the total file length:
248    ///
249    /// - call std::fs::metadata directly and then open. (400ns)
250    /// - open file first, and then use `f.metadata()` (300ns)
251    /// - open file first, and then use `seek`. (100ns)
252    ///
253    /// Benchmark could be found [here](https://gist.github.com/Xuanwo/48f9cfbc3022ea5f865388bb62e1a70f)
254    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
255        let p = self.core.root.join(path.trim_end_matches('/'));
256
257        let mut f = tokio::fs::OpenOptions::new()
258            .read(true)
259            .open(&p)
260            .await
261            .map_err(new_std_io_error)?;
262
263        if args.range().offset() != 0 {
264            use tokio::io::AsyncSeekExt;
265
266            f.seek(SeekFrom::Start(args.range().offset()))
267                .await
268                .map_err(new_std_io_error)?;
269        }
270
271        let r = FsReader::new(
272            self.core.clone(),
273            f,
274            args.range().size().unwrap_or(u64::MAX) as _,
275        );
276        Ok((RpRead::new(), r))
277    }
278
279    async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
280        let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.core.atomic_write_dir {
281            let target_path = self
282                .core
283                .ensure_write_abs_path(&self.core.root, path)
284                .await?;
285            let tmp_path = self
286                .core
287                .ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))
288                .await?;
289
290            // If the target file exists, we should append to the end of it directly.
291            if op.append()
292                && tokio::fs::try_exists(&target_path)
293                    .await
294                    .map_err(new_std_io_error)?
295            {
296                (target_path, None)
297            } else {
298                (target_path, Some(tmp_path))
299            }
300        } else {
301            let p = self
302                .core
303                .ensure_write_abs_path(&self.core.root, path)
304                .await?;
305
306            (p, None)
307        };
308
309        let mut open_options = tokio::fs::OpenOptions::new();
310        if op.if_not_exists() {
311            open_options.create_new(true);
312        } else {
313            open_options.create(true);
314        }
315
316        open_options.write(true);
317
318        if op.append() {
319            open_options.append(true);
320        } else {
321            open_options.truncate(true);
322        }
323
324        let f = open_options
325            .open(tmp_path.as_ref().unwrap_or(&target_path))
326            .await
327            .map_err(|e| {
328                match e.kind() {
329                    std::io::ErrorKind::AlreadyExists => {
330                        // Map io AlreadyExists to opendal ConditionNotMatch
331                        Error::new(
332                            ErrorKind::ConditionNotMatch,
333                            "The file already exists in the filesystem",
334                        )
335                        .set_source(e)
336                    }
337                    _ => new_std_io_error(e),
338                }
339            })?;
340
341        let w = FsWriter::new(target_path, tmp_path, f);
342
343        let w = if op.append() {
344            FsWriters::One(w)
345        } else {
346            FsWriters::Two(oio::PositionWriter::new(
347                self.info().clone(),
348                w,
349                op.concurrent(),
350            ))
351        };
352
353        Ok((RpWrite::default(), w))
354    }
355
356    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
357        Ok((
358            RpDelete::default(),
359            oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
360        ))
361    }
362
363    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
364        let p = self.core.root.join(path.trim_end_matches('/'));
365
366        let f = match tokio::fs::read_dir(&p).await {
367            Ok(rd) => rd,
368            Err(e) => {
369                return match e.kind() {
370                    // Return empty list if the directory not found
371                    std::io::ErrorKind::NotFound => Ok((RpList::default(), None)),
372                    // TODO: enable after our MSRV has been raised to 1.83
373                    //
374                    // If the path is not a directory, return an empty list
375                    //
376                    // The path could be a file or a symbolic link in this case.
377                    // Returning a NotADirectory error to the user isn't helpful; instead,
378                    // providing an empty directory is a more user-friendly. In fact, the dir
379                    // `path/` does not exist.
380                    // std::io::ErrorKind::NotADirectory => Ok((RpList::default(), None)),
381                    _ => {
382                        // TODO: remove this after we have MSRV 1.83
383                        #[cfg(unix)]
384                        if e.raw_os_error() == Some(20) {
385                            // On unix 20: Not a directory
386                            return Ok((RpList::default(), None));
387                        }
388                        #[cfg(windows)]
389                        if e.raw_os_error() == Some(267) {
390                            // On windows 267: DIRECTORY
391                            return Ok((RpList::default(), None));
392                        }
393
394                        Err(new_std_io_error(e))
395                    }
396                };
397            }
398        };
399
400        let rd = FsLister::new(&self.core.root, path, f);
401        Ok((RpList::default(), Some(rd)))
402    }
403
404    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
405        let from = self.core.root.join(from.trim_end_matches('/'));
406
407        // try to get the metadata of the source file to ensure it exists
408        tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
409
410        let to = self
411            .core
412            .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
413            .await?;
414
415        tokio::fs::copy(from, to).await.map_err(new_std_io_error)?;
416
417        Ok(RpCopy::default())
418    }
419
420    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
421        let from = self.core.root.join(from.trim_end_matches('/'));
422
423        // try to get the metadata of the source file to ensure it exists
424        tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
425
426        let to = self
427            .core
428            .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
429            .await?;
430
431        tokio::fs::rename(from, to)
432            .await
433            .map_err(new_std_io_error)?;
434
435        Ok(RpRename::default())
436    }
437
438    fn blocking_create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
439        let p = self.core.root.join(path.trim_end_matches('/'));
440
441        std::fs::create_dir_all(p).map_err(new_std_io_error)?;
442
443        Ok(RpCreateDir::default())
444    }
445
446    fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
447        let p = self.core.root.join(path.trim_end_matches('/'));
448
449        let meta = std::fs::metadata(p).map_err(new_std_io_error)?;
450
451        let mode = if meta.is_dir() {
452            EntryMode::DIR
453        } else if meta.is_file() {
454            EntryMode::FILE
455        } else {
456            EntryMode::Unknown
457        };
458        let m = Metadata::new(mode)
459            .with_content_length(meta.len())
460            .with_last_modified(
461                meta.modified()
462                    .map(DateTime::from)
463                    .map_err(new_std_io_error)?,
464            );
465
466        Ok(RpStat::new(m))
467    }
468
469    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
470        let p = self.core.root.join(path.trim_end_matches('/'));
471
472        let mut f = std::fs::OpenOptions::new()
473            .read(true)
474            .open(p)
475            .map_err(new_std_io_error)?;
476
477        if args.range().offset() != 0 {
478            use std::io::Seek;
479
480            f.seek(SeekFrom::Start(args.range().offset()))
481                .map_err(new_std_io_error)?;
482        }
483
484        let r = FsReader::new(
485            self.core.clone(),
486            f,
487            args.range().size().unwrap_or(u64::MAX) as _,
488        );
489        Ok((RpRead::new(), r))
490    }
491
492    fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
493        let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.core.atomic_write_dir {
494            let target_path = self
495                .core
496                .blocking_ensure_write_abs_path(&self.core.root, path)?;
497            let tmp_path = self
498                .core
499                .blocking_ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))?;
500
501            // If the target file exists, we should append to the end of it directly.
502            if op.append()
503                && Path::new(&target_path)
504                    .try_exists()
505                    .map_err(new_std_io_error)?
506            {
507                (target_path, None)
508            } else {
509                (target_path, Some(tmp_path))
510            }
511        } else {
512            let p = self
513                .core
514                .blocking_ensure_write_abs_path(&self.core.root, path)?;
515
516            (p, None)
517        };
518
519        let mut f = std::fs::OpenOptions::new();
520
521        if op.if_not_exists() {
522            f.create_new(true);
523        } else {
524            f.create(true);
525        }
526
527        f.write(true);
528
529        if op.append() {
530            f.append(true);
531        } else {
532            f.truncate(true);
533        }
534
535        let f = f
536            .open(tmp_path.as_ref().unwrap_or(&target_path))
537            .map_err(|e| {
538                match e.kind() {
539                    std::io::ErrorKind::AlreadyExists => {
540                        // Map io AlreadyExists to opendal ConditionNotMatch
541                        Error::new(
542                            ErrorKind::ConditionNotMatch,
543                            "The file already exists in the filesystem",
544                        )
545                        .set_source(e)
546                    }
547                    _ => new_std_io_error(e),
548                }
549            })?;
550
551        Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
552    }
553
554    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
555        Ok((
556            RpDelete::default(),
557            oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
558        ))
559    }
560
561    fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, Self::BlockingLister)> {
562        let p = self.core.root.join(path.trim_end_matches('/'));
563
564        let f = match std::fs::read_dir(p) {
565            Ok(rd) => rd,
566            Err(e) => {
567                return match e.kind() {
568                    // Return empty list if the directory not found
569                    std::io::ErrorKind::NotFound => Ok((RpList::default(), None)),
570                    // TODO: enable after our MSRV has been raised to 1.83
571                    //
572                    // If the path is not a directory, return an empty list
573                    //
574                    // The path could be a file or a symbolic link in this case.
575                    // Returning a NotADirectory error to the user isn't helpful; instead,
576                    // providing an empty directory is a more user-friendly. In fact, the dir
577                    // `path/` does not exist.
578                    // std::io::ErrorKind::NotADirectory => Ok((RpList::default(), None)),
579                    _ => {
580                        // TODO: remove this after we have MSRV 1.83
581                        #[cfg(unix)]
582                        if e.raw_os_error() == Some(20) {
583                            // On unix 20: Not a directory
584                            return Ok((RpList::default(), None));
585                        }
586                        #[cfg(windows)]
587                        if e.raw_os_error() == Some(267) {
588                            // On windows 267: DIRECTORY
589                            return Ok((RpList::default(), None));
590                        }
591                        Err(new_std_io_error(e))
592                    }
593                };
594            }
595        };
596
597        let rd = FsLister::new(&self.core.root, path, f);
598        Ok((RpList::default(), Some(rd)))
599    }
600
601    fn blocking_copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
602        let from = self.core.root.join(from.trim_end_matches('/'));
603
604        // try to get the metadata of the source file to ensure it exists
605        std::fs::metadata(&from).map_err(new_std_io_error)?;
606
607        let to = self
608            .core
609            .blocking_ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))?;
610
611        std::fs::copy(from, to).map_err(new_std_io_error)?;
612
613        Ok(RpCopy::default())
614    }
615
616    fn blocking_rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
617        let from = self.core.root.join(from.trim_end_matches('/'));
618
619        // try to get the metadata of the source file to ensure it exists
620        std::fs::metadata(&from).map_err(new_std_io_error)?;
621
622        let to = self
623            .core
624            .blocking_ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))?;
625
626        std::fs::rename(from, to).map_err(new_std_io_error)?;
627
628        Ok(RpRename::default())
629    }
630}
631
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_tmp_file_of() {
        // Each case pairs an input path with the final path component that
        // the generated temporary file name must start with.
        let cases = [
            ("hello.txt", "hello.txt"),
            ("/tmp/opendal.log", "opendal.log"),
            ("/abc/def/hello.parquet", "hello.parquet"),
        ];

        for (path, expected_prefix) in cases {
            let tmp_file = tmp_file_of(path);
            assert!(
                tmp_file.len() > expected_prefix.len(),
                "tmp file {tmp_file:?} for {path:?} should be longer than {expected_prefix:?}"
            );
            assert!(
                tmp_file.starts_with(expected_prefix),
                "tmp file {tmp_file:?} for {path:?} should start with {expected_prefix:?}"
            );
        }
    }
}