opendal/services/fs/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::io::SeekFrom;
19use std::path::Path;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use chrono::DateTime;
24
25use super::error::*;
26use crate::raw::*;
27use crate::*;
28
/// State shared by the local-filesystem operations implemented on this type.
29#[derive(Debug)]
30pub struct FsCore {
    /// Accessor information, shared through an `Arc`.
    // NOTE(review): not read by any method in this file — presumably consumed
    // by the accessor layer; confirm against callers.
31    pub info: Arc<AccessorInfo>,
    /// Base directory. `fs_create_dir`, `fs_stat`, `fs_read`, `fs_list`,
    /// `fs_copy` and `fs_rename` join the caller-supplied path onto it.
32    pub root: PathBuf,
    /// Directory in which `fs_tempfile_write` creates its temporary files.
    /// When this is `None`, `fs_tempfile_write` returns an `Unexpected` error.
33    pub atomic_write_dir: Option<PathBuf>,
    /// Pooled buffer.
    // NOTE(review): not used by any method in this file — presumably shared
    // with the reader/writer implementations; confirm.
34    pub buf_pool: oio::PooledBuf,
35}
36
37impl FsCore {
38    // Build write path and ensure the parent dirs created
39    pub async fn ensure_write_abs_path(&self, parent: &Path, path: &str) -> Result<PathBuf> {
40        let p = parent.join(path);
41
42        // Create dir before write path.
43        //
44        // TODO(xuanwo): There are many works to do here:
45        //   - Is it safe to create dir concurrently?
46        //   - Do we need to extract this logic as new util functions?
47        //   - Is it better to check the parent dir exists before call mkdir?
48        let parent = PathBuf::from(&p)
49            .parent()
50            .ok_or_else(|| {
51                Error::new(
52                    ErrorKind::Unexpected,
53                    "path should have parent but not, it must be malformed",
54                )
55                .with_context("input", p.to_string_lossy())
56            })?
57            .to_path_buf();
58
59        tokio::fs::create_dir_all(&parent)
60            .await
61            .map_err(new_std_io_error)?;
62
63        Ok(p)
64    }
65
66    pub async fn fs_create_dir(&self, path: &str) -> Result<()> {
67        let p = self.root.join(path.trim_end_matches('/'));
68        tokio::fs::create_dir_all(&p)
69            .await
70            .map_err(new_std_io_error)?;
71        Ok(())
72    }
73
74    pub async fn fs_stat(&self, path: &str) -> Result<Metadata> {
75        let p = self.root.join(path.trim_end_matches('/'));
76        let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?;
77
78        let mode = if meta.is_dir() {
79            EntryMode::DIR
80        } else if meta.is_file() {
81            EntryMode::FILE
82        } else {
83            EntryMode::Unknown
84        };
85        let m = Metadata::new(mode)
86            .with_content_length(meta.len())
87            .with_last_modified(
88                meta.modified()
89                    .map(DateTime::from)
90                    .map_err(new_std_io_error)?,
91            );
92
93        Ok(m)
94    }
95
96    pub async fn fs_read(&self, path: &str, args: &OpRead) -> Result<tokio::fs::File> {
97        let p = self.root.join(path.trim_end_matches('/'));
98
99        let mut f = tokio::fs::OpenOptions::new()
100            .read(true)
101            .open(&p)
102            .await
103            .map_err(new_std_io_error)?;
104
105        if args.range().offset() != 0 {
106            use tokio::io::AsyncSeekExt;
107            f.seek(SeekFrom::Start(args.range().offset()))
108                .await
109                .map_err(new_std_io_error)?;
110        }
111
112        Ok(f)
113    }
114
115    pub async fn fs_write(&self, path: &PathBuf, op: &OpWrite) -> Result<tokio::fs::File> {
116        let mut open_options = tokio::fs::OpenOptions::new();
117        if op.if_not_exists() {
118            open_options.create_new(true);
119        } else {
120            open_options.create(true);
121        }
122
123        open_options.write(true);
124
125        if op.append() {
126            open_options.append(true);
127        } else {
128            open_options.truncate(true);
129        }
130
131        let f = open_options.open(path).await.map_err(parse_error)?;
132
133        Ok(f)
134    }
135
136    /// This function is used to build a tempfile for writing.
137    ///
138    /// We don't care about the OpWrite since every check should be performed on target path directly.
139    pub async fn fs_tempfile_write(
140        &self,
141        path: &str,
142    ) -> Result<(tokio::fs::File, Option<PathBuf>)> {
143        let Some(atomic_write_dir) = self.atomic_write_dir.as_ref() else {
144            return Err(Error::new(ErrorKind::Unexpected, "fs didn't configure atomic_write_dir, but we're still entering the tempfile logic. This might be a bug."));
145        };
146
147        let tmp_path = self
148            .ensure_write_abs_path(atomic_write_dir, &build_tmp_path_of(path))
149            .await?;
150
151        let mut open_options = tokio::fs::OpenOptions::new();
152
153        // tempfile should always be new file.
154        open_options.create_new(true);
155        open_options.write(true);
156        open_options.truncate(true);
157
158        let f = open_options.open(&tmp_path).await.map_err(parse_error)?;
159
160        Ok((f, Some(tmp_path)))
161    }
162
163    pub async fn fs_list(&self, path: &str) -> Result<Option<tokio::fs::ReadDir>> {
164        let p = self.root.join(path.trim_end_matches('/'));
165
166        match tokio::fs::read_dir(&p).await {
167            Ok(rd) => Ok(Some(rd)),
168            Err(e) => {
169                match e.kind() {
170                    // Return empty list if the directory not found
171                    std::io::ErrorKind::NotFound => Ok(None),
172                    // TODO: enable after our MSRV has been raised to 1.83
173                    //
174                    // If the path is not a directory, return an empty list
175                    //
176                    // The path could be a file or a symbolic link in this case.
177                    // Returning a NotADirectory error to the user isn't helpful; instead,
178                    // providing an empty directory is a more user-friendly. In fact, the dir
179                    // `path/` does not exist.
180                    // std::io::ErrorKind::NotADirectory => Ok((RpList::default(), None)),
181                    _ => {
182                        // TODO: remove this after we have MSRV 1.83
183                        #[cfg(unix)]
184                        if e.raw_os_error() == Some(20) {
185                            // On unix 20: Not a directory
186                            return Ok(None);
187                        }
188                        #[cfg(windows)]
189                        if e.raw_os_error() == Some(267) {
190                            // On windows 267: DIRECTORY
191                            return Ok(None);
192                        }
193
194                        Err(new_std_io_error(e))
195                    }
196                }
197            }
198        }
199    }
200
201    pub async fn fs_copy(&self, from: &str, to: &str) -> Result<()> {
202        let from = self.root.join(from.trim_end_matches('/'));
203        // try to get the metadata of the source file to ensure it exists
204        tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
205
206        let to = self
207            .ensure_write_abs_path(&self.root, to.trim_end_matches('/'))
208            .await?;
209
210        tokio::fs::copy(from, to).await.map_err(new_std_io_error)?;
211        Ok(())
212    }
213
214    pub async fn fs_rename(&self, from: &str, to: &str) -> Result<()> {
215        let from = self.root.join(from.trim_end_matches('/'));
216        tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
217
218        let to = self
219            .ensure_write_abs_path(&self.root, to.trim_end_matches('/'))
220            .await?;
221
222        tokio::fs::rename(from, to)
223            .await
224            .map_err(new_std_io_error)?;
225        Ok(())
226    }
227}