opendal/services/fs/
core.rs1use std::io::SeekFrom;
19use std::path::Path;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use chrono::DateTime;
24
25use super::error::*;
26use crate::raw::*;
27use crate::*;
28
29#[derive(Debug)]
30pub struct FsCore {
31 pub info: Arc<AccessorInfo>,
32 pub root: PathBuf,
33 pub atomic_write_dir: Option<PathBuf>,
34 pub buf_pool: oio::PooledBuf,
35}
36
37impl FsCore {
38 pub async fn ensure_write_abs_path(&self, parent: &Path, path: &str) -> Result<PathBuf> {
40 let p = parent.join(path);
41
42 let parent = PathBuf::from(&p)
49 .parent()
50 .ok_or_else(|| {
51 Error::new(
52 ErrorKind::Unexpected,
53 "path should have parent but not, it must be malformed",
54 )
55 .with_context("input", p.to_string_lossy())
56 })?
57 .to_path_buf();
58
59 tokio::fs::create_dir_all(&parent)
60 .await
61 .map_err(new_std_io_error)?;
62
63 Ok(p)
64 }
65
66 pub async fn fs_create_dir(&self, path: &str) -> Result<()> {
67 let p = self.root.join(path.trim_end_matches('/'));
68 tokio::fs::create_dir_all(&p)
69 .await
70 .map_err(new_std_io_error)?;
71 Ok(())
72 }
73
74 pub async fn fs_stat(&self, path: &str) -> Result<Metadata> {
75 let p = self.root.join(path.trim_end_matches('/'));
76 let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?;
77
78 let mode = if meta.is_dir() {
79 EntryMode::DIR
80 } else if meta.is_file() {
81 EntryMode::FILE
82 } else {
83 EntryMode::Unknown
84 };
85 let m = Metadata::new(mode)
86 .with_content_length(meta.len())
87 .with_last_modified(
88 meta.modified()
89 .map(DateTime::from)
90 .map_err(new_std_io_error)?,
91 );
92
93 Ok(m)
94 }
95
96 pub async fn fs_read(&self, path: &str, args: &OpRead) -> Result<tokio::fs::File> {
97 let p = self.root.join(path.trim_end_matches('/'));
98
99 let mut f = tokio::fs::OpenOptions::new()
100 .read(true)
101 .open(&p)
102 .await
103 .map_err(new_std_io_error)?;
104
105 if args.range().offset() != 0 {
106 use tokio::io::AsyncSeekExt;
107 f.seek(SeekFrom::Start(args.range().offset()))
108 .await
109 .map_err(new_std_io_error)?;
110 }
111
112 Ok(f)
113 }
114
115 pub async fn fs_write(&self, path: &PathBuf, op: &OpWrite) -> Result<tokio::fs::File> {
116 let mut open_options = tokio::fs::OpenOptions::new();
117 if op.if_not_exists() {
118 open_options.create_new(true);
119 } else {
120 open_options.create(true);
121 }
122
123 open_options.write(true);
124
125 if op.append() {
126 open_options.append(true);
127 } else {
128 open_options.truncate(true);
129 }
130
131 let f = open_options.open(path).await.map_err(parse_error)?;
132
133 Ok(f)
134 }
135
136 pub async fn fs_tempfile_write(
140 &self,
141 path: &str,
142 ) -> Result<(tokio::fs::File, Option<PathBuf>)> {
143 let Some(atomic_write_dir) = self.atomic_write_dir.as_ref() else {
144 return Err(Error::new(ErrorKind::Unexpected, "fs didn't configure atomic_write_dir, but we're still entering the tempfile logic. This might be a bug."));
145 };
146
147 let tmp_path = self
148 .ensure_write_abs_path(atomic_write_dir, &build_tmp_path_of(path))
149 .await?;
150
151 let mut open_options = tokio::fs::OpenOptions::new();
152
153 open_options.create_new(true);
155 open_options.write(true);
156 open_options.truncate(true);
157
158 let f = open_options.open(&tmp_path).await.map_err(parse_error)?;
159
160 Ok((f, Some(tmp_path)))
161 }
162
163 pub async fn fs_list(&self, path: &str) -> Result<Option<tokio::fs::ReadDir>> {
164 let p = self.root.join(path.trim_end_matches('/'));
165
166 match tokio::fs::read_dir(&p).await {
167 Ok(rd) => Ok(Some(rd)),
168 Err(e) => {
169 match e.kind() {
170 std::io::ErrorKind::NotFound => Ok(None),
172 _ => {
182 #[cfg(unix)]
184 if e.raw_os_error() == Some(20) {
185 return Ok(None);
187 }
188 #[cfg(windows)]
189 if e.raw_os_error() == Some(267) {
190 return Ok(None);
192 }
193
194 Err(new_std_io_error(e))
195 }
196 }
197 }
198 }
199 }
200
201 pub async fn fs_copy(&self, from: &str, to: &str) -> Result<()> {
202 let from = self.root.join(from.trim_end_matches('/'));
203 tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
205
206 let to = self
207 .ensure_write_abs_path(&self.root, to.trim_end_matches('/'))
208 .await?;
209
210 tokio::fs::copy(from, to).await.map_err(new_std_io_error)?;
211 Ok(())
212 }
213
214 pub async fn fs_rename(&self, from: &str, to: &str) -> Result<()> {
215 let from = self.root.join(from.trim_end_matches('/'));
216 tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
217
218 let to = self
219 .ensure_write_abs_path(&self.root, to.trim_end_matches('/'))
220 .await?;
221
222 tokio::fs::rename(from, to)
223 .await
224 .map_err(new_std_io_error)?;
225 Ok(())
226 }
227}