opendal/services/fs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::path::PathBuf;
19use std::sync::Arc;
20
21use log::debug;
22
23use super::core::*;
24use super::delete::FsDeleter;
25use super::lister::FsLister;
26use super::reader::FsReader;
27use super::writer::FsWriter;
28use super::writer::FsWriters;
29use super::DEFAULT_SCHEME;
30use crate::raw::*;
31use crate::services::FsConfig;
32use crate::*;
33impl Configurator for FsConfig {
34    type Builder = FsBuilder;
35    fn into_builder(self) -> Self::Builder {
36        FsBuilder { config: self }
37    }
38}
39
40/// POSIX file system support.
41#[doc = include_str!("docs.md")]
42#[derive(Default, Debug)]
43pub struct FsBuilder {
44    config: FsConfig,
45}
46
47impl FsBuilder {
48    /// Set root for backend.
49    pub fn root(mut self, root: &str) -> Self {
50        self.config.root = if root.is_empty() {
51            None
52        } else {
53            Some(root.to_string())
54        };
55
56        self
57    }
58
59    /// Set temp dir for atomic write.
60    ///
61    /// # Notes
62    ///
63    /// - When append is enabled, we will not use atomic write
64    ///   to avoid data loss and performance issue.
65    pub fn atomic_write_dir(mut self, dir: &str) -> Self {
66        if !dir.is_empty() {
67            self.config.atomic_write_dir = Some(dir.to_string());
68        }
69
70        self
71    }
72}
73
74impl Builder for FsBuilder {
75    type Config = FsConfig;
76
77    fn build(self) -> Result<impl Access> {
78        debug!("backend build started: {:?}", &self);
79
80        let root = match self.config.root.map(PathBuf::from) {
81            Some(root) => Ok(root),
82            None => Err(Error::new(
83                ErrorKind::ConfigInvalid,
84                "root is not specified",
85            )),
86        }?;
87        debug!("backend use root {}", root.to_string_lossy());
88
89        // If root dir is not exist, we must create it.
90        if let Err(e) = std::fs::metadata(&root) {
91            if e.kind() == std::io::ErrorKind::NotFound {
92                std::fs::create_dir_all(&root).map_err(|e| {
93                    Error::new(ErrorKind::Unexpected, "create root dir failed")
94                        .with_operation("Builder::build")
95                        .with_context("root", root.to_string_lossy())
96                        .set_source(e)
97                })?;
98            }
99        }
100
101        let atomic_write_dir = self.config.atomic_write_dir.map(PathBuf::from);
102
103        // If atomic write dir is not exist, we must create it.
104        if let Some(d) = &atomic_write_dir {
105            if let Err(e) = std::fs::metadata(d) {
106                if e.kind() == std::io::ErrorKind::NotFound {
107                    std::fs::create_dir_all(d).map_err(|e| {
108                        Error::new(ErrorKind::Unexpected, "create atomic write dir failed")
109                            .with_operation("Builder::build")
110                            .with_context("atomic_write_dir", d.to_string_lossy())
111                            .set_source(e)
112                    })?;
113                }
114            }
115        }
116
117        // Canonicalize the root directory. This should work since we already know that we can
118        // get the metadata of the path.
119        let root = root.canonicalize().map_err(|e| {
120            Error::new(
121                ErrorKind::Unexpected,
122                "canonicalize of root directory failed",
123            )
124            .set_source(e)
125        })?;
126
127        // Canonicalize the atomic_write_dir directory. This should work since we already know that
128        // we can get the metadata of the path.
129        let atomic_write_dir = atomic_write_dir
130            .map(|p| {
131                p.canonicalize().map(Some).map_err(|e| {
132                    Error::new(
133                        ErrorKind::Unexpected,
134                        "canonicalize of atomic_write_dir directory failed",
135                    )
136                    .with_operation("Builder::build")
137                    .with_context("root", root.to_string_lossy())
138                    .set_source(e)
139                })
140            })
141            .unwrap_or(Ok(None))?;
142
143        Ok(FsBackend {
144            core: Arc::new(FsCore {
145                info: {
146                    let am = AccessorInfo::default();
147                    am.set_scheme(DEFAULT_SCHEME)
148                        .set_root(&root.to_string_lossy())
149                        .set_native_capability(Capability {
150                            stat: true,
151
152                            read: true,
153
154                            write: true,
155                            write_can_empty: true,
156                            write_can_append: true,
157                            write_can_multi: true,
158                            write_with_if_not_exists: true,
159
160                            create_dir: true,
161                            delete: true,
162
163                            list: true,
164
165                            copy: true,
166                            rename: true,
167
168                            shared: true,
169
170                            ..Default::default()
171                        });
172
173                    am.into()
174                },
175                root,
176                atomic_write_dir,
177                buf_pool: oio::PooledBuf::new(16).with_initial_capacity(256 * 1024),
178            }),
179        })
180    }
181}
182
183/// Backend is used to serve `Accessor` support for posix-like fs.
184#[derive(Debug, Clone)]
185pub struct FsBackend {
186    core: Arc<FsCore>,
187}
188
189impl Access for FsBackend {
190    type Reader = FsReader<tokio::fs::File>;
191    type Writer = FsWriters;
192    type Lister = Option<FsLister<tokio::fs::ReadDir>>;
193    type Deleter = oio::OneShotDeleter<FsDeleter>;
194
195    fn info(&self) -> Arc<AccessorInfo> {
196        self.core.info.clone()
197    }
198
199    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
200        self.core.fs_create_dir(path).await?;
201        Ok(RpCreateDir::default())
202    }
203
204    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
205        let m = self.core.fs_stat(path).await?;
206        Ok(RpStat::new(m))
207    }
208
209    /// # Notes
210    ///
211    /// There are three ways to get the total file length:
212    ///
213    /// - call std::fs::metadata directly and then open. (400ns)
214    /// - open file first, and then use `f.metadata()` (300ns)
215    /// - open file first, and then use `seek`. (100ns)
216    ///
217    /// Benchmark could be found [here](https://gist.github.com/Xuanwo/48f9cfbc3022ea5f865388bb62e1a70f)
218    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
219        let f = self.core.fs_read(path, &args).await?;
220        let r = FsReader::new(
221            self.core.clone(),
222            f,
223            args.range().size().unwrap_or(u64::MAX) as _,
224        );
225        Ok((RpRead::new(), r))
226    }
227
228    async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
229        let is_append = op.append();
230        let concurrent = op.concurrent();
231
232        let writer = FsWriter::create(self.core.clone(), path, op).await?;
233
234        let writer = if is_append {
235            FsWriters::One(writer)
236        } else {
237            FsWriters::Two(oio::PositionWriter::new(
238                self.info().clone(),
239                writer,
240                concurrent,
241            ))
242        };
243
244        Ok((RpWrite::default(), writer))
245    }
246
247    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
248        Ok((
249            RpDelete::default(),
250            oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
251        ))
252    }
253
254    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
255        match self.core.fs_list(path).await? {
256            Some(f) => {
257                let rd = FsLister::new(&self.core.root, path, f);
258                Ok((RpList::default(), Some(rd)))
259            }
260            None => Ok((RpList::default(), None)),
261        }
262    }
263
264    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
265        self.core.fs_copy(from, to).await?;
266        Ok(RpCopy::default())
267    }
268
269    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
270        self.core.fs_rename(from, to).await?;
271        Ok(RpRename::default())
272    }
273}