opendal/services/fs/
backend.rs1use std::path::PathBuf;
19use std::sync::Arc;
20
21use log::debug;
22
23use super::core::*;
24use super::delete::FsDeleter;
25use super::lister::FsLister;
26use super::reader::FsReader;
27use super::writer::FsWriter;
28use super::writer::FsWriters;
29use super::DEFAULT_SCHEME;
30use crate::raw::*;
31use crate::services::FsConfig;
32use crate::*;
33impl Configurator for FsConfig {
34 type Builder = FsBuilder;
35 fn into_builder(self) -> Self::Builder {
36 FsBuilder { config: self }
37 }
38}
39
40#[doc = include_str!("docs.md")]
42#[derive(Default, Debug)]
43pub struct FsBuilder {
44 config: FsConfig,
45}
46
47impl FsBuilder {
48 pub fn root(mut self, root: &str) -> Self {
50 self.config.root = if root.is_empty() {
51 None
52 } else {
53 Some(root.to_string())
54 };
55
56 self
57 }
58
59 pub fn atomic_write_dir(mut self, dir: &str) -> Self {
66 if !dir.is_empty() {
67 self.config.atomic_write_dir = Some(dir.to_string());
68 }
69
70 self
71 }
72}
73
74impl Builder for FsBuilder {
75 type Config = FsConfig;
76
77 fn build(self) -> Result<impl Access> {
78 debug!("backend build started: {:?}", &self);
79
80 let root = match self.config.root.map(PathBuf::from) {
81 Some(root) => Ok(root),
82 None => Err(Error::new(
83 ErrorKind::ConfigInvalid,
84 "root is not specified",
85 )),
86 }?;
87 debug!("backend use root {}", root.to_string_lossy());
88
89 if let Err(e) = std::fs::metadata(&root) {
91 if e.kind() == std::io::ErrorKind::NotFound {
92 std::fs::create_dir_all(&root).map_err(|e| {
93 Error::new(ErrorKind::Unexpected, "create root dir failed")
94 .with_operation("Builder::build")
95 .with_context("root", root.to_string_lossy())
96 .set_source(e)
97 })?;
98 }
99 }
100
101 let atomic_write_dir = self.config.atomic_write_dir.map(PathBuf::from);
102
103 if let Some(d) = &atomic_write_dir {
105 if let Err(e) = std::fs::metadata(d) {
106 if e.kind() == std::io::ErrorKind::NotFound {
107 std::fs::create_dir_all(d).map_err(|e| {
108 Error::new(ErrorKind::Unexpected, "create atomic write dir failed")
109 .with_operation("Builder::build")
110 .with_context("atomic_write_dir", d.to_string_lossy())
111 .set_source(e)
112 })?;
113 }
114 }
115 }
116
117 let root = root.canonicalize().map_err(|e| {
120 Error::new(
121 ErrorKind::Unexpected,
122 "canonicalize of root directory failed",
123 )
124 .set_source(e)
125 })?;
126
127 let atomic_write_dir = atomic_write_dir
130 .map(|p| {
131 p.canonicalize().map(Some).map_err(|e| {
132 Error::new(
133 ErrorKind::Unexpected,
134 "canonicalize of atomic_write_dir directory failed",
135 )
136 .with_operation("Builder::build")
137 .with_context("root", root.to_string_lossy())
138 .set_source(e)
139 })
140 })
141 .unwrap_or(Ok(None))?;
142
143 Ok(FsBackend {
144 core: Arc::new(FsCore {
145 info: {
146 let am = AccessorInfo::default();
147 am.set_scheme(DEFAULT_SCHEME)
148 .set_root(&root.to_string_lossy())
149 .set_native_capability(Capability {
150 stat: true,
151
152 read: true,
153
154 write: true,
155 write_can_empty: true,
156 write_can_append: true,
157 write_can_multi: true,
158 write_with_if_not_exists: true,
159
160 create_dir: true,
161 delete: true,
162
163 list: true,
164
165 copy: true,
166 rename: true,
167
168 shared: true,
169
170 ..Default::default()
171 });
172
173 am.into()
174 },
175 root,
176 atomic_write_dir,
177 buf_pool: oio::PooledBuf::new(16).with_initial_capacity(256 * 1024),
178 }),
179 })
180 }
181}
182
183#[derive(Debug, Clone)]
185pub struct FsBackend {
186 core: Arc<FsCore>,
187}
188
189impl Access for FsBackend {
190 type Reader = FsReader<tokio::fs::File>;
191 type Writer = FsWriters;
192 type Lister = Option<FsLister<tokio::fs::ReadDir>>;
193 type Deleter = oio::OneShotDeleter<FsDeleter>;
194
195 fn info(&self) -> Arc<AccessorInfo> {
196 self.core.info.clone()
197 }
198
199 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
200 self.core.fs_create_dir(path).await?;
201 Ok(RpCreateDir::default())
202 }
203
204 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
205 let m = self.core.fs_stat(path).await?;
206 Ok(RpStat::new(m))
207 }
208
209 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
219 let f = self.core.fs_read(path, &args).await?;
220 let r = FsReader::new(
221 self.core.clone(),
222 f,
223 args.range().size().unwrap_or(u64::MAX) as _,
224 );
225 Ok((RpRead::new(), r))
226 }
227
228 async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
229 let is_append = op.append();
230 let concurrent = op.concurrent();
231
232 let writer = FsWriter::create(self.core.clone(), path, op).await?;
233
234 let writer = if is_append {
235 FsWriters::One(writer)
236 } else {
237 FsWriters::Two(oio::PositionWriter::new(
238 self.info().clone(),
239 writer,
240 concurrent,
241 ))
242 };
243
244 Ok((RpWrite::default(), writer))
245 }
246
247 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
248 Ok((
249 RpDelete::default(),
250 oio::OneShotDeleter::new(FsDeleter::new(self.core.clone())),
251 ))
252 }
253
254 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
255 match self.core.fs_list(path).await? {
256 Some(f) => {
257 let rd = FsLister::new(&self.core.root, path, f);
258 Ok((RpList::default(), Some(rd)))
259 }
260 None => Ok((RpList::default(), None)),
261 }
262 }
263
264 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
265 self.core.fs_copy(from, to).await?;
266 Ok(RpCopy::default())
267 }
268
269 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
270 self.core.fs_rename(from, to).await?;
271 Ok(RpRename::default())
272 }
273}