opendal/services/fs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::io::SeekFrom;
19use std::path::PathBuf;
20use std::sync::Arc;
21
22use chrono::DateTime;
23use log::debug;
24
25use super::core::*;
26use super::delete::FsDeleter;
27use super::lister::FsLister;
28use super::reader::FsReader;
29use super::writer::FsWriter;
30use super::writer::FsWriters;
31use crate::raw::*;
32use crate::services::FsConfig;
33use crate::*;
34
35impl Configurator for FsConfig {
36    type Builder = FsBuilder;
37    fn into_builder(self) -> Self::Builder {
38        FsBuilder { config: self }
39    }
40}
41
/// POSIX file system support.
#[doc = include_str!("docs.md")]
#[derive(Default, Debug)]
pub struct FsBuilder {
    /// Settings accumulated through the builder methods and consumed by `build`.
    config: FsConfig,
}
48
49impl FsBuilder {
50    /// Set root for backend.
51    pub fn root(mut self, root: &str) -> Self {
52        self.config.root = if root.is_empty() {
53            None
54        } else {
55            Some(root.to_string())
56        };
57
58        self
59    }
60
61    /// Set temp dir for atomic write.
62    ///
63    /// # Notes
64    ///
65    /// - When append is enabled, we will not use atomic write
66    ///   to avoid data loss and performance issue.
67    pub fn atomic_write_dir(mut self, dir: &str) -> Self {
68        if !dir.is_empty() {
69            self.config.atomic_write_dir = Some(dir.to_string());
70        }
71
72        self
73    }
74}
75
76impl Builder for FsBuilder {
77    const SCHEME: Scheme = Scheme::Fs;
78    type Config = FsConfig;
79
80    fn build(self) -> Result<impl Access> {
81        debug!("backend build started: {:?}", &self);
82
83        let root = match self.config.root.map(PathBuf::from) {
84            Some(root) => Ok(root),
85            None => Err(Error::new(
86                ErrorKind::ConfigInvalid,
87                "root is not specified",
88            )),
89        }?;
90        debug!("backend use root {}", root.to_string_lossy());
91
92        // If root dir is not exist, we must create it.
93        if let Err(e) = std::fs::metadata(&root) {
94            if e.kind() == std::io::ErrorKind::NotFound {
95                std::fs::create_dir_all(&root).map_err(|e| {
96                    Error::new(ErrorKind::Unexpected, "create root dir failed")
97                        .with_operation("Builder::build")
98                        .with_context("root", root.to_string_lossy())
99                        .set_source(e)
100                })?;
101            }
102        }
103
104        let atomic_write_dir = self.config.atomic_write_dir.map(PathBuf::from);
105
106        // If atomic write dir is not exist, we must create it.
107        if let Some(d) = &atomic_write_dir {
108            if let Err(e) = std::fs::metadata(d) {
109                if e.kind() == std::io::ErrorKind::NotFound {
110                    std::fs::create_dir_all(d).map_err(|e| {
111                        Error::new(ErrorKind::Unexpected, "create atomic write dir failed")
112                            .with_operation("Builder::build")
113                            .with_context("atomic_write_dir", d.to_string_lossy())
114                            .set_source(e)
115                    })?;
116                }
117            }
118        }
119
120        // Canonicalize the root directory. This should work since we already know that we can
121        // get the metadata of the path.
122        let root = root.canonicalize().map_err(|e| {
123            Error::new(
124                ErrorKind::Unexpected,
125                "canonicalize of root directory failed",
126            )
127            .with_operation("Builder::build")
128            .with_context("root", root.to_string_lossy())
129            .set_source(e)
130        })?;
131
132        // Canonicalize the atomic_write_dir directory. This should work since we already know that
133        // we can get the metadata of the path.
134        let atomic_write_dir = atomic_write_dir
135            .map(|p| {
136                p.canonicalize().map(Some).map_err(|e| {
137                    Error::new(
138                        ErrorKind::Unexpected,
139                        "canonicalize of atomic_write_dir directory failed",
140                    )
141                    .with_operation("Builder::build")
142                    .with_context("root", root.to_string_lossy())
143                    .set_source(e)
144                })
145            })
146            .unwrap_or(Ok(None))?;
147
148        Ok(FsBackend {
149            core: Arc::new(FsCore {
150                info: {
151                    let am = AccessorInfo::default();
152                    am.set_scheme(Scheme::Fs)
153                        .set_root(&root.to_string_lossy())
154                        .set_native_capability(Capability {
155                            stat: true,
156                            stat_has_content_length: true,
157                            stat_has_last_modified: true,
158
159                            read: true,
160
161                            write: true,
162                            write_can_empty: true,
163                            write_can_append: true,
164                            write_can_multi: true,
165                            write_with_if_not_exists: true,
166
167                            create_dir: true,
168                            delete: true,
169
170                            list: true,
171
172                            copy: true,
173                            rename: true,
174
175                            shared: true,
176
177                            ..Default::default()
178                        });
179
180                    am.into()
181                },
182                root,
183                atomic_write_dir,
184                buf_pool: oio::PooledBuf::new(16).with_initial_capacity(256 * 1024),
185            }),
186        })
187    }
188}
189
/// Backend is used to serve `Accessor` support for posix-like fs.
#[derive(Debug, Clone)]
pub struct FsBackend {
    /// Shared state (canonical root, optional atomic write dir, accessor info,
    /// buffer pool). Held in an `Arc` so cloning the backend is cheap.
    core: Arc<FsCore>,
}
195
196impl Access for FsBackend {
197    type Reader = FsReader<tokio::fs::File>;
198    type Writer = FsWriters;
199    type Lister = Option<FsLister<tokio::fs::ReadDir>>;
200    type Deleter = oio::OneShotDeleter<FsDeleter>;
201
202    fn info(&self) -> Arc<AccessorInfo> {
203        self.core.info.clone()
204    }
205
206    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
207        let p = self.core.root.join(path.trim_end_matches('/'));
208
209        tokio::fs::create_dir_all(&p)
210            .await
211            .map_err(new_std_io_error)?;
212
213        Ok(RpCreateDir::default())
214    }
215
216    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
217        let p = self.core.root.join(path.trim_end_matches('/'));
218
219        let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?;
220
221        let mode = if meta.is_dir() {
222            EntryMode::DIR
223        } else if meta.is_file() {
224            EntryMode::FILE
225        } else {
226            EntryMode::Unknown
227        };
228        let m = Metadata::new(mode)
229            .with_content_length(meta.len())
230            .with_last_modified(
231                meta.modified()
232                    .map(DateTime::from)
233                    .map_err(new_std_io_error)?,
234            );
235
236        Ok(RpStat::new(m))
237    }
238
239    /// # Notes
240    ///
241    /// There are three ways to get the total file length:
242    ///
243    /// - call std::fs::metadata directly and then open. (400ns)
244    /// - open file first, and then use `f.metadata()` (300ns)
245    /// - open file first, and then use `seek`. (100ns)
246    ///
247    /// Benchmark could be found [here](https://gist.github.com/Xuanwo/48f9cfbc3022ea5f865388bb62e1a70f)
248    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
249        let p = self.core.root.join(path.trim_end_matches('/'));
250
251        let mut f = tokio::fs::OpenOptions::new()
252            .read(true)
253            .open(&p)
254            .await
255            .map_err(new_std_io_error)?;
256
257        if args.range().offset() != 0 {
258            use tokio::io::AsyncSeekExt;
259
260            f.seek(SeekFrom::Start(args.range().offset()))
261                .await
262                .map_err(new_std_io_error)?;
263        }
264
265        let r = FsReader::new(
266            self.core.clone(),
267            f,
268            args.range().size().unwrap_or(u64::MAX) as _,
269        );
270        Ok((RpRead::new(), r))
271    }
272
273    async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
274        let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.core.atomic_write_dir {
275            let target_path = self
276                .core
277                .ensure_write_abs_path(&self.core.root, path)
278                .await?;
279            let tmp_path = self
280                .core
281                .ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))
282                .await?;
283
284            // If the target file exists, we should append to the end of it directly.
285            if op.append()
286                && tokio::fs::try_exists(&target_path)
287                    .await
288                    .map_err(new_std_io_error)?
289            {
290                (target_path, None)
291            } else {
292                (target_path, Some(tmp_path))
293            }
294        } else {
295            let p = self
296                .core
297                .ensure_write_abs_path(&self.core.root, path)
298                .await?;
299
300            (p, None)
301        };
302
303        let mut open_options = tokio::fs::OpenOptions::new();
304        if op.if_not_exists() {
305            open_options.create_new(true);
306        } else {
307            open_options.create(true);
308        }
309
310        open_options.write(true);
311
312        if op.append() {
313            open_options.append(true);
314        } else {
315            open_options.truncate(true);
316        }
317
318        let f = open_options
319            .open(tmp_path.as_ref().unwrap_or(&target_path))
320            .await
321            .map_err(|e| {
322                match e.kind() {
323                    std::io::ErrorKind::AlreadyExists => {
324                        // Map io AlreadyExists to opendal ConditionNotMatch
325                        Error::new(
326                            ErrorKind::ConditionNotMatch,
327                            "The file already exists in the filesystem",
328                        )
329                        .set_source(e)
330                    }
331                    _ => new_std_io_error(e),
332                }
333            })?;
334
335        let w = FsWriter::new(target_path, tmp_path, f);
336
337        let w = if op.append() {
338            FsWriters::One(w)
339        } else {
340            FsWriters::Two(oio::PositionWriter::new(
341                self.info().clone(),
342                w,
343                op.concurrent(),
344            ))
345        };
346
347        Ok((RpWrite::default(), w))
348    }
349
350    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
351        Ok((
352            RpDelete::default(),
353            oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
354        ))
355    }
356
357    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
358        let p = self.core.root.join(path.trim_end_matches('/'));
359
360        let f = match tokio::fs::read_dir(&p).await {
361            Ok(rd) => rd,
362            Err(e) => {
363                return match e.kind() {
364                    // Return empty list if the directory not found
365                    std::io::ErrorKind::NotFound => Ok((RpList::default(), None)),
366                    // TODO: enable after our MSRV has been raised to 1.83
367                    //
368                    // If the path is not a directory, return an empty list
369                    //
370                    // The path could be a file or a symbolic link in this case.
371                    // Returning a NotADirectory error to the user isn't helpful; instead,
372                    // providing an empty directory is a more user-friendly. In fact, the dir
373                    // `path/` does not exist.
374                    // std::io::ErrorKind::NotADirectory => Ok((RpList::default(), None)),
375                    _ => {
376                        // TODO: remove this after we have MSRV 1.83
377                        #[cfg(unix)]
378                        if e.raw_os_error() == Some(20) {
379                            // On unix 20: Not a directory
380                            return Ok((RpList::default(), None));
381                        }
382                        #[cfg(windows)]
383                        if e.raw_os_error() == Some(267) {
384                            // On windows 267: DIRECTORY
385                            return Ok((RpList::default(), None));
386                        }
387
388                        Err(new_std_io_error(e))
389                    }
390                };
391            }
392        };
393
394        let rd = FsLister::new(&self.core.root, path, f);
395        Ok((RpList::default(), Some(rd)))
396    }
397
398    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
399        let from = self.core.root.join(from.trim_end_matches('/'));
400
401        // try to get the metadata of the source file to ensure it exists
402        tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
403
404        let to = self
405            .core
406            .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
407            .await?;
408
409        tokio::fs::copy(from, to).await.map_err(new_std_io_error)?;
410
411        Ok(RpCopy::default())
412    }
413
414    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
415        let from = self.core.root.join(from.trim_end_matches('/'));
416
417        // try to get the metadata of the source file to ensure it exists
418        tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
419
420        let to = self
421            .core
422            .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
423            .await?;
424
425        tokio::fs::rename(from, to)
426            .await
427            .map_err(new_std_io_error)?;
428
429        Ok(RpRename::default())
430    }
431}
432
#[cfg(test)]
mod tests {
    use super::*;

    /// `tmp_file_of` keeps the input's file name (dropping parent directories)
    /// as a prefix and appends a non-empty suffix to it.
    #[test]
    fn test_tmp_file_of() {
        for (input, file_name) in [
            ("hello.txt", "hello.txt"),
            ("/tmp/opendal.log", "opendal.log"),
            ("/abc/def/hello.parquet", "hello.parquet"),
        ] {
            let generated = tmp_file_of(input);
            assert!(generated.len() > file_name.len());
            assert!(generated.starts_with(file_name));
        }
    }
}