opendal/services/fs/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fs::File;
19use std::io::Write;
20use std::path::PathBuf;
21
22use bytes::Buf;
23use tokio::io::AsyncWriteExt;
24
25use crate::raw::*;
26use crate::*;
27
28pub type FsWriters =
29    TwoWays<FsWriter<tokio::fs::File>, oio::PositionWriter<FsWriter<tokio::fs::File>>>;
30
31pub struct FsWriter<F> {
32    target_path: PathBuf,
33    tmp_path: Option<PathBuf>,
34
35    f: Option<F>,
36}
37
38impl<F> FsWriter<F> {
39    pub fn new(target_path: PathBuf, tmp_path: Option<PathBuf>, f: F) -> Self {
40        Self {
41            target_path,
42            tmp_path,
43
44            f: Some(f),
45        }
46    }
47}
48
49/// # Safety
50///
51/// We will only take `&mut Self` reference for FsWriter.
52unsafe impl<F> Sync for FsWriter<F> {}
53
54impl oio::Write for FsWriter<tokio::fs::File> {
55    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
56        let f = self.f.as_mut().expect("FsWriter must be initialized");
57
58        while bs.has_remaining() {
59            let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?;
60            bs.advance(n);
61        }
62
63        Ok(())
64    }
65
66    async fn close(&mut self) -> Result<Metadata> {
67        let f = self.f.as_mut().expect("FsWriter must be initialized");
68        f.flush().await.map_err(new_std_io_error)?;
69        f.sync_all().await.map_err(new_std_io_error)?;
70
71        if let Some(tmp_path) = &self.tmp_path {
72            tokio::fs::rename(tmp_path, &self.target_path)
73                .await
74                .map_err(new_std_io_error)?;
75        }
76
77        let file_meta = f.metadata().await.map_err(new_std_io_error)?;
78        let mode = if file_meta.is_file() {
79            EntryMode::FILE
80        } else if file_meta.is_dir() {
81            EntryMode::DIR
82        } else {
83            EntryMode::Unknown
84        };
85        let meta = Metadata::new(mode)
86            .with_content_length(file_meta.len())
87            .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into());
88        Ok(meta)
89    }
90
91    async fn abort(&mut self) -> Result<()> {
92        if let Some(tmp_path) = &self.tmp_path {
93            tokio::fs::remove_file(tmp_path)
94                .await
95                .map_err(new_std_io_error)
96        } else {
97            Err(Error::new(
98                ErrorKind::Unsupported,
99                "Fs doesn't support abort if atomic_write_dir is not set",
100            ))
101        }
102    }
103}
104
105impl oio::PositionWrite for FsWriter<tokio::fs::File> {
106    async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
107        let f = self.f.as_ref().expect("FsWriter must be initialized");
108
109        let f = f
110            .try_clone()
111            .await
112            .map_err(new_std_io_error)?
113            .into_std()
114            .await;
115
116        tokio::task::spawn_blocking(move || {
117            let mut buf = buf;
118            let mut offset = offset;
119            while !buf.is_empty() {
120                match write_at(&f, buf.chunk(), offset) {
121                    Ok(n) => {
122                        buf.advance(n);
123                        offset += n as u64
124                    }
125                    Err(e) => return Err(e),
126                }
127            }
128            Ok(())
129        })
130        .await
131        .map_err(new_task_join_error)?
132    }
133
134    async fn close(&self) -> Result<Metadata> {
135        let f = self.f.as_ref().expect("FsWriter must be initialized");
136
137        let mut f = f
138            .try_clone()
139            .await
140            .map_err(new_std_io_error)?
141            .into_std()
142            .await;
143
144        f.flush().map_err(new_std_io_error)?;
145        f.sync_all().map_err(new_std_io_error)?;
146
147        if let Some(tmp_path) = &self.tmp_path {
148            tokio::fs::rename(tmp_path, &self.target_path)
149                .await
150                .map_err(new_std_io_error)?;
151        }
152
153        let file_meta = f.metadata().map_err(new_std_io_error)?;
154        let mode = if file_meta.is_file() {
155            EntryMode::FILE
156        } else if file_meta.is_dir() {
157            EntryMode::DIR
158        } else {
159            EntryMode::Unknown
160        };
161        let meta = Metadata::new(mode)
162            .with_content_length(file_meta.len())
163            .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into());
164        Ok(meta)
165    }
166
167    async fn abort(&self) -> Result<()> {
168        if let Some(tmp_path) = &self.tmp_path {
169            tokio::fs::remove_file(tmp_path)
170                .await
171                .map_err(new_std_io_error)
172        } else {
173            Err(Error::new(
174                ErrorKind::Unsupported,
175                "Fs doesn't support abort if atomic_write_dir is not set",
176            ))
177        }
178    }
179}
180
181#[cfg(windows)]
182fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
183    use std::os::windows::fs::FileExt;
184    f.seek_write(buf, offset).map_err(new_std_io_error)
185}
186
187#[cfg(unix)]
188fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
189    use std::os::unix::fs::FileExt;
190    f.write_at(buf, offset).map_err(new_std_io_error)
191}