opendal/services/monoiofs/reader.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::path::PathBuf;
19use std::sync::Arc;
20
21use bytes::BytesMut;
22use futures::channel::mpsc;
23use futures::channel::oneshot;
24use futures::SinkExt;
25use futures::StreamExt;
26use monoio::fs::OpenOptions;
27
28use super::core::MonoiofsCore;
29use super::core::BUFFER_SIZE;
30use crate::raw::*;
31use crate::*;
32
33enum ReaderRequest {
34 Read {
35 pos: u64,
36 buf: BytesMut,
37 tx: oneshot::Sender<Result<BytesMut>>,
38 },
39}
40
41pub struct MonoiofsReader {
42 core: Arc<MonoiofsCore>,
43 tx: mpsc::UnboundedSender<ReaderRequest>,
44 pos: u64,
45 end_pos: Option<u64>,
46}
47
48impl MonoiofsReader {
49 pub async fn new(core: Arc<MonoiofsCore>, path: PathBuf, range: BytesRange) -> Result<Self> {
50 let (open_result_tx, open_result_rx) = oneshot::channel();
51 let (tx, rx) = mpsc::unbounded();
52 core.spawn(move || Self::worker_entrypoint(path, rx, open_result_tx))
53 .await;
54 core.unwrap(open_result_rx.await)?;
55 Ok(Self {
56 core,
57 tx,
58 pos: range.offset(),
59 end_pos: range.size().map(|size| range.offset() + size),
60 })
61 }
62
63 /// entrypoint of worker task that runs in context of monoio
64 async fn worker_entrypoint(
65 path: PathBuf,
66 mut rx: mpsc::UnboundedReceiver<ReaderRequest>,
67 open_result_tx: oneshot::Sender<Result<()>>,
68 ) {
69 let result = OpenOptions::new().read(true).open(path).await;
70 // [`monoio::fs::File`] is non-Send, hence it is kept within
71 // worker thread
72 let file = match result {
73 Ok(file) => {
74 let Ok(()) = open_result_tx.send(Ok(())) else {
75 // MonoiofsReader::new is cancelled, exit worker task
76 return;
77 };
78 file
79 }
80 Err(e) => {
81 // discard the result if send failed due to MonoiofsReader::new
82 // cancelled since we are going to exit anyway
83 let _ = open_result_tx.send(Err(new_std_io_error(e)));
84 return;
85 }
86 };
87 // wait for read request and send back result to main thread
88 loop {
89 let Some(req) = rx.next().await else {
90 // MonoiofsReader is dropped, exit worker task
91 break;
92 };
93 match req {
94 ReaderRequest::Read { pos, buf, tx } => {
95 let (result, buf) = file.read_at(buf, pos).await;
96 // buf.len() will be set to n by monoio if read
97 // successfully, so n is dropped
98 let result = result.map(move |_| buf).map_err(new_std_io_error);
99 // discard the result if send failed due to
100 // MonoiofsReader::read cancelled
101 let _ = tx.send(result);
102 }
103 }
104 }
105 }
106}
107
108impl oio::Read for MonoiofsReader {
109 /// Send read request to worker thread and wait for result. Actual
110 /// read happens in [`MonoiofsReader::worker_entrypoint`] running
111 /// on worker thread.
112 async fn read(&mut self) -> Result<Buffer> {
113 if let Some(end_pos) = self.end_pos {
114 if self.pos >= end_pos {
115 return Ok(Buffer::new());
116 }
117 }
118
119 // allocate and resize buffer
120 let mut buf = self.core.buf_pool.get();
121 let size = self
122 .end_pos
123 .map_or(BUFFER_SIZE, |end_pos| (end_pos - self.pos) as usize);
124 // set capacity of buf to exact size to avoid excessive read
125 buf.reserve(size);
126 let _ = buf.split_off(size);
127
128 // send read request to worker thread and wait for result
129 let (tx, rx) = oneshot::channel();
130 self.core.unwrap(
131 self.tx
132 .send(ReaderRequest::Read {
133 pos: self.pos,
134 buf,
135 tx,
136 })
137 .await,
138 );
139 let mut buf = self.core.unwrap(rx.await)?;
140
141 // advance cursor if read successfully
142 self.pos += buf.len() as u64;
143 let buffer = Buffer::from(buf.split().freeze());
144 self.core.buf_pool.put(buf);
145 Ok(buffer)
146 }
147}