opendal/services/fs/
writer.rs1use std::fs::File;
19use std::io::Write;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use bytes::Buf;
24use tokio::io::AsyncWriteExt;
25
26use crate::raw::*;
27use crate::services::fs::core::FsCore;
28use crate::*;
29
/// Writer type exposed by the fs service: a [`TwoWays`] over a plain [`FsWriter`] and an
/// [`oio::PositionWriter`] wrapping an [`FsWriter`].
pub type FsWriters = TwoWays<FsWriter, oio::PositionWriter<FsWriter>>;
31
/// File-backed writer for the fs service.
///
/// Implements both [`oio::Write`] (sequential writes) and [`oio::PositionWrite`]
/// (writes at an explicit offset).
pub struct FsWriter {
    /// Final destination path of the written data.
    target_path: PathBuf,
    /// When `Some`, this path is renamed onto `target_path` on `close` and removed on `abort`.
    /// When `None`, `close` performs no rename and `abort` reports `Unsupported`.
    temp_path: Option<PathBuf>,
    /// Open file handle that all writes go through.
    f: tokio::fs::File,
}
38
39impl FsWriter {
40 pub async fn create(core: Arc<FsCore>, path: &str, op: OpWrite) -> Result<Self> {
41 let target_path = core.ensure_write_abs_path(&core.root, path).await?;
42
43 if core.atomic_write_dir.is_none() {
45 let target_file = core.fs_write(&target_path, &op).await?;
46
47 return Ok(Self {
48 target_path,
49 temp_path: None,
50 f: target_file,
51 });
52 }
53
54 let is_append = op.append();
55 let is_exist = tokio::fs::try_exists(&target_path)
56 .await
57 .map_err(new_std_io_error)?;
58 if op.if_not_exists() && is_exist {
59 return Err(Error::new(
60 ErrorKind::ConditionNotMatch,
61 "file already exists, doesn't match the condition if_not_exists",
62 ));
63 }
64
65 let (f, temp_path) = if !(is_append && is_exist) {
68 core.fs_tempfile_write(path).await?
69 } else {
70 let f = core.fs_write(&target_path, &op).await?;
71 (f, None)
72 };
73
74 Ok(Self {
75 target_path,
76 temp_path,
77 f,
78 })
79 }
80}
81
// SAFETY(review): no justification is recorded next to this impl. The fields declared on
// `FsWriter` in this file are `PathBuf`, `Option<PathBuf>` and `tokio::fs::File` — TODO confirm
// why a manual `Sync` impl is required and state the invariant that makes it sound here.
unsafe impl Sync for FsWriter {}
86
87impl oio::Write for FsWriter {
88 async fn write(&mut self, mut bs: Buffer) -> Result<()> {
89 while bs.has_remaining() {
90 let n = self.f.write(bs.chunk()).await.map_err(new_std_io_error)?;
91 bs.advance(n);
92 }
93
94 Ok(())
95 }
96
97 async fn close(&mut self) -> Result<Metadata> {
98 self.f.flush().await.map_err(new_std_io_error)?;
99 self.f.sync_all().await.map_err(new_std_io_error)?;
100
101 if let Some(temp_path) = &self.temp_path {
102 tokio::fs::rename(temp_path, &self.target_path)
103 .await
104 .map_err(new_std_io_error)?;
105 }
106
107 let file_meta = self.f.metadata().await.map_err(new_std_io_error)?;
108 let meta = Metadata::new(EntryMode::FILE)
109 .with_content_length(file_meta.len())
110 .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into());
111 Ok(meta)
112 }
113
114 async fn abort(&mut self) -> Result<()> {
115 if let Some(temp_path) = &self.temp_path {
116 tokio::fs::remove_file(temp_path)
117 .await
118 .map_err(new_std_io_error)
119 } else {
120 Err(Error::new(
121 ErrorKind::Unsupported,
122 "Fs doesn't support abort if atomic_write_dir is not set",
123 ))
124 }
125 }
126}
127
128impl oio::PositionWrite for FsWriter {
129 async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
130 let f = self
131 .f
132 .try_clone()
133 .await
134 .map_err(new_std_io_error)?
135 .into_std()
136 .await;
137
138 tokio::task::spawn_blocking(move || {
139 let mut buf = buf;
140 let mut offset = offset;
141 while !buf.is_empty() {
142 match write_at(&f, buf.chunk(), offset) {
143 Ok(n) => {
144 buf.advance(n);
145 offset += n as u64
146 }
147 Err(e) => return Err(e),
148 }
149 }
150 Ok(())
151 })
152 .await
153 .map_err(new_task_join_error)?
154 }
155
156 async fn close(&self) -> Result<Metadata> {
157 let mut f = self
158 .f
159 .try_clone()
160 .await
161 .map_err(new_std_io_error)?
162 .into_std()
163 .await;
164
165 f.flush().map_err(new_std_io_error)?;
166 f.sync_all().map_err(new_std_io_error)?;
167
168 if let Some(temp_path) = &self.temp_path {
169 tokio::fs::rename(temp_path, &self.target_path)
170 .await
171 .map_err(new_std_io_error)?;
172 }
173
174 let file_meta = f.metadata().map_err(new_std_io_error)?;
175 let mode = if file_meta.is_file() {
176 EntryMode::FILE
177 } else if file_meta.is_dir() {
178 EntryMode::DIR
179 } else {
180 EntryMode::Unknown
181 };
182 let meta = Metadata::new(mode)
183 .with_content_length(file_meta.len())
184 .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into());
185 Ok(meta)
186 }
187
188 async fn abort(&self) -> Result<()> {
189 if let Some(temp_path) = &self.temp_path {
190 tokio::fs::remove_file(temp_path)
191 .await
192 .map_err(new_std_io_error)
193 } else {
194 Err(Error::new(
195 ErrorKind::Unsupported,
196 "Fs doesn't support abort if atomic_write_dir is not set",
197 ))
198 }
199 }
200}
201
/// Windows positional write: writes `buf` to `f` at byte `offset` via
/// `FileExt::seek_write` and returns the number of bytes written, which may be
/// fewer than `buf.len()`.
#[cfg(windows)]
fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
    std::os::windows::fs::FileExt::seek_write(f, buf, offset).map_err(new_std_io_error)
}
207
208#[cfg(unix)]
209fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
210 use std::os::unix::fs::FileExt;
211 f.write_at(buf, offset).map_err(new_std_io_error)
212}