opendal/services/fs/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::io::SeekFrom;
19use std::path::Path;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use super::error::*;
24use crate::raw::*;
25use crate::*;
26
27#[derive(Debug)]
28pub struct FsCore {
29    pub info: Arc<AccessorInfo>,
30    pub root: PathBuf,
31    pub atomic_write_dir: Option<PathBuf>,
32    pub buf_pool: oio::PooledBuf,
33}
34
35impl FsCore {
36    // Build write path and ensure the parent dirs created
37    pub async fn ensure_write_abs_path(&self, parent: &Path, path: &str) -> Result<PathBuf> {
38        let p = parent.join(path);
39
40        // Create dir before write path.
41        //
42        // TODO(xuanwo): There are many works to do here:
43        //   - Is it safe to create dir concurrently?
44        //   - Do we need to extract this logic as new util functions?
45        //   - Is it better to check the parent dir exists before call mkdir?
46        let parent = PathBuf::from(&p)
47            .parent()
48            .ok_or_else(|| {
49                Error::new(
50                    ErrorKind::Unexpected,
51                    "path should have parent but not, it must be malformed",
52                )
53                .with_context("input", p.to_string_lossy())
54            })?
55            .to_path_buf();
56
57        tokio::fs::create_dir_all(&parent)
58            .await
59            .map_err(new_std_io_error)?;
60
61        Ok(p)
62    }
63
64    pub async fn fs_create_dir(&self, path: &str) -> Result<()> {
65        let p = self.root.join(path.trim_end_matches('/'));
66        tokio::fs::create_dir_all(&p)
67            .await
68            .map_err(new_std_io_error)?;
69        Ok(())
70    }
71
72    pub async fn fs_stat(&self, path: &str) -> Result<Metadata> {
73        let p = self.root.join(path.trim_end_matches('/'));
74        let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?;
75        let mode = if meta.is_dir() {
76            EntryMode::DIR
77        } else if meta.is_file() {
78            EntryMode::FILE
79        } else {
80            EntryMode::Unknown
81        };
82        let m = Metadata::new(mode)
83            .with_content_length(meta.len())
84            .with_last_modified(Timestamp::try_from(
85                meta.modified().map_err(new_std_io_error)?,
86            )?);
87        Ok(m)
88    }
89
90    pub async fn fs_read(&self, path: &str, args: &OpRead) -> Result<tokio::fs::File> {
91        let p = self.root.join(path.trim_end_matches('/'));
92
93        let mut f = tokio::fs::OpenOptions::new()
94            .read(true)
95            .open(&p)
96            .await
97            .map_err(new_std_io_error)?;
98
99        if args.range().offset() != 0 {
100            use tokio::io::AsyncSeekExt;
101            f.seek(SeekFrom::Start(args.range().offset()))
102                .await
103                .map_err(new_std_io_error)?;
104        }
105
106        Ok(f)
107    }
108
109    pub async fn fs_write(&self, path: &PathBuf, op: &OpWrite) -> Result<tokio::fs::File> {
110        let mut open_options = tokio::fs::OpenOptions::new();
111        if op.if_not_exists() {
112            open_options.create_new(true);
113        } else {
114            open_options.create(true);
115        }
116
117        open_options.write(true);
118
119        if op.append() {
120            open_options.append(true);
121        } else {
122            open_options.truncate(true);
123        }
124
125        let f = open_options.open(path).await.map_err(parse_error)?;
126
127        Ok(f)
128    }
129
130    /// This function is used to build a tempfile for writing.
131    ///
132    /// We don't care about the OpWrite since every check should be performed on target path directly.
133    pub async fn fs_tempfile_write(
134        &self,
135        path: &str,
136    ) -> Result<(tokio::fs::File, Option<PathBuf>)> {
137        let Some(atomic_write_dir) = self.atomic_write_dir.as_ref() else {
138            return Err(Error::new(
139                ErrorKind::Unexpected,
140                "fs didn't configure atomic_write_dir, but we're still entering the tempfile logic. This might be a bug.",
141            ));
142        };
143
144        let tmp_path = self
145            .ensure_write_abs_path(atomic_write_dir, &build_tmp_path_of(path))
146            .await?;
147
148        let mut open_options = tokio::fs::OpenOptions::new();
149
150        // tempfile should always be new file.
151        open_options.create_new(true);
152        open_options.write(true);
153        open_options.truncate(true);
154
155        let f = open_options.open(&tmp_path).await.map_err(parse_error)?;
156
157        Ok((f, Some(tmp_path)))
158    }
159
160    pub async fn fs_list(&self, path: &str) -> Result<Option<tokio::fs::ReadDir>> {
161        let p = self.root.join(path.trim_end_matches('/'));
162
163        match tokio::fs::read_dir(&p).await {
164            Ok(rd) => Ok(Some(rd)),
165            Err(e) => {
166                match e.kind() {
167                    // Return empty list if the directory not found
168                    std::io::ErrorKind::NotFound => Ok(None),
169                    // TODO: enable after our MSRV has been raised to 1.83
170                    //
171                    // If the path is not a directory, return an empty list
172                    //
173                    // The path could be a file or a symbolic link in this case.
174                    // Returning a NotADirectory error to the user isn't helpful; instead,
175                    // providing an empty directory is a more user-friendly. In fact, the dir
176                    // `path/` does not exist.
177                    // std::io::ErrorKind::NotADirectory => Ok((RpList::default(), None)),
178                    _ => {
179                        // TODO: remove this after we have MSRV 1.83
180                        #[cfg(unix)]
181                        if e.raw_os_error() == Some(20) {
182                            // On unix 20: Not a directory
183                            return Ok(None);
184                        }
185                        #[cfg(windows)]
186                        if e.raw_os_error() == Some(267) {
187                            // On windows 267: DIRECTORY
188                            return Ok(None);
189                        }
190
191                        Err(new_std_io_error(e))
192                    }
193                }
194            }
195        }
196    }
197
198    pub async fn fs_copy(&self, from: &str, to: &str) -> Result<()> {
199        let from = self.root.join(from.trim_end_matches('/'));
200        // try to get the metadata of the source file to ensure it exists
201        tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
202
203        let to = self
204            .ensure_write_abs_path(&self.root, to.trim_end_matches('/'))
205            .await?;
206
207        tokio::fs::copy(from, to).await.map_err(new_std_io_error)?;
208        Ok(())
209    }
210
211    pub async fn fs_rename(&self, from: &str, to: &str) -> Result<()> {
212        let from = self.root.join(from.trim_end_matches('/'));
213        tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
214
215        let to = self
216            .ensure_write_abs_path(&self.root, to.trim_end_matches('/'))
217            .await?;
218
219        tokio::fs::rename(from, to)
220            .await
221            .map_err(new_std_io_error)?;
222        Ok(())
223    }
224}