opendal/services/fs/
writer.rs1use std::fs::File;
19use std::io::Write;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use bytes::Buf;
24use tokio::io::AsyncWriteExt;
25
26use crate::raw::*;
27use crate::services::fs::core::FsCore;
28use crate::*;
29
30pub type FsWriters = TwoWays<FsWriter, oio::PositionWriter<FsWriter>>;
31
32pub struct FsWriter {
33 target_path: PathBuf,
34 temp_path: Option<PathBuf>,
36 f: tokio::fs::File,
37}
38
39impl FsWriter {
40 pub async fn create(core: Arc<FsCore>, path: &str, op: OpWrite) -> Result<Self> {
41 let target_path = core.ensure_write_abs_path(&core.root, path).await?;
42
43 if core.atomic_write_dir.is_none() {
45 let target_file = core.fs_write(&target_path, &op).await?;
46
47 return Ok(Self {
48 target_path,
49 temp_path: None,
50 f: target_file,
51 });
52 }
53
54 let is_append = op.append();
55 let is_exist = tokio::fs::try_exists(&target_path)
56 .await
57 .map_err(new_std_io_error)?;
58 if op.if_not_exists() && is_exist {
59 return Err(Error::new(
60 ErrorKind::ConditionNotMatch,
61 "file already exists, doesn't match the condition if_not_exists",
62 ));
63 }
64
65 let (f, temp_path) = if !(is_append && is_exist) {
68 core.fs_tempfile_write(path).await?
69 } else {
70 let f = core.fs_write(&target_path, &op).await?;
71 (f, None)
72 };
73
74 Ok(Self {
75 target_path,
76 temp_path,
77 f,
78 })
79 }
80}
81
82unsafe impl Sync for FsWriter {}
86
87impl oio::Write for FsWriter {
88 async fn write(&mut self, mut bs: Buffer) -> Result<()> {
89 while bs.has_remaining() {
90 let n = self.f.write(bs.chunk()).await.map_err(new_std_io_error)?;
91 bs.advance(n);
92 }
93
94 Ok(())
95 }
96
97 async fn close(&mut self) -> Result<Metadata> {
98 self.f.flush().await.map_err(new_std_io_error)?;
99 self.f.sync_all().await.map_err(new_std_io_error)?;
100
101 if let Some(temp_path) = &self.temp_path {
102 tokio::fs::rename(temp_path, &self.target_path)
103 .await
104 .map_err(new_std_io_error)?;
105 }
106
107 let file_meta = self.f.metadata().await.map_err(new_std_io_error)?;
108 let meta = Metadata::new(EntryMode::FILE)
109 .with_content_length(file_meta.len())
110 .with_last_modified(Timestamp::try_from(
111 file_meta.modified().map_err(new_std_io_error)?,
112 )?);
113 Ok(meta)
114 }
115
116 async fn abort(&mut self) -> Result<()> {
117 if let Some(temp_path) = &self.temp_path {
118 tokio::fs::remove_file(temp_path)
119 .await
120 .map_err(new_std_io_error)
121 } else {
122 Err(Error::new(
123 ErrorKind::Unsupported,
124 "Fs doesn't support abort if atomic_write_dir is not set",
125 ))
126 }
127 }
128}
129
130impl oio::PositionWrite for FsWriter {
131 async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
132 let f = self
133 .f
134 .try_clone()
135 .await
136 .map_err(new_std_io_error)?
137 .into_std()
138 .await;
139
140 tokio::task::spawn_blocking(move || {
141 let mut buf = buf;
142 let mut offset = offset;
143 while !buf.is_empty() {
144 match write_at(&f, buf.chunk(), offset) {
145 Ok(n) => {
146 buf.advance(n);
147 offset += n as u64
148 }
149 Err(e) => return Err(e),
150 }
151 }
152 Ok(())
153 })
154 .await
155 .map_err(new_task_join_error)?
156 }
157
158 async fn close(&self) -> Result<Metadata> {
159 let mut f = self
160 .f
161 .try_clone()
162 .await
163 .map_err(new_std_io_error)?
164 .into_std()
165 .await;
166
167 f.flush().map_err(new_std_io_error)?;
168 f.sync_all().map_err(new_std_io_error)?;
169
170 if let Some(temp_path) = &self.temp_path {
171 tokio::fs::rename(temp_path, &self.target_path)
172 .await
173 .map_err(new_std_io_error)?;
174 }
175
176 let file_meta = f.metadata().map_err(new_std_io_error)?;
177 let mode = if file_meta.is_file() {
178 EntryMode::FILE
179 } else if file_meta.is_dir() {
180 EntryMode::DIR
181 } else {
182 EntryMode::Unknown
183 };
184 let meta = Metadata::new(mode)
185 .with_content_length(file_meta.len())
186 .with_last_modified(Timestamp::try_from(
187 file_meta.modified().map_err(new_std_io_error)?,
188 )?);
189 Ok(meta)
190 }
191
192 async fn abort(&self) -> Result<()> {
193 if let Some(temp_path) = &self.temp_path {
194 tokio::fs::remove_file(temp_path)
195 .await
196 .map_err(new_std_io_error)
197 } else {
198 Err(Error::new(
199 ErrorKind::Unsupported,
200 "Fs doesn't support abort if atomic_write_dir is not set",
201 ))
202 }
203 }
204}
205
206#[cfg(windows)]
207fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
208 use std::os::windows::fs::FileExt;
209 f.seek_write(buf, offset).map_err(new_std_io_error)
210}
211
212#[cfg(unix)]
213fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
214 use std::os::unix::fs::FileExt;
215 f.write_at(buf, offset).map_err(new_std_io_error)
216}