opendal/services/fs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::path::PathBuf;
20use std::sync::Arc;
21
22use log::debug;
23
24use http::Uri;
25use percent_encoding::percent_decode_str;
26
27use super::core::*;
28use super::delete::FsDeleter;
29use super::lister::FsLister;
30use super::reader::FsReader;
31use super::writer::FsWriter;
32use super::writer::FsWriters;
33use super::FS_SCHEME;
34use crate::raw::*;
35use crate::services::FsConfig;
36use crate::*;
37impl Configurator for FsConfig {
38    type Builder = FsBuilder;
39
40    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
41        let mut map = options.clone();
42
43        if !map.contains_key("root") {
44            let path = percent_decode_str(uri.path()).decode_utf8_lossy();
45            if path.is_empty() || path == "/" {
46                return Err(Error::new(
47                    ErrorKind::ConfigInvalid,
48                    "fs uri requires absolute path",
49                ));
50            }
51            if !path.starts_with('/') {
52                return Err(Error::new(
53                    ErrorKind::ConfigInvalid,
54                    "fs uri root must be absolute",
55                ));
56            }
57            map.insert("root".to_string(), path.to_string());
58        }
59
60        Self::from_iter(map)
61    }
62
63    fn into_builder(self) -> Self::Builder {
64        FsBuilder { config: self }
65    }
66}
67
68/// POSIX file system support.
69#[doc = include_str!("docs.md")]
70#[derive(Default, Debug)]
71pub struct FsBuilder {
72    config: FsConfig,
73}
74
75impl FsBuilder {
76    /// Set root for backend.
77    pub fn root(mut self, root: &str) -> Self {
78        self.config.root = if root.is_empty() {
79            None
80        } else {
81            Some(root.to_string())
82        };
83
84        self
85    }
86
87    /// Set temp dir for atomic write.
88    ///
89    /// # Notes
90    ///
91    /// - When append is enabled, we will not use atomic write
92    ///   to avoid data loss and performance issue.
93    pub fn atomic_write_dir(mut self, dir: &str) -> Self {
94        if !dir.is_empty() {
95            self.config.atomic_write_dir = Some(dir.to_string());
96        }
97
98        self
99    }
100}
101
102impl Builder for FsBuilder {
103    type Config = FsConfig;
104
105    fn build(self) -> Result<impl Access> {
106        debug!("backend build started: {:?}", &self);
107
108        let root = match self.config.root.map(PathBuf::from) {
109            Some(root) => Ok(root),
110            None => Err(Error::new(
111                ErrorKind::ConfigInvalid,
112                "root is not specified",
113            )),
114        }?;
115        debug!("backend use root {}", root.to_string_lossy());
116
117        // If root dir is not exist, we must create it.
118        if let Err(e) = std::fs::metadata(&root) {
119            if e.kind() == std::io::ErrorKind::NotFound {
120                std::fs::create_dir_all(&root).map_err(|e| {
121                    Error::new(ErrorKind::Unexpected, "create root dir failed")
122                        .with_operation("Builder::build")
123                        .with_context("root", root.to_string_lossy())
124                        .set_source(e)
125                })?;
126            }
127        }
128
129        let atomic_write_dir = self.config.atomic_write_dir.map(PathBuf::from);
130
131        // If atomic write dir is not exist, we must create it.
132        if let Some(d) = &atomic_write_dir {
133            if let Err(e) = std::fs::metadata(d) {
134                if e.kind() == std::io::ErrorKind::NotFound {
135                    std::fs::create_dir_all(d).map_err(|e| {
136                        Error::new(ErrorKind::Unexpected, "create atomic write dir failed")
137                            .with_operation("Builder::build")
138                            .with_context("atomic_write_dir", d.to_string_lossy())
139                            .set_source(e)
140                    })?;
141                }
142            }
143        }
144
145        // Canonicalize the root directory. This should work since we already know that we can
146        // get the metadata of the path.
147        let root = root.canonicalize().map_err(|e| {
148            Error::new(
149                ErrorKind::Unexpected,
150                "canonicalize of root directory failed",
151            )
152            .set_source(e)
153        })?;
154
155        // Canonicalize the atomic_write_dir directory. This should work since we already know that
156        // we can get the metadata of the path.
157        let atomic_write_dir = atomic_write_dir
158            .map(|p| {
159                p.canonicalize().map(Some).map_err(|e| {
160                    Error::new(
161                        ErrorKind::Unexpected,
162                        "canonicalize of atomic_write_dir directory failed",
163                    )
164                    .with_operation("Builder::build")
165                    .with_context("root", root.to_string_lossy())
166                    .set_source(e)
167                })
168            })
169            .unwrap_or(Ok(None))?;
170
171        Ok(FsBackend {
172            core: Arc::new(FsCore {
173                info: {
174                    let am = AccessorInfo::default();
175                    am.set_scheme(FS_SCHEME)
176                        .set_root(&root.to_string_lossy())
177                        .set_native_capability(Capability {
178                            stat: true,
179
180                            read: true,
181
182                            write: true,
183                            write_can_empty: true,
184                            write_can_append: true,
185                            write_can_multi: true,
186                            write_with_if_not_exists: true,
187
188                            create_dir: true,
189                            delete: true,
190
191                            list: true,
192
193                            copy: true,
194                            rename: true,
195
196                            shared: true,
197
198                            ..Default::default()
199                        });
200
201                    am.into()
202                },
203                root,
204                atomic_write_dir,
205                buf_pool: oio::PooledBuf::new(16).with_initial_capacity(256 * 1024),
206            }),
207        })
208    }
209}
210
211/// Backend is used to serve `Accessor` support for posix-like fs.
212#[derive(Debug, Clone)]
213pub struct FsBackend {
214    core: Arc<FsCore>,
215}
216
217impl Access for FsBackend {
218    type Reader = FsReader<tokio::fs::File>;
219    type Writer = FsWriters;
220    type Lister = Option<FsLister<tokio::fs::ReadDir>>;
221    type Deleter = oio::OneShotDeleter<FsDeleter>;
222
223    fn info(&self) -> Arc<AccessorInfo> {
224        self.core.info.clone()
225    }
226
227    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
228        self.core.fs_create_dir(path).await?;
229        Ok(RpCreateDir::default())
230    }
231
232    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
233        let m = self.core.fs_stat(path).await?;
234        Ok(RpStat::new(m))
235    }
236
237    /// # Notes
238    ///
239    /// There are three ways to get the total file length:
240    ///
241    /// - call std::fs::metadata directly and then open. (400ns)
242    /// - open file first, and then use `f.metadata()` (300ns)
243    /// - open file first, and then use `seek`. (100ns)
244    ///
245    /// Benchmark could be found [here](https://gist.github.com/Xuanwo/48f9cfbc3022ea5f865388bb62e1a70f)
246    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
247        let f = self.core.fs_read(path, &args).await?;
248        let r = FsReader::new(
249            self.core.clone(),
250            f,
251            args.range().size().unwrap_or(u64::MAX) as _,
252        );
253        Ok((RpRead::new(), r))
254    }
255
256    async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
257        let is_append = op.append();
258        let concurrent = op.concurrent();
259
260        let writer = FsWriter::create(self.core.clone(), path, op).await?;
261
262        let writer = if is_append {
263            FsWriters::One(writer)
264        } else {
265            FsWriters::Two(oio::PositionWriter::new(
266                self.info().clone(),
267                writer,
268                concurrent,
269            ))
270        };
271
272        Ok((RpWrite::default(), writer))
273    }
274
275    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
276        Ok((
277            RpDelete::default(),
278            oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
279        ))
280    }
281
282    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
283        match self.core.fs_list(path).await? {
284            Some(f) => {
285                let rd = FsLister::new(&self.core.root, path, f);
286                Ok((RpList::default(), Some(rd)))
287            }
288            None => Ok((RpList::default(), None)),
289        }
290    }
291
292    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
293        self.core.fs_copy(from, to).await?;
294        Ok(RpCopy::default())
295    }
296
297    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
298        self.core.fs_rename(from, to).await?;
299        Ok(RpRename::default())
300    }
301}