opendal/services/fs/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fs::File;
19use std::io::Write;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use bytes::Buf;
24use tokio::io::AsyncWriteExt;
25
26use crate::raw::*;
27use crate::services::fs::core::FsCore;
28use crate::*;
29
30pub type FsWriters = TwoWays<FsWriter, oio::PositionWriter<FsWriter>>;
31
32pub struct FsWriter {
33    target_path: PathBuf,
34    /// The temp_path is used to specify whether we should move to target_path after the file has been closed.
35    temp_path: Option<PathBuf>,
36    f: tokio::fs::File,
37}
38
39impl FsWriter {
40    pub async fn create(core: Arc<FsCore>, path: &str, op: OpWrite) -> Result<Self> {
41        let target_path = core.ensure_write_abs_path(&core.root, path).await?;
42
43        // Quick path while atomic_write_dir is not set.
44        if core.atomic_write_dir.is_none() {
45            let target_file = core.fs_write(&target_path, &op).await?;
46
47            return Ok(Self {
48                target_path,
49                temp_path: None,
50                f: target_file,
51            });
52        }
53
54        let is_append = op.append();
55        let is_exist = tokio::fs::try_exists(&target_path)
56            .await
57            .map_err(new_std_io_error)?;
58        if op.if_not_exists() && is_exist {
59            return Err(Error::new(
60                ErrorKind::ConditionNotMatch,
61                "file already exists, doesn't match the condition if_not_exists",
62            ));
63        }
64
65        // The only case we allow write in place is the file
66        // exists and users request for append writing.
67        let (f, temp_path) = if !(is_append && is_exist) {
68            core.fs_tempfile_write(path).await?
69        } else {
70            let f = core.fs_write(&target_path, &op).await?;
71            (f, None)
72        };
73
74        Ok(Self {
75            target_path,
76            temp_path,
77            f,
78        })
79    }
80}
81
82/// # Safety
83///
84/// We will only take `&mut Self` reference for FsWriter.
85unsafe impl Sync for FsWriter {}
86
87impl oio::Write for FsWriter {
88    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
89        while bs.has_remaining() {
90            let n = self.f.write(bs.chunk()).await.map_err(new_std_io_error)?;
91            bs.advance(n);
92        }
93
94        Ok(())
95    }
96
97    async fn close(&mut self) -> Result<Metadata> {
98        self.f.flush().await.map_err(new_std_io_error)?;
99        self.f.sync_all().await.map_err(new_std_io_error)?;
100
101        if let Some(temp_path) = &self.temp_path {
102            tokio::fs::rename(temp_path, &self.target_path)
103                .await
104                .map_err(new_std_io_error)?;
105        }
106
107        let file_meta = self.f.metadata().await.map_err(new_std_io_error)?;
108        let meta = Metadata::new(EntryMode::FILE)
109            .with_content_length(file_meta.len())
110            .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into());
111        Ok(meta)
112    }
113
114    async fn abort(&mut self) -> Result<()> {
115        if let Some(temp_path) = &self.temp_path {
116            tokio::fs::remove_file(temp_path)
117                .await
118                .map_err(new_std_io_error)
119        } else {
120            Err(Error::new(
121                ErrorKind::Unsupported,
122                "Fs doesn't support abort if atomic_write_dir is not set",
123            ))
124        }
125    }
126}
127
128impl oio::PositionWrite for FsWriter {
129    async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
130        let f = self
131            .f
132            .try_clone()
133            .await
134            .map_err(new_std_io_error)?
135            .into_std()
136            .await;
137
138        tokio::task::spawn_blocking(move || {
139            let mut buf = buf;
140            let mut offset = offset;
141            while !buf.is_empty() {
142                match write_at(&f, buf.chunk(), offset) {
143                    Ok(n) => {
144                        buf.advance(n);
145                        offset += n as u64
146                    }
147                    Err(e) => return Err(e),
148                }
149            }
150            Ok(())
151        })
152        .await
153        .map_err(new_task_join_error)?
154    }
155
156    async fn close(&self) -> Result<Metadata> {
157        let mut f = self
158            .f
159            .try_clone()
160            .await
161            .map_err(new_std_io_error)?
162            .into_std()
163            .await;
164
165        f.flush().map_err(new_std_io_error)?;
166        f.sync_all().map_err(new_std_io_error)?;
167
168        if let Some(temp_path) = &self.temp_path {
169            tokio::fs::rename(temp_path, &self.target_path)
170                .await
171                .map_err(new_std_io_error)?;
172        }
173
174        let file_meta = f.metadata().map_err(new_std_io_error)?;
175        let mode = if file_meta.is_file() {
176            EntryMode::FILE
177        } else if file_meta.is_dir() {
178            EntryMode::DIR
179        } else {
180            EntryMode::Unknown
181        };
182        let meta = Metadata::new(mode)
183            .with_content_length(file_meta.len())
184            .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into());
185        Ok(meta)
186    }
187
188    async fn abort(&self) -> Result<()> {
189        if let Some(temp_path) = &self.temp_path {
190            tokio::fs::remove_file(temp_path)
191                .await
192                .map_err(new_std_io_error)
193        } else {
194            Err(Error::new(
195                ErrorKind::Unsupported,
196                "Fs doesn't support abort if atomic_write_dir is not set",
197            ))
198        }
199    }
200}
201
202#[cfg(windows)]
203fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
204    use std::os::windows::fs::FileExt;
205    f.seek_write(buf, offset).map_err(new_std_io_error)
206}
207
208#[cfg(unix)]
209fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
210    use std::os::unix::fs::FileExt;
211    f.write_at(buf, offset).map_err(new_std_io_error)
212}