opendal/services/fs/
writer.rs1use std::fs::File;
19use std::io::Write;
20use std::path::PathBuf;
21
22use bytes::Buf;
23use tokio::io::AsyncWriteExt;
24
25use crate::raw::*;
26use crate::*;
27
28pub type FsWriters =
29 TwoWays<FsWriter<tokio::fs::File>, oio::PositionWriter<FsWriter<tokio::fs::File>>>;
30
/// File-backed writer for the fs service.
///
/// Bundles the destination path, an optional temporary path, and the file
/// handle that data is written through.
pub struct FsWriter<F> {
    /// Final location of the written file.
    target_path: PathBuf,
    /// Temporary file location. When present, the writer impls for
    /// `tokio::fs::File` rename it onto `target_path` on close and delete it
    /// on abort.
    tmp_path: Option<PathBuf>,

    /// Underlying file handle; [`FsWriter::new`] always stores `Some`.
    f: Option<F>,
}

impl<F> FsWriter<F> {
    /// Builds a writer for `target_path` around the already-opened handle `f`.
    ///
    /// `tmp_path` is stored as given; pass `None` when no temporary file is
    /// involved.
    pub fn new(target_path: PathBuf, tmp_path: Option<PathBuf>, f: F) -> Self {
        Self {
            f: Some(f),
            target_path,
            tmp_path,
        }
    }
}

// SAFETY / NOTE(review): this impl is unconditional in `F`, so the compiler
// does not check that the wrapped handle is itself `Sync`. The handles used in
// this file (`tokio::fs::File`) are `Sync` on their own; confirm that no other
// instantiation exposes a non-`Sync` `F` through `&FsWriter<F>`.
unsafe impl<F> Sync for FsWriter<F> {}
53
54impl oio::Write for FsWriter<tokio::fs::File> {
55 async fn write(&mut self, mut bs: Buffer) -> Result<()> {
56 let f = self.f.as_mut().expect("FsWriter must be initialized");
57
58 while bs.has_remaining() {
59 let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?;
60 bs.advance(n);
61 }
62
63 Ok(())
64 }
65
66 async fn close(&mut self) -> Result<Metadata> {
67 let f = self.f.as_mut().expect("FsWriter must be initialized");
68 f.flush().await.map_err(new_std_io_error)?;
69 f.sync_all().await.map_err(new_std_io_error)?;
70
71 if let Some(tmp_path) = &self.tmp_path {
72 tokio::fs::rename(tmp_path, &self.target_path)
73 .await
74 .map_err(new_std_io_error)?;
75 }
76
77 let file_meta = f.metadata().await.map_err(new_std_io_error)?;
78 let mode = if file_meta.is_file() {
79 EntryMode::FILE
80 } else if file_meta.is_dir() {
81 EntryMode::DIR
82 } else {
83 EntryMode::Unknown
84 };
85 let meta = Metadata::new(mode)
86 .with_content_length(file_meta.len())
87 .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into());
88 Ok(meta)
89 }
90
91 async fn abort(&mut self) -> Result<()> {
92 if let Some(tmp_path) = &self.tmp_path {
93 tokio::fs::remove_file(tmp_path)
94 .await
95 .map_err(new_std_io_error)
96 } else {
97 Err(Error::new(
98 ErrorKind::Unsupported,
99 "Fs doesn't support abort if atomic_write_dir is not set",
100 ))
101 }
102 }
103}
104
105impl oio::PositionWrite for FsWriter<tokio::fs::File> {
106 async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
107 let f = self.f.as_ref().expect("FsWriter must be initialized");
108
109 let f = f
110 .try_clone()
111 .await
112 .map_err(new_std_io_error)?
113 .into_std()
114 .await;
115
116 tokio::task::spawn_blocking(move || {
117 let mut buf = buf;
118 let mut offset = offset;
119 while !buf.is_empty() {
120 match write_at(&f, buf.chunk(), offset) {
121 Ok(n) => {
122 buf.advance(n);
123 offset += n as u64
124 }
125 Err(e) => return Err(e),
126 }
127 }
128 Ok(())
129 })
130 .await
131 .map_err(new_task_join_error)?
132 }
133
134 async fn close(&self) -> Result<Metadata> {
135 let f = self.f.as_ref().expect("FsWriter must be initialized");
136
137 let mut f = f
138 .try_clone()
139 .await
140 .map_err(new_std_io_error)?
141 .into_std()
142 .await;
143
144 f.flush().map_err(new_std_io_error)?;
145 f.sync_all().map_err(new_std_io_error)?;
146
147 if let Some(tmp_path) = &self.tmp_path {
148 tokio::fs::rename(tmp_path, &self.target_path)
149 .await
150 .map_err(new_std_io_error)?;
151 }
152
153 let file_meta = f.metadata().map_err(new_std_io_error)?;
154 let mode = if file_meta.is_file() {
155 EntryMode::FILE
156 } else if file_meta.is_dir() {
157 EntryMode::DIR
158 } else {
159 EntryMode::Unknown
160 };
161 let meta = Metadata::new(mode)
162 .with_content_length(file_meta.len())
163 .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into());
164 Ok(meta)
165 }
166
167 async fn abort(&self) -> Result<()> {
168 if let Some(tmp_path) = &self.tmp_path {
169 tokio::fs::remove_file(tmp_path)
170 .await
171 .map_err(new_std_io_error)
172 } else {
173 Err(Error::new(
174 ErrorKind::Unsupported,
175 "Fs doesn't support abort if atomic_write_dir is not set",
176 ))
177 }
178 }
179}
180
/// Windows positional write: writes `buf` into `f` at byte `offset` via
/// `FileExt::seek_write`.
///
/// Returns what `seek_write` reports as the number of bytes written, with any
/// I/O error converted through `new_std_io_error`.
#[cfg(windows)]
fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
    let written = std::os::windows::fs::FileExt::seek_write(f, buf, offset);
    written.map_err(new_std_io_error)
}
186
187#[cfg(unix)]
188fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
189 use std::os::unix::fs::FileExt;
190 f.write_at(buf, offset).map_err(new_std_io_error)
191}