opendal/services/fs/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fs::File;
19use std::io::Write;
20use std::path::PathBuf;
21
22use bytes::Buf;
23use tokio::io::AsyncWriteExt;
24
25use crate::raw::*;
26use crate::*;
27
28pub type FsWriters =
29    TwoWays<FsWriter<tokio::fs::File>, oio::PositionWriter<FsWriter<tokio::fs::File>>>;
30
31pub struct FsWriter<F> {
32    target_path: PathBuf,
33    tmp_path: Option<PathBuf>,
34
35    f: Option<F>,
36}
37
38impl<F> FsWriter<F> {
39    pub fn new(target_path: PathBuf, tmp_path: Option<PathBuf>, f: F) -> Self {
40        Self {
41            target_path,
42            tmp_path,
43
44            f: Some(f),
45        }
46    }
47}
48
49/// # Safety
50///
51/// We will only take `&mut Self` reference for FsWriter.
52unsafe impl<F> Sync for FsWriter<F> {}
53
54impl oio::Write for FsWriter<tokio::fs::File> {
55    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
56        let f = self.f.as_mut().expect("FsWriter must be initialized");
57
58        while bs.has_remaining() {
59            let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?;
60            bs.advance(n);
61        }
62
63        Ok(())
64    }
65
66    async fn close(&mut self) -> Result<Metadata> {
67        let f = self.f.as_mut().expect("FsWriter must be initialized");
68        f.flush().await.map_err(new_std_io_error)?;
69        f.sync_all().await.map_err(new_std_io_error)?;
70
71        if let Some(tmp_path) = &self.tmp_path {
72            tokio::fs::rename(tmp_path, &self.target_path)
73                .await
74                .map_err(new_std_io_error)?;
75        }
76
77        let file_meta = f.metadata().await.map_err(new_std_io_error)?;
78        let mode = if file_meta.is_file() {
79            EntryMode::FILE
80        } else if file_meta.is_dir() {
81            EntryMode::DIR
82        } else {
83            EntryMode::Unknown
84        };
85        let meta = Metadata::new(mode)
86            .with_content_length(file_meta.len())
87            .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into());
88        Ok(meta)
89    }
90
91    async fn abort(&mut self) -> Result<()> {
92        if let Some(tmp_path) = &self.tmp_path {
93            tokio::fs::remove_file(tmp_path)
94                .await
95                .map_err(new_std_io_error)
96        } else {
97            Err(Error::new(
98                ErrorKind::Unsupported,
99                "Fs doesn't support abort if atomic_write_dir is not set",
100            ))
101        }
102    }
103}
104
105impl oio::BlockingWrite for FsWriter<std::fs::File> {
106    fn write(&mut self, mut bs: Buffer) -> Result<()> {
107        let f = self.f.as_mut().expect("FsWriter must be initialized");
108
109        while bs.has_remaining() {
110            let n = f.write(bs.chunk()).map_err(new_std_io_error)?;
111            bs.advance(n);
112        }
113
114        Ok(())
115    }
116
117    fn close(&mut self) -> Result<Metadata> {
118        let f = self.f.as_mut().expect("FsWriter must be initialized");
119        f.sync_all().map_err(new_std_io_error)?;
120
121        if let Some(tmp_path) = &self.tmp_path {
122            std::fs::rename(tmp_path, &self.target_path).map_err(new_std_io_error)?;
123        }
124
125        let file_meta = f.metadata().map_err(new_std_io_error)?;
126        let mode = if file_meta.is_file() {
127            EntryMode::FILE
128        } else if file_meta.is_dir() {
129            EntryMode::DIR
130        } else {
131            EntryMode::Unknown
132        };
133        let meta = Metadata::new(mode)
134            .with_content_length(file_meta.len())
135            .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into());
136        Ok(meta)
137    }
138}
139
140impl oio::PositionWrite for FsWriter<tokio::fs::File> {
141    async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
142        let f = self.f.as_ref().expect("FsWriter must be initialized");
143
144        let f = f
145            .try_clone()
146            .await
147            .map_err(new_std_io_error)?
148            .into_std()
149            .await;
150
151        tokio::task::spawn_blocking(move || {
152            let mut buf = buf;
153            let mut offset = offset;
154            while !buf.is_empty() {
155                match write_at(&f, buf.chunk(), offset) {
156                    Ok(n) => {
157                        buf.advance(n);
158                        offset += n as u64
159                    }
160                    Err(e) => return Err(e),
161                }
162            }
163            Ok(())
164        })
165        .await
166        .map_err(new_task_join_error)?
167    }
168
169    async fn close(&self) -> Result<Metadata> {
170        let f = self.f.as_ref().expect("FsWriter must be initialized");
171
172        let mut f = f
173            .try_clone()
174            .await
175            .map_err(new_std_io_error)?
176            .into_std()
177            .await;
178
179        f.flush().map_err(new_std_io_error)?;
180        f.sync_all().map_err(new_std_io_error)?;
181
182        if let Some(tmp_path) = &self.tmp_path {
183            tokio::fs::rename(tmp_path, &self.target_path)
184                .await
185                .map_err(new_std_io_error)?;
186        }
187
188        let file_meta = f.metadata().map_err(new_std_io_error)?;
189        let mode = if file_meta.is_file() {
190            EntryMode::FILE
191        } else if file_meta.is_dir() {
192            EntryMode::DIR
193        } else {
194            EntryMode::Unknown
195        };
196        let meta = Metadata::new(mode)
197            .with_content_length(file_meta.len())
198            .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into());
199        Ok(meta)
200    }
201
202    async fn abort(&self) -> Result<()> {
203        if let Some(tmp_path) = &self.tmp_path {
204            tokio::fs::remove_file(tmp_path)
205                .await
206                .map_err(new_std_io_error)
207        } else {
208            Err(Error::new(
209                ErrorKind::Unsupported,
210                "Fs doesn't support abort if atomic_write_dir is not set",
211            ))
212        }
213    }
214}
215
216#[cfg(windows)]
217fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
218    use std::os::windows::fs::FileExt;
219    f.seek_write(buf, offset).map_err(new_std_io_error)
220}
221
222#[cfg(unix)]
223fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
224    use std::os::unix::fs::FileExt;
225    f.write_at(buf, offset).map_err(new_std_io_error)
226}