opendal/services/fs/
core.rs1use std::io::SeekFrom;
19use std::path::Path;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use super::error::*;
24use crate::raw::*;
25use crate::*;
26
27#[derive(Debug)]
28pub struct FsCore {
29 pub info: Arc<AccessorInfo>,
30 pub root: PathBuf,
31 pub atomic_write_dir: Option<PathBuf>,
32 pub buf_pool: oio::PooledBuf,
33}
34
35impl FsCore {
36 pub async fn ensure_write_abs_path(&self, parent: &Path, path: &str) -> Result<PathBuf> {
38 let p = parent.join(path);
39
40 let parent = PathBuf::from(&p)
47 .parent()
48 .ok_or_else(|| {
49 Error::new(
50 ErrorKind::Unexpected,
51 "path should have parent but not, it must be malformed",
52 )
53 .with_context("input", p.to_string_lossy())
54 })?
55 .to_path_buf();
56
57 tokio::fs::create_dir_all(&parent)
58 .await
59 .map_err(new_std_io_error)?;
60
61 Ok(p)
62 }
63
64 pub async fn fs_create_dir(&self, path: &str) -> Result<()> {
65 let p = self.root.join(path.trim_end_matches('/'));
66 tokio::fs::create_dir_all(&p)
67 .await
68 .map_err(new_std_io_error)?;
69 Ok(())
70 }
71
72 pub async fn fs_stat(&self, path: &str) -> Result<Metadata> {
73 let p = self.root.join(path.trim_end_matches('/'));
74 let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?;
75 let mode = if meta.is_dir() {
76 EntryMode::DIR
77 } else if meta.is_file() {
78 EntryMode::FILE
79 } else {
80 EntryMode::Unknown
81 };
82 let m = Metadata::new(mode)
83 .with_content_length(meta.len())
84 .with_last_modified(Timestamp::try_from(
85 meta.modified().map_err(new_std_io_error)?,
86 )?);
87 Ok(m)
88 }
89
90 pub async fn fs_read(&self, path: &str, args: &OpRead) -> Result<tokio::fs::File> {
91 let p = self.root.join(path.trim_end_matches('/'));
92
93 let mut f = tokio::fs::OpenOptions::new()
94 .read(true)
95 .open(&p)
96 .await
97 .map_err(new_std_io_error)?;
98
99 if args.range().offset() != 0 {
100 use tokio::io::AsyncSeekExt;
101 f.seek(SeekFrom::Start(args.range().offset()))
102 .await
103 .map_err(new_std_io_error)?;
104 }
105
106 Ok(f)
107 }
108
109 pub async fn fs_write(&self, path: &PathBuf, op: &OpWrite) -> Result<tokio::fs::File> {
110 let mut open_options = tokio::fs::OpenOptions::new();
111 if op.if_not_exists() {
112 open_options.create_new(true);
113 } else {
114 open_options.create(true);
115 }
116
117 open_options.write(true);
118
119 if op.append() {
120 open_options.append(true);
121 } else {
122 open_options.truncate(true);
123 }
124
125 let f = open_options.open(path).await.map_err(parse_error)?;
126
127 Ok(f)
128 }
129
130 pub async fn fs_tempfile_write(
134 &self,
135 path: &str,
136 ) -> Result<(tokio::fs::File, Option<PathBuf>)> {
137 let Some(atomic_write_dir) = self.atomic_write_dir.as_ref() else {
138 return Err(Error::new(
139 ErrorKind::Unexpected,
140 "fs didn't configure atomic_write_dir, but we're still entering the tempfile logic. This might be a bug.",
141 ));
142 };
143
144 let tmp_path = self
145 .ensure_write_abs_path(atomic_write_dir, &build_tmp_path_of(path))
146 .await?;
147
148 let mut open_options = tokio::fs::OpenOptions::new();
149
150 open_options.create_new(true);
152 open_options.write(true);
153 open_options.truncate(true);
154
155 let f = open_options.open(&tmp_path).await.map_err(parse_error)?;
156
157 Ok((f, Some(tmp_path)))
158 }
159
160 pub async fn fs_list(&self, path: &str) -> Result<Option<tokio::fs::ReadDir>> {
161 let p = self.root.join(path.trim_end_matches('/'));
162
163 match tokio::fs::read_dir(&p).await {
164 Ok(rd) => Ok(Some(rd)),
165 Err(e) => {
166 match e.kind() {
167 std::io::ErrorKind::NotFound => Ok(None),
169 _ => {
179 #[cfg(unix)]
181 if e.raw_os_error() == Some(20) {
182 return Ok(None);
184 }
185 #[cfg(windows)]
186 if e.raw_os_error() == Some(267) {
187 return Ok(None);
189 }
190
191 Err(new_std_io_error(e))
192 }
193 }
194 }
195 }
196 }
197
198 pub async fn fs_copy(&self, from: &str, to: &str) -> Result<()> {
199 let from = self.root.join(from.trim_end_matches('/'));
200 tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
202
203 let to = self
204 .ensure_write_abs_path(&self.root, to.trim_end_matches('/'))
205 .await?;
206
207 tokio::fs::copy(from, to).await.map_err(new_std_io_error)?;
208 Ok(())
209 }
210
211 pub async fn fs_rename(&self, from: &str, to: &str) -> Result<()> {
212 let from = self.root.join(from.trim_end_matches('/'));
213 tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
214
215 let to = self
216 .ensure_write_abs_path(&self.root, to.trim_end_matches('/'))
217 .await?;
218
219 tokio::fs::rename(from, to)
220 .await
221 .map_err(new_std_io_error)?;
222 Ok(())
223 }
224}