opendal/services/monoiofs/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::path::PathBuf;
19use std::sync::Arc;
20
21use bytes::Buf;
22use bytes::Bytes;
23use futures::SinkExt;
24use futures::StreamExt;
25use futures::channel::mpsc;
26use futures::channel::oneshot;
27use monoio::fs::OpenOptions;
28
29use super::core::MonoiofsCore;
30use crate::raw::*;
31use crate::*;
32
33enum WriterRequest {
34    Write {
35        pos: u64,
36        buf: Bytes,
37        tx: oneshot::Sender<Result<()>>,
38    },
39    Stat {
40        tx: oneshot::Sender<Result<monoio::fs::Metadata>>,
41    },
42    Close {
43        tx: oneshot::Sender<Result<()>>,
44    },
45}
46
47pub struct MonoiofsWriter {
48    core: Arc<MonoiofsCore>,
49    tx: mpsc::UnboundedSender<WriterRequest>,
50    pos: u64,
51}
52
53impl MonoiofsWriter {
54    pub async fn new(core: Arc<MonoiofsCore>, path: PathBuf, append: bool) -> Result<Self> {
55        let (open_result_tx, open_result_rx) = oneshot::channel();
56        let (tx, rx) = mpsc::unbounded();
57        core.spawn(move || Self::worker_entrypoint(path, append, rx, open_result_tx))
58            .await;
59        core.unwrap(open_result_rx.await)?;
60        Ok(Self { core, tx, pos: 0 })
61    }
62
63    /// entrypoint of worker task that runs in context of monoio
64    async fn worker_entrypoint(
65        path: PathBuf,
66        append: bool,
67        mut rx: mpsc::UnboundedReceiver<WriterRequest>,
68        open_result_tx: oneshot::Sender<Result<()>>,
69    ) {
70        let result = OpenOptions::new()
71            .write(true)
72            .create(true)
73            .append(append)
74            .truncate(!append)
75            .open(path)
76            .await;
77        // [`monoio::fs::File`] is non-Send, hence it is kept within
78        // worker thread
79        let file = match result {
80            Ok(file) => {
81                let Ok(()) = open_result_tx.send(Ok(())) else {
82                    // MonoiofsWriter::new is cancelled, exit worker task
83                    return;
84                };
85                file
86            }
87            Err(e) => {
88                // discard the result if send failed due to MonoiofsWriter::new
89                // cancelled since we are going to exit anyway
90                let _ = open_result_tx.send(Err(new_std_io_error(e)));
91                return;
92            }
93        };
94        // wait for write or close request and send back result to main thread
95        loop {
96            let Some(req) = rx.next().await else {
97                // MonoiofsWriter is dropped, exit worker task
98                break;
99            };
100            match req {
101                WriterRequest::Write { pos, buf, tx } => {
102                    let (result, _) = file.write_all_at(buf, pos).await;
103                    // discard the result if send failed due to
104                    // MonoiofsWriter::write cancelled
105                    let _ = tx.send(result.map_err(new_std_io_error));
106                }
107                WriterRequest::Stat { tx } => {
108                    let result = file.metadata().await;
109                    let _ = tx.send(result.map_err(new_std_io_error));
110                }
111                WriterRequest::Close { tx } => {
112                    let result = file.sync_all().await;
113                    // discard the result if send failed due to
114                    // MonoiofsWriter::close cancelled
115                    let _ = tx.send(result.map_err(new_std_io_error));
116                    // file is closed in background and result is useless
117                    let _ = file.close().await;
118                    break;
119                }
120            }
121        }
122    }
123}
124
125impl oio::Write for MonoiofsWriter {
126    /// Send write request to worker thread and wait for result. Actual
127    /// write happens in [`MonoiofsWriter::worker_entrypoint`] running
128    /// on worker thread.
129    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
130        while bs.has_remaining() {
131            let buf = bs.current();
132            let n = buf.len();
133            let (tx, rx) = oneshot::channel();
134            self.core.unwrap(
135                self.tx
136                    .send(WriterRequest::Write {
137                        pos: self.pos,
138                        buf,
139                        tx,
140                    })
141                    .await,
142            );
143            self.core.unwrap(rx.await)?;
144            self.pos += n as u64;
145            bs.advance(n);
146        }
147        Ok(())
148    }
149
150    /// Send close request to worker thread and wait for result. Actual
151    /// close happens in [`MonoiofsWriter::worker_entrypoint`] running
152    /// on worker thread.
153    async fn close(&mut self) -> Result<Metadata> {
154        let (tx, rx) = oneshot::channel();
155        self.core
156            .unwrap(self.tx.send(WriterRequest::Stat { tx }).await);
157        let file_meta = self.core.unwrap(rx.await)?;
158
159        let (tx, rx) = oneshot::channel();
160        self.core
161            .unwrap(self.tx.send(WriterRequest::Close { tx }).await);
162        self.core.unwrap(rx.await)?;
163
164        let mode = if file_meta.is_dir() {
165            EntryMode::DIR
166        } else if file_meta.is_file() {
167            EntryMode::FILE
168        } else {
169            EntryMode::Unknown
170        };
171        let meta = Metadata::new(mode)
172            .with_content_length(file_meta.len())
173            .with_last_modified(Timestamp::try_from(
174                file_meta.modified().map_err(new_std_io_error)?,
175            )?);
176        Ok(meta)
177    }
178
179    async fn abort(&mut self) -> Result<()> {
180        Err(Error::new(
181            ErrorKind::Unsupported,
182            "Monoiofs doesn't support abort",
183        ))
184    }
185}