opendal/services/monoiofs/
writer.rs1use std::path::PathBuf;
19use std::sync::Arc;
20
21use bytes::Buf;
22use bytes::Bytes;
23use chrono::DateTime;
24use futures::channel::mpsc;
25use futures::channel::oneshot;
26use futures::SinkExt;
27use futures::StreamExt;
28use monoio::fs::OpenOptions;
29
30use super::core::MonoiofsCore;
31use crate::raw::*;
32use crate::*;
33
34enum WriterRequest {
35 Write {
36 pos: u64,
37 buf: Bytes,
38 tx: oneshot::Sender<Result<()>>,
39 },
40 Stat {
41 tx: oneshot::Sender<Result<monoio::fs::Metadata>>,
42 },
43 Close {
44 tx: oneshot::Sender<Result<()>>,
45 },
46}
47
48pub struct MonoiofsWriter {
49 core: Arc<MonoiofsCore>,
50 tx: mpsc::UnboundedSender<WriterRequest>,
51 pos: u64,
52}
53
54impl MonoiofsWriter {
55 pub async fn new(core: Arc<MonoiofsCore>, path: PathBuf, append: bool) -> Result<Self> {
56 let (open_result_tx, open_result_rx) = oneshot::channel();
57 let (tx, rx) = mpsc::unbounded();
58 core.spawn(move || Self::worker_entrypoint(path, append, rx, open_result_tx))
59 .await;
60 core.unwrap(open_result_rx.await)?;
61 Ok(Self { core, tx, pos: 0 })
62 }
63
64 async fn worker_entrypoint(
66 path: PathBuf,
67 append: bool,
68 mut rx: mpsc::UnboundedReceiver<WriterRequest>,
69 open_result_tx: oneshot::Sender<Result<()>>,
70 ) {
71 let result = OpenOptions::new()
72 .write(true)
73 .create(true)
74 .append(append)
75 .truncate(!append)
76 .open(path)
77 .await;
78 let file = match result {
81 Ok(file) => {
82 let Ok(()) = open_result_tx.send(Ok(())) else {
83 return;
85 };
86 file
87 }
88 Err(e) => {
89 let _ = open_result_tx.send(Err(new_std_io_error(e)));
92 return;
93 }
94 };
95 loop {
97 let Some(req) = rx.next().await else {
98 break;
100 };
101 match req {
102 WriterRequest::Write { pos, buf, tx } => {
103 let (result, _) = file.write_all_at(buf, pos).await;
104 let _ = tx.send(result.map_err(new_std_io_error));
107 }
108 WriterRequest::Stat { tx } => {
109 let result = file.metadata().await;
110 let _ = tx.send(result.map_err(new_std_io_error));
111 }
112 WriterRequest::Close { tx } => {
113 let result = file.sync_all().await;
114 let _ = tx.send(result.map_err(new_std_io_error));
117 let _ = file.close().await;
119 break;
120 }
121 }
122 }
123 }
124}
125
126impl oio::Write for MonoiofsWriter {
127 async fn write(&mut self, mut bs: Buffer) -> Result<()> {
131 while bs.has_remaining() {
132 let buf = bs.current();
133 let n = buf.len();
134 let (tx, rx) = oneshot::channel();
135 self.core.unwrap(
136 self.tx
137 .send(WriterRequest::Write {
138 pos: self.pos,
139 buf,
140 tx,
141 })
142 .await,
143 );
144 self.core.unwrap(rx.await)?;
145 self.pos += n as u64;
146 bs.advance(n);
147 }
148 Ok(())
149 }
150
151 async fn close(&mut self) -> Result<Metadata> {
155 let (tx, rx) = oneshot::channel();
156 self.core
157 .unwrap(self.tx.send(WriterRequest::Stat { tx }).await);
158 let file_meta = self.core.unwrap(rx.await)?;
159
160 let (tx, rx) = oneshot::channel();
161 self.core
162 .unwrap(self.tx.send(WriterRequest::Close { tx }).await);
163 self.core.unwrap(rx.await)?;
164
165 let mode = if file_meta.is_dir() {
166 EntryMode::DIR
167 } else if file_meta.is_file() {
168 EntryMode::FILE
169 } else {
170 EntryMode::Unknown
171 };
172 let meta = Metadata::new(mode)
173 .with_content_length(file_meta.len())
174 .with_last_modified(
175 file_meta
176 .modified()
177 .map(DateTime::from)
178 .map_err(new_std_io_error)?,
179 );
180 Ok(meta)
181 }
182
183 async fn abort(&mut self) -> Result<()> {
184 Err(Error::new(
185 ErrorKind::Unsupported,
186 "Monoiofs doesn't support abort",
187 ))
188 }
189}