opendal/services/fs/
writer.rs1use std::fs::File;
19use std::io::Write;
20use std::path::PathBuf;
21
22use bytes::Buf;
23use tokio::io::AsyncWriteExt;
24
25use crate::raw::*;
26use crate::*;
27
28pub type FsWriters =
29 TwoWays<FsWriter<tokio::fs::File>, oio::PositionWriter<FsWriter<tokio::fs::File>>>;
30
/// A file writer generic over the underlying file handle type `F`.
///
/// The trait impls in this file instantiate `F` with `tokio::fs::File` and
/// `std::fs::File`.
pub struct FsWriter<F> {
    /// Final location of the written data. When `tmp_path` is set, `close`
    /// renames the temporary file onto this path.
    target_path: PathBuf,
    /// Optional temporary path. When present, `close` renames it to
    /// `target_path` and `abort` removes it.
    tmp_path: Option<PathBuf>,

    /// The open file handle; `new` always stores `Some`.
    f: Option<F>,
}
37
38impl<F> FsWriter<F> {
39 pub fn new(target_path: PathBuf, tmp_path: Option<PathBuf>, f: F) -> Self {
40 Self {
41 target_path,
42 tmp_path,
43
44 f: Some(f),
45 }
46 }
47}
48
// SAFETY: the fields of `FsWriter` are private, and in the visible part of
// this file the only shared-reference (`&self`) access to `f` is the
// `PositionWrite` impl for `FsWriter<tokio::fs::File>`; every other method
// takes `&mut self`.
// NOTE(review): this impl is unconditional in `F`. Any future `&self` method
// that touches a non-`Sync` `F` would make it unsound — confirm before adding
// one.
unsafe impl<F> Sync for FsWriter<F> {}
53
54impl oio::Write for FsWriter<tokio::fs::File> {
55 async fn write(&mut self, mut bs: Buffer) -> Result<()> {
56 let f = self.f.as_mut().expect("FsWriter must be initialized");
57
58 while bs.has_remaining() {
59 let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?;
60 bs.advance(n);
61 }
62
63 Ok(())
64 }
65
66 async fn close(&mut self) -> Result<Metadata> {
67 let f = self.f.as_mut().expect("FsWriter must be initialized");
68 f.flush().await.map_err(new_std_io_error)?;
69 f.sync_all().await.map_err(new_std_io_error)?;
70
71 if let Some(tmp_path) = &self.tmp_path {
72 tokio::fs::rename(tmp_path, &self.target_path)
73 .await
74 .map_err(new_std_io_error)?;
75 }
76
77 let file_meta = f.metadata().await.map_err(new_std_io_error)?;
78 let mode = if file_meta.is_file() {
79 EntryMode::FILE
80 } else if file_meta.is_dir() {
81 EntryMode::DIR
82 } else {
83 EntryMode::Unknown
84 };
85 let meta = Metadata::new(mode)
86 .with_content_length(file_meta.len())
87 .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into());
88 Ok(meta)
89 }
90
91 async fn abort(&mut self) -> Result<()> {
92 if let Some(tmp_path) = &self.tmp_path {
93 tokio::fs::remove_file(tmp_path)
94 .await
95 .map_err(new_std_io_error)
96 } else {
97 Err(Error::new(
98 ErrorKind::Unsupported,
99 "Fs doesn't support abort if atomic_write_dir is not set",
100 ))
101 }
102 }
103}
104
105impl oio::BlockingWrite for FsWriter<std::fs::File> {
106 fn write(&mut self, mut bs: Buffer) -> Result<()> {
107 let f = self.f.as_mut().expect("FsWriter must be initialized");
108
109 while bs.has_remaining() {
110 let n = f.write(bs.chunk()).map_err(new_std_io_error)?;
111 bs.advance(n);
112 }
113
114 Ok(())
115 }
116
117 fn close(&mut self) -> Result<Metadata> {
118 let f = self.f.as_mut().expect("FsWriter must be initialized");
119 f.sync_all().map_err(new_std_io_error)?;
120
121 if let Some(tmp_path) = &self.tmp_path {
122 std::fs::rename(tmp_path, &self.target_path).map_err(new_std_io_error)?;
123 }
124
125 let file_meta = f.metadata().map_err(new_std_io_error)?;
126 let mode = if file_meta.is_file() {
127 EntryMode::FILE
128 } else if file_meta.is_dir() {
129 EntryMode::DIR
130 } else {
131 EntryMode::Unknown
132 };
133 let meta = Metadata::new(mode)
134 .with_content_length(file_meta.len())
135 .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into());
136 Ok(meta)
137 }
138}
139
140impl oio::PositionWrite for FsWriter<tokio::fs::File> {
141 async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
142 let f = self.f.as_ref().expect("FsWriter must be initialized");
143
144 let f = f
145 .try_clone()
146 .await
147 .map_err(new_std_io_error)?
148 .into_std()
149 .await;
150
151 tokio::task::spawn_blocking(move || {
152 let mut buf = buf;
153 let mut offset = offset;
154 while !buf.is_empty() {
155 match write_at(&f, buf.chunk(), offset) {
156 Ok(n) => {
157 buf.advance(n);
158 offset += n as u64
159 }
160 Err(e) => return Err(e),
161 }
162 }
163 Ok(())
164 })
165 .await
166 .map_err(new_task_join_error)?
167 }
168
169 async fn close(&self) -> Result<Metadata> {
170 let f = self.f.as_ref().expect("FsWriter must be initialized");
171
172 let mut f = f
173 .try_clone()
174 .await
175 .map_err(new_std_io_error)?
176 .into_std()
177 .await;
178
179 f.flush().map_err(new_std_io_error)?;
180 f.sync_all().map_err(new_std_io_error)?;
181
182 if let Some(tmp_path) = &self.tmp_path {
183 tokio::fs::rename(tmp_path, &self.target_path)
184 .await
185 .map_err(new_std_io_error)?;
186 }
187
188 let file_meta = f.metadata().map_err(new_std_io_error)?;
189 let mode = if file_meta.is_file() {
190 EntryMode::FILE
191 } else if file_meta.is_dir() {
192 EntryMode::DIR
193 } else {
194 EntryMode::Unknown
195 };
196 let meta = Metadata::new(mode)
197 .with_content_length(file_meta.len())
198 .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into());
199 Ok(meta)
200 }
201
202 async fn abort(&self) -> Result<()> {
203 if let Some(tmp_path) = &self.tmp_path {
204 tokio::fs::remove_file(tmp_path)
205 .await
206 .map_err(new_std_io_error)
207 } else {
208 Err(Error::new(
209 ErrorKind::Unsupported,
210 "Fs doesn't support abort if atomic_write_dir is not set",
211 ))
212 }
213 }
214}
215
/// Windows positional write: writes some prefix of `buf` at byte `offset` of
/// `f` via `FileExt::seek_write` and returns how many bytes were written.
///
/// I/O failures are converted with `new_std_io_error`.
#[cfg(windows)]
fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
    let written = std::os::windows::fs::FileExt::seek_write(f, buf, offset);
    written.map_err(new_std_io_error)
}
221
222#[cfg(unix)]
223fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
224 use std::os::unix::fs::FileExt;
225 f.write_at(buf, offset).map_err(new_std_io_error)
226}