opendal/services/monoiofs/
writer.rs1use std::path::PathBuf;
19use std::sync::Arc;
20
21use bytes::Buf;
22use bytes::Bytes;
23use futures::SinkExt;
24use futures::StreamExt;
25use futures::channel::mpsc;
26use futures::channel::oneshot;
27use monoio::fs::OpenOptions;
28
29use super::core::MonoiofsCore;
30use crate::raw::*;
31use crate::*;
32
33enum WriterRequest {
34 Write {
35 pos: u64,
36 buf: Bytes,
37 tx: oneshot::Sender<Result<()>>,
38 },
39 Stat {
40 tx: oneshot::Sender<Result<monoio::fs::Metadata>>,
41 },
42 Close {
43 tx: oneshot::Sender<Result<()>>,
44 },
45}
46
47pub struct MonoiofsWriter {
48 core: Arc<MonoiofsCore>,
49 tx: mpsc::UnboundedSender<WriterRequest>,
50 pos: u64,
51}
52
53impl MonoiofsWriter {
54 pub async fn new(core: Arc<MonoiofsCore>, path: PathBuf, append: bool) -> Result<Self> {
55 let (open_result_tx, open_result_rx) = oneshot::channel();
56 let (tx, rx) = mpsc::unbounded();
57 core.spawn(move || Self::worker_entrypoint(path, append, rx, open_result_tx))
58 .await;
59 core.unwrap(open_result_rx.await)?;
60 Ok(Self { core, tx, pos: 0 })
61 }
62
63 async fn worker_entrypoint(
65 path: PathBuf,
66 append: bool,
67 mut rx: mpsc::UnboundedReceiver<WriterRequest>,
68 open_result_tx: oneshot::Sender<Result<()>>,
69 ) {
70 let result = OpenOptions::new()
71 .write(true)
72 .create(true)
73 .append(append)
74 .truncate(!append)
75 .open(path)
76 .await;
77 let file = match result {
80 Ok(file) => {
81 let Ok(()) = open_result_tx.send(Ok(())) else {
82 return;
84 };
85 file
86 }
87 Err(e) => {
88 let _ = open_result_tx.send(Err(new_std_io_error(e)));
91 return;
92 }
93 };
94 loop {
96 let Some(req) = rx.next().await else {
97 break;
99 };
100 match req {
101 WriterRequest::Write { pos, buf, tx } => {
102 let (result, _) = file.write_all_at(buf, pos).await;
103 let _ = tx.send(result.map_err(new_std_io_error));
106 }
107 WriterRequest::Stat { tx } => {
108 let result = file.metadata().await;
109 let _ = tx.send(result.map_err(new_std_io_error));
110 }
111 WriterRequest::Close { tx } => {
112 let result = file.sync_all().await;
113 let _ = tx.send(result.map_err(new_std_io_error));
116 let _ = file.close().await;
118 break;
119 }
120 }
121 }
122 }
123}
124
125impl oio::Write for MonoiofsWriter {
126 async fn write(&mut self, mut bs: Buffer) -> Result<()> {
130 while bs.has_remaining() {
131 let buf = bs.current();
132 let n = buf.len();
133 let (tx, rx) = oneshot::channel();
134 self.core.unwrap(
135 self.tx
136 .send(WriterRequest::Write {
137 pos: self.pos,
138 buf,
139 tx,
140 })
141 .await,
142 );
143 self.core.unwrap(rx.await)?;
144 self.pos += n as u64;
145 bs.advance(n);
146 }
147 Ok(())
148 }
149
150 async fn close(&mut self) -> Result<Metadata> {
154 let (tx, rx) = oneshot::channel();
155 self.core
156 .unwrap(self.tx.send(WriterRequest::Stat { tx }).await);
157 let file_meta = self.core.unwrap(rx.await)?;
158
159 let (tx, rx) = oneshot::channel();
160 self.core
161 .unwrap(self.tx.send(WriterRequest::Close { tx }).await);
162 self.core.unwrap(rx.await)?;
163
164 let mode = if file_meta.is_dir() {
165 EntryMode::DIR
166 } else if file_meta.is_file() {
167 EntryMode::FILE
168 } else {
169 EntryMode::Unknown
170 };
171 let meta = Metadata::new(mode)
172 .with_content_length(file_meta.len())
173 .with_last_modified(Timestamp::try_from(
174 file_meta.modified().map_err(new_std_io_error)?,
175 )?);
176 Ok(meta)
177 }
178
179 async fn abort(&mut self) -> Result<()> {
180 Err(Error::new(
181 ErrorKind::Unsupported,
182 "Monoiofs doesn't support abort",
183 ))
184 }
185}