opendal/services/fs/backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::io::SeekFrom;
19use std::path::Path;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use chrono::DateTime;
24use log::debug;
25
26use super::core::*;
27use super::delete::FsDeleter;
28use super::lister::FsLister;
29use super::reader::FsReader;
30use super::writer::FsWriter;
31use super::writer::FsWriters;
32use crate::raw::*;
33use crate::services::FsConfig;
34use crate::*;
35
36impl Configurator for FsConfig {
37    type Builder = FsBuilder;
38    fn into_builder(self) -> Self::Builder {
39        FsBuilder { config: self }
40    }
41}
42
43/// POSIX file system support.
44#[doc = include_str!("docs.md")]
45#[derive(Default, Debug)]
46pub struct FsBuilder {
47    config: FsConfig,
48}
49
50impl FsBuilder {
51    /// Set root for backend.
52    pub fn root(mut self, root: &str) -> Self {
53        self.config.root = if root.is_empty() {
54            None
55        } else {
56            Some(root.to_string())
57        };
58
59        self
60    }
61
62    /// Set temp dir for atomic write.
63    ///
64    /// # Notes
65    ///
66    /// - When append is enabled, we will not use atomic write
67    ///   to avoid data loss and performance issue.
68    pub fn atomic_write_dir(mut self, dir: &str) -> Self {
69        if !dir.is_empty() {
70            self.config.atomic_write_dir = Some(dir.to_string());
71        }
72
73        self
74    }
75}
76
77impl Builder for FsBuilder {
78    const SCHEME: Scheme = Scheme::Fs;
79    type Config = FsConfig;
80
81    fn build(self) -> Result<impl Access> {
82        debug!("backend build started: {:?}", &self);
83
84        let root = match self.config.root.map(PathBuf::from) {
85            Some(root) => Ok(root),
86            None => Err(Error::new(
87                ErrorKind::ConfigInvalid,
88                "root is not specified",
89            )),
90        }?;
91        debug!("backend use root {}", root.to_string_lossy());
92
93        // If root dir is not exist, we must create it.
94        if let Err(e) = std::fs::metadata(&root) {
95            if e.kind() == std::io::ErrorKind::NotFound {
96                std::fs::create_dir_all(&root).map_err(|e| {
97                    Error::new(ErrorKind::Unexpected, "create root dir failed")
98                        .with_operation("Builder::build")
99                        .with_context("root", root.to_string_lossy())
100                        .set_source(e)
101                })?;
102            }
103        }
104
105        let atomic_write_dir = self.config.atomic_write_dir.map(PathBuf::from);
106
107        // If atomic write dir is not exist, we must create it.
108        if let Some(d) = &atomic_write_dir {
109            if let Err(e) = std::fs::metadata(d) {
110                if e.kind() == std::io::ErrorKind::NotFound {
111                    std::fs::create_dir_all(d).map_err(|e| {
112                        Error::new(ErrorKind::Unexpected, "create atomic write dir failed")
113                            .with_operation("Builder::build")
114                            .with_context("atomic_write_dir", d.to_string_lossy())
115                            .set_source(e)
116                    })?;
117                }
118            }
119        }
120
121        // Canonicalize the root directory. This should work since we already know that we can
122        // get the metadata of the path.
123        let root = root.canonicalize().map_err(|e| {
124            Error::new(
125                ErrorKind::Unexpected,
126                "canonicalize of root directory failed",
127            )
128            .with_operation("Builder::build")
129            .with_context("root", root.to_string_lossy())
130            .set_source(e)
131        })?;
132
133        // Canonicalize the atomic_write_dir directory. This should work since we already know that
134        // we can get the metadata of the path.
135        let atomic_write_dir = atomic_write_dir
136            .map(|p| {
137                p.canonicalize().map(Some).map_err(|e| {
138                    Error::new(
139                        ErrorKind::Unexpected,
140                        "canonicalize of atomic_write_dir directory failed",
141                    )
142                    .with_operation("Builder::build")
143                    .with_context("root", root.to_string_lossy())
144                    .set_source(e)
145                })
146            })
147            .unwrap_or(Ok(None))?;
148
149        Ok(FsBackend {
150            core: Arc::new(FsCore {
151                info: {
152                    let am = AccessorInfo::default();
153                    am.set_scheme(Scheme::Fs)
154                        .set_root(&root.to_string_lossy())
155                        .set_native_capability(Capability {
156                            stat: true,
157                            stat_has_content_length: true,
158                            stat_has_last_modified: true,
159
160                            read: true,
161
162                            write: true,
163                            write_can_empty: true,
164                            write_can_append: true,
165                            write_can_multi: true,
166                            write_with_if_not_exists: true,
167
168                            create_dir: true,
169                            delete: true,
170
171                            list: true,
172
173                            copy: true,
174                            rename: true,
175                            blocking: true,
176
177                            shared: true,
178
179                            ..Default::default()
180                        });
181
182                    am.into()
183                },
184                root,
185                atomic_write_dir,
186                buf_pool: oio::PooledBuf::new(16).with_initial_capacity(256 * 1024),
187            }),
188        })
189    }
190}
191
192/// Backend is used to serve `Accessor` support for posix-like fs.
193#[derive(Debug, Clone)]
194pub struct FsBackend {
195    core: Arc<FsCore>,
196}
197
198impl Access for FsBackend {
199    type Reader = FsReader<tokio::fs::File>;
200    type Writer = FsWriters;
201    type Lister = Option<FsLister<tokio::fs::ReadDir>>;
202    type Deleter = oio::OneShotDeleter<FsDeleter>;
203    type BlockingReader = FsReader<std::fs::File>;
204    type BlockingWriter = FsWriter<std::fs::File>;
205    type BlockingLister = Option<FsLister<std::fs::ReadDir>>;
206    type BlockingDeleter = oio::OneShotDeleter<FsDeleter>;
207
208    fn info(&self) -> Arc<AccessorInfo> {
209        self.core.info.clone()
210    }
211
212    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
213        let p = self.core.root.join(path.trim_end_matches('/'));
214
215        tokio::fs::create_dir_all(&p)
216            .await
217            .map_err(new_std_io_error)?;
218
219        Ok(RpCreateDir::default())
220    }
221
222    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
223        let p = self.core.root.join(path.trim_end_matches('/'));
224
225        let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?;
226
227        let mode = if meta.is_dir() {
228            EntryMode::DIR
229        } else if meta.is_file() {
230            EntryMode::FILE
231        } else {
232            EntryMode::Unknown
233        };
234        let m = Metadata::new(mode)
235            .with_content_length(meta.len())
236            .with_last_modified(
237                meta.modified()
238                    .map(DateTime::from)
239                    .map_err(new_std_io_error)?,
240            );
241
242        Ok(RpStat::new(m))
243    }
244
245    /// # Notes
246    ///
247    /// There are three ways to get the total file length:
248    ///
249    /// - call std::fs::metadata directly and then open. (400ns)
250    /// - open file first, and then use `f.metadata()` (300ns)
251    /// - open file first, and then use `seek`. (100ns)
252    ///
253    /// Benchmark could be found [here](https://gist.github.com/Xuanwo/48f9cfbc3022ea5f865388bb62e1a70f)
254    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
255        let p = self.core.root.join(path.trim_end_matches('/'));
256
257        let mut f = tokio::fs::OpenOptions::new()
258            .read(true)
259            .open(&p)
260            .await
261            .map_err(new_std_io_error)?;
262
263        if args.range().offset() != 0 {
264            use tokio::io::AsyncSeekExt;
265
266            f.seek(SeekFrom::Start(args.range().offset()))
267                .await
268                .map_err(new_std_io_error)?;
269        }
270
271        let r = FsReader::new(
272            self.core.clone(),
273            f,
274            args.range().size().unwrap_or(u64::MAX) as _,
275        );
276        Ok((RpRead::new(), r))
277    }
278
279    async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
280        let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.core.atomic_write_dir {
281            let target_path = self
282                .core
283                .ensure_write_abs_path(&self.core.root, path)
284                .await?;
285            let tmp_path = self
286                .core
287                .ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))
288                .await?;
289
290            // If the target file exists, we should append to the end of it directly.
291            if op.append()
292                && tokio::fs::try_exists(&target_path)
293                    .await
294                    .map_err(new_std_io_error)?
295            {
296                (target_path, None)
297            } else {
298                (target_path, Some(tmp_path))
299            }
300        } else {
301            let p = self
302                .core
303                .ensure_write_abs_path(&self.core.root, path)
304                .await?;
305
306            (p, None)
307        };
308
309        let mut open_options = tokio::fs::OpenOptions::new();
310        if op.if_not_exists() {
311            open_options.create_new(true);
312        } else {
313            open_options.create(true);
314        }
315
316        open_options.write(true);
317
318        if op.append() {
319            open_options.append(true);
320        } else {
321            open_options.truncate(true);
322        }
323
324        let f = open_options
325            .open(tmp_path.as_ref().unwrap_or(&target_path))
326            .await
327            .map_err(|e| {
328                match e.kind() {
329                    std::io::ErrorKind::AlreadyExists => {
330                        // Map io AlreadyExists to opendal ConditionNotMatch
331                        Error::new(
332                            ErrorKind::ConditionNotMatch,
333                            "The file already exists in the filesystem",
334                        )
335                        .set_source(e)
336                    }
337                    _ => new_std_io_error(e),
338                }
339            })?;
340
341        let w = FsWriter::new(target_path, tmp_path, f);
342
343        let w = if op.append() {
344            FsWriters::One(w)
345        } else {
346            FsWriters::Two(oio::PositionWriter::new(
347                self.info().clone(),
348                w,
349                op.concurrent(),
350            ))
351        };
352
353        Ok((RpWrite::default(), w))
354    }
355
356    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
357        Ok((
358            RpDelete::default(),
359            oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
360        ))
361    }
362
363    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
364        let p = self.core.root.join(path.trim_end_matches('/'));
365
366        let f = match tokio::fs::read_dir(&p).await {
367            Ok(rd) => rd,
368            Err(e) => {
369                return if e.kind() == std::io::ErrorKind::NotFound {
370                    Ok((RpList::default(), None))
371                } else {
372                    Err(new_std_io_error(e))
373                };
374            }
375        };
376
377        let rd = FsLister::new(&self.core.root, path, f);
378        Ok((RpList::default(), Some(rd)))
379    }
380
381    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
382        let from = self.core.root.join(from.trim_end_matches('/'));
383
384        // try to get the metadata of the source file to ensure it exists
385        tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
386
387        let to = self
388            .core
389            .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
390            .await?;
391
392        tokio::fs::copy(from, to).await.map_err(new_std_io_error)?;
393
394        Ok(RpCopy::default())
395    }
396
397    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
398        let from = self.core.root.join(from.trim_end_matches('/'));
399
400        // try to get the metadata of the source file to ensure it exists
401        tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
402
403        let to = self
404            .core
405            .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
406            .await?;
407
408        tokio::fs::rename(from, to)
409            .await
410            .map_err(new_std_io_error)?;
411
412        Ok(RpRename::default())
413    }
414
415    fn blocking_create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
416        let p = self.core.root.join(path.trim_end_matches('/'));
417
418        std::fs::create_dir_all(p).map_err(new_std_io_error)?;
419
420        Ok(RpCreateDir::default())
421    }
422
423    fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
424        let p = self.core.root.join(path.trim_end_matches('/'));
425
426        let meta = std::fs::metadata(p).map_err(new_std_io_error)?;
427
428        let mode = if meta.is_dir() {
429            EntryMode::DIR
430        } else if meta.is_file() {
431            EntryMode::FILE
432        } else {
433            EntryMode::Unknown
434        };
435        let m = Metadata::new(mode)
436            .with_content_length(meta.len())
437            .with_last_modified(
438                meta.modified()
439                    .map(DateTime::from)
440                    .map_err(new_std_io_error)?,
441            );
442
443        Ok(RpStat::new(m))
444    }
445
446    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
447        let p = self.core.root.join(path.trim_end_matches('/'));
448
449        let mut f = std::fs::OpenOptions::new()
450            .read(true)
451            .open(p)
452            .map_err(new_std_io_error)?;
453
454        if args.range().offset() != 0 {
455            use std::io::Seek;
456
457            f.seek(SeekFrom::Start(args.range().offset()))
458                .map_err(new_std_io_error)?;
459        }
460
461        let r = FsReader::new(
462            self.core.clone(),
463            f,
464            args.range().size().unwrap_or(u64::MAX) as _,
465        );
466        Ok((RpRead::new(), r))
467    }
468
469    fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
470        let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.core.atomic_write_dir {
471            let target_path = self
472                .core
473                .blocking_ensure_write_abs_path(&self.core.root, path)?;
474            let tmp_path = self
475                .core
476                .blocking_ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))?;
477
478            // If the target file exists, we should append to the end of it directly.
479            if op.append()
480                && Path::new(&target_path)
481                    .try_exists()
482                    .map_err(new_std_io_error)?
483            {
484                (target_path, None)
485            } else {
486                (target_path, Some(tmp_path))
487            }
488        } else {
489            let p = self
490                .core
491                .blocking_ensure_write_abs_path(&self.core.root, path)?;
492
493            (p, None)
494        };
495
496        let mut f = std::fs::OpenOptions::new();
497
498        if op.if_not_exists() {
499            f.create_new(true);
500        } else {
501            f.create(true);
502        }
503
504        f.write(true);
505
506        if op.append() {
507            f.append(true);
508        } else {
509            f.truncate(true);
510        }
511
512        let f = f
513            .open(tmp_path.as_ref().unwrap_or(&target_path))
514            .map_err(|e| {
515                match e.kind() {
516                    std::io::ErrorKind::AlreadyExists => {
517                        // Map io AlreadyExists to opendal ConditionNotMatch
518                        Error::new(
519                            ErrorKind::ConditionNotMatch,
520                            "The file already exists in the filesystem",
521                        )
522                        .set_source(e)
523                    }
524                    _ => new_std_io_error(e),
525                }
526            })?;
527
528        Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
529    }
530
531    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
532        Ok((
533            RpDelete::default(),
534            oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
535        ))
536    }
537
538    fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, Self::BlockingLister)> {
539        let p = self.core.root.join(path.trim_end_matches('/'));
540
541        let f = match std::fs::read_dir(p) {
542            Ok(rd) => rd,
543            Err(e) => {
544                return if e.kind() == std::io::ErrorKind::NotFound {
545                    Ok((RpList::default(), None))
546                } else {
547                    Err(new_std_io_error(e))
548                };
549            }
550        };
551
552        let rd = FsLister::new(&self.core.root, path, f);
553        Ok((RpList::default(), Some(rd)))
554    }
555
556    fn blocking_copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
557        let from = self.core.root.join(from.trim_end_matches('/'));
558
559        // try to get the metadata of the source file to ensure it exists
560        std::fs::metadata(&from).map_err(new_std_io_error)?;
561
562        let to = self
563            .core
564            .blocking_ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))?;
565
566        std::fs::copy(from, to).map_err(new_std_io_error)?;
567
568        Ok(RpCopy::default())
569    }
570
571    fn blocking_rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
572        let from = self.core.root.join(from.trim_end_matches('/'));
573
574        // try to get the metadata of the source file to ensure it exists
575        std::fs::metadata(&from).map_err(new_std_io_error)?;
576
577        let to = self
578            .core
579            .blocking_ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))?;
580
581        std::fs::rename(from, to).map_err(new_std_io_error)?;
582
583        Ok(RpRename::default())
584    }
585}
586
#[cfg(test)]
mod tests {
    use super::*;

    /// `tmp_file_of` must keep the file name of `path` (without any parent
    /// directories) as a prefix and append something after it.
    #[test]
    fn test_tmp_file_of() {
        let cases = [
            ("hello.txt", "hello.txt"),
            ("/tmp/opendal.log", "opendal.log"),
            ("/abc/def/hello.parquet", "hello.parquet"),
        ];

        for (path, expected_prefix) in cases {
            let tmp_file = tmp_file_of(path);
            assert!(
                tmp_file.len() > expected_prefix.len(),
                "tmp file {tmp_file:?} for {path:?} should be longer than {expected_prefix:?}"
            );
            assert!(
                tmp_file.starts_with(expected_prefix),
                "tmp file {tmp_file:?} for {path:?} should start with {expected_prefix:?}"
            );
        }
    }
}