opendal/services/monoiofs/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::path::PathBuf;
19use std::sync::Arc;
20
21use bytes::Buf;
22use bytes::Bytes;
23use chrono::DateTime;
24use futures::channel::mpsc;
25use futures::channel::oneshot;
26use futures::SinkExt;
27use futures::StreamExt;
28use monoio::fs::OpenOptions;
29
30use super::core::MonoiofsCore;
31use crate::raw::*;
32use crate::*;
33
/// Requests sent from [`MonoiofsWriter`] to its worker task over an
/// unbounded channel. Every variant carries a oneshot sender through which
/// the worker reports the outcome of that request.
enum WriterRequest {
    /// Write `buf` to the file at byte offset `pos`.
    Write {
        /// Byte offset passed to the positional write.
        pos: u64,
        /// Data to write.
        buf: Bytes,
        /// Receives the result of the write.
        tx: oneshot::Sender<Result<()>>,
    },
    /// Query the metadata of the open file.
    Stat {
        /// Receives the file metadata, or the error from querying it.
        tx: oneshot::Sender<Result<monoio::fs::Metadata>>,
    },
    /// Sync the file, report the sync result, then close the file and stop
    /// the worker task.
    Close {
        /// Receives the result of syncing the file.
        tx: oneshot::Sender<Result<()>>,
    },
}
47
/// Writer that forwards every operation as a [`WriterRequest`] to a worker
/// task spawned through [`MonoiofsCore`] and waits for the reply.
pub struct MonoiofsWriter {
    /// Shared core, used to spawn the worker task and to unwrap the results
    /// of channel operations.
    core: Arc<MonoiofsCore>,
    /// Sending half of the request channel consumed by the worker task.
    tx: mpsc::UnboundedSender<WriterRequest>,
    /// Offset used for the next write request; starts at 0 and is advanced
    /// by the length of each chunk that was written successfully.
    pos: u64,
}
53
54impl MonoiofsWriter {
55    pub async fn new(core: Arc<MonoiofsCore>, path: PathBuf, append: bool) -> Result<Self> {
56        let (open_result_tx, open_result_rx) = oneshot::channel();
57        let (tx, rx) = mpsc::unbounded();
58        core.spawn(move || Self::worker_entrypoint(path, append, rx, open_result_tx))
59            .await;
60        core.unwrap(open_result_rx.await)?;
61        Ok(Self { core, tx, pos: 0 })
62    }
63
64    /// entrypoint of worker task that runs in context of monoio
65    async fn worker_entrypoint(
66        path: PathBuf,
67        append: bool,
68        mut rx: mpsc::UnboundedReceiver<WriterRequest>,
69        open_result_tx: oneshot::Sender<Result<()>>,
70    ) {
71        let result = OpenOptions::new()
72            .write(true)
73            .create(true)
74            .append(append)
75            .truncate(!append)
76            .open(path)
77            .await;
78        // [`monoio::fs::File`] is non-Send, hence it is kept within
79        // worker thread
80        let file = match result {
81            Ok(file) => {
82                let Ok(()) = open_result_tx.send(Ok(())) else {
83                    // MonoiofsWriter::new is cancelled, exit worker task
84                    return;
85                };
86                file
87            }
88            Err(e) => {
89                // discard the result if send failed due to MonoiofsWriter::new
90                // cancelled since we are going to exit anyway
91                let _ = open_result_tx.send(Err(new_std_io_error(e)));
92                return;
93            }
94        };
95        // wait for write or close request and send back result to main thread
96        loop {
97            let Some(req) = rx.next().await else {
98                // MonoiofsWriter is dropped, exit worker task
99                break;
100            };
101            match req {
102                WriterRequest::Write { pos, buf, tx } => {
103                    let (result, _) = file.write_all_at(buf, pos).await;
104                    // discard the result if send failed due to
105                    // MonoiofsWriter::write cancelled
106                    let _ = tx.send(result.map_err(new_std_io_error));
107                }
108                WriterRequest::Stat { tx } => {
109                    let result = file.metadata().await;
110                    let _ = tx.send(result.map_err(new_std_io_error));
111                }
112                WriterRequest::Close { tx } => {
113                    let result = file.sync_all().await;
114                    // discard the result if send failed due to
115                    // MonoiofsWriter::close cancelled
116                    let _ = tx.send(result.map_err(new_std_io_error));
117                    // file is closed in background and result is useless
118                    let _ = file.close().await;
119                    break;
120                }
121            }
122        }
123    }
124}
125
126impl oio::Write for MonoiofsWriter {
127    /// Send write request to worker thread and wait for result. Actual
128    /// write happens in [`MonoiofsWriter::worker_entrypoint`] running
129    /// on worker thread.
130    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
131        while bs.has_remaining() {
132            let buf = bs.current();
133            let n = buf.len();
134            let (tx, rx) = oneshot::channel();
135            self.core.unwrap(
136                self.tx
137                    .send(WriterRequest::Write {
138                        pos: self.pos,
139                        buf,
140                        tx,
141                    })
142                    .await,
143            );
144            self.core.unwrap(rx.await)?;
145            self.pos += n as u64;
146            bs.advance(n);
147        }
148        Ok(())
149    }
150
151    /// Send close request to worker thread and wait for result. Actual
152    /// close happens in [`MonoiofsWriter::worker_entrypoint`] running
153    /// on worker thread.
154    async fn close(&mut self) -> Result<Metadata> {
155        let (tx, rx) = oneshot::channel();
156        self.core
157            .unwrap(self.tx.send(WriterRequest::Stat { tx }).await);
158        let file_meta = self.core.unwrap(rx.await)?;
159
160        let (tx, rx) = oneshot::channel();
161        self.core
162            .unwrap(self.tx.send(WriterRequest::Close { tx }).await);
163        self.core.unwrap(rx.await)?;
164
165        let mode = if file_meta.is_dir() {
166            EntryMode::DIR
167        } else if file_meta.is_file() {
168            EntryMode::FILE
169        } else {
170            EntryMode::Unknown
171        };
172        let meta = Metadata::new(mode)
173            .with_content_length(file_meta.len())
174            .with_last_modified(
175                file_meta
176                    .modified()
177                    .map(DateTime::from)
178                    .map_err(new_std_io_error)?,
179            );
180        Ok(meta)
181    }
182
183    async fn abort(&mut self) -> Result<()> {
184        Err(Error::new(
185            ErrorKind::Unsupported,
186            "Monoiofs doesn't support abort",
187        ))
188    }
189}