opendal/services/fs/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fs::File;
19use std::io::Write;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use bytes::Buf;
24use tokio::io::AsyncWriteExt;
25
26use crate::raw::*;
27use crate::services::fs::core::FsCore;
28use crate::*;
29
30pub type FsWriters = TwoWays<FsWriter, oio::PositionWriter<FsWriter>>;
31
32pub struct FsWriter {
33    target_path: PathBuf,
34    /// The temp_path is used to specify whether we should move to target_path after the file has been closed.
35    temp_path: Option<PathBuf>,
36    f: tokio::fs::File,
37}
38
39impl FsWriter {
40    pub async fn create(core: Arc<FsCore>, path: &str, op: OpWrite) -> Result<Self> {
41        let target_path = core.ensure_write_abs_path(&core.root, path).await?;
42
43        // Quick path while atomic_write_dir is not set.
44        if core.atomic_write_dir.is_none() {
45            let target_file = core.fs_write(&target_path, &op).await?;
46
47            return Ok(Self {
48                target_path,
49                temp_path: None,
50                f: target_file,
51            });
52        }
53
54        let is_append = op.append();
55        let is_exist = tokio::fs::try_exists(&target_path)
56            .await
57            .map_err(new_std_io_error)?;
58        if op.if_not_exists() && is_exist {
59            return Err(Error::new(
60                ErrorKind::ConditionNotMatch,
61                "file already exists, doesn't match the condition if_not_exists",
62            ));
63        }
64
65        // The only case we allow write in place is the file
66        // exists and users request for append writing.
67        let (f, temp_path) = if !(is_append && is_exist) {
68            core.fs_tempfile_write(path).await?
69        } else {
70            let f = core.fs_write(&target_path, &op).await?;
71            (f, None)
72        };
73
74        Ok(Self {
75            target_path,
76            temp_path,
77            f,
78        })
79    }
80}
81
82/// # Safety
83///
84/// We will only take `&mut Self` reference for FsWriter.
85unsafe impl Sync for FsWriter {}
86
87impl oio::Write for FsWriter {
88    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
89        while bs.has_remaining() {
90            let n = self.f.write(bs.chunk()).await.map_err(new_std_io_error)?;
91            bs.advance(n);
92        }
93
94        Ok(())
95    }
96
97    async fn close(&mut self) -> Result<Metadata> {
98        self.f.flush().await.map_err(new_std_io_error)?;
99        self.f.sync_all().await.map_err(new_std_io_error)?;
100
101        if let Some(temp_path) = &self.temp_path {
102            tokio::fs::rename(temp_path, &self.target_path)
103                .await
104                .map_err(new_std_io_error)?;
105        }
106
107        let file_meta = self.f.metadata().await.map_err(new_std_io_error)?;
108        let meta = Metadata::new(EntryMode::FILE)
109            .with_content_length(file_meta.len())
110            .with_last_modified(Timestamp::try_from(
111                file_meta.modified().map_err(new_std_io_error)?,
112            )?);
113        Ok(meta)
114    }
115
116    async fn abort(&mut self) -> Result<()> {
117        if let Some(temp_path) = &self.temp_path {
118            tokio::fs::remove_file(temp_path)
119                .await
120                .map_err(new_std_io_error)
121        } else {
122            Err(Error::new(
123                ErrorKind::Unsupported,
124                "Fs doesn't support abort if atomic_write_dir is not set",
125            ))
126        }
127    }
128}
129
130impl oio::PositionWrite for FsWriter {
131    async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
132        let f = self
133            .f
134            .try_clone()
135            .await
136            .map_err(new_std_io_error)?
137            .into_std()
138            .await;
139
140        tokio::task::spawn_blocking(move || {
141            let mut buf = buf;
142            let mut offset = offset;
143            while !buf.is_empty() {
144                match write_at(&f, buf.chunk(), offset) {
145                    Ok(n) => {
146                        buf.advance(n);
147                        offset += n as u64
148                    }
149                    Err(e) => return Err(e),
150                }
151            }
152            Ok(())
153        })
154        .await
155        .map_err(new_task_join_error)?
156    }
157
158    async fn close(&self) -> Result<Metadata> {
159        let mut f = self
160            .f
161            .try_clone()
162            .await
163            .map_err(new_std_io_error)?
164            .into_std()
165            .await;
166
167        f.flush().map_err(new_std_io_error)?;
168        f.sync_all().map_err(new_std_io_error)?;
169
170        if let Some(temp_path) = &self.temp_path {
171            tokio::fs::rename(temp_path, &self.target_path)
172                .await
173                .map_err(new_std_io_error)?;
174        }
175
176        let file_meta = f.metadata().map_err(new_std_io_error)?;
177        let mode = if file_meta.is_file() {
178            EntryMode::FILE
179        } else if file_meta.is_dir() {
180            EntryMode::DIR
181        } else {
182            EntryMode::Unknown
183        };
184        let meta = Metadata::new(mode)
185            .with_content_length(file_meta.len())
186            .with_last_modified(Timestamp::try_from(
187                file_meta.modified().map_err(new_std_io_error)?,
188            )?);
189        Ok(meta)
190    }
191
192    async fn abort(&self) -> Result<()> {
193        if let Some(temp_path) = &self.temp_path {
194            tokio::fs::remove_file(temp_path)
195                .await
196                .map_err(new_std_io_error)
197        } else {
198            Err(Error::new(
199                ErrorKind::Unsupported,
200                "Fs doesn't support abort if atomic_write_dir is not set",
201            ))
202        }
203    }
204}
205
206#[cfg(windows)]
207fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
208    use std::os::windows::fs::FileExt;
209    f.seek_write(buf, offset).map_err(new_std_io_error)
210}
211
212#[cfg(unix)]
213fn write_at(f: &File, buf: &[u8], offset: u64) -> Result<usize> {
214    use std::os::unix::fs::FileExt;
215    f.write_at(buf, offset).map_err(new_std_io_error)
216}