opendal/services/monoiofs/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::path::PathBuf;
19use std::sync::Arc;
20
21use bytes::BytesMut;
22use futures::channel::mpsc;
23use futures::channel::oneshot;
24use futures::SinkExt;
25use futures::StreamExt;
26use monoio::fs::OpenOptions;
27
28use super::core::MonoiofsCore;
29use super::core::BUFFER_SIZE;
30use crate::raw::*;
31use crate::*;
32
33enum ReaderRequest {
34    Read {
35        pos: u64,
36        buf: BytesMut,
37        tx: oneshot::Sender<Result<BytesMut>>,
38    },
39}
40
41pub struct MonoiofsReader {
42    core: Arc<MonoiofsCore>,
43    tx: mpsc::UnboundedSender<ReaderRequest>,
44    pos: u64,
45    end_pos: Option<u64>,
46}
47
48impl MonoiofsReader {
49    pub async fn new(core: Arc<MonoiofsCore>, path: PathBuf, range: BytesRange) -> Result<Self> {
50        let (open_result_tx, open_result_rx) = oneshot::channel();
51        let (tx, rx) = mpsc::unbounded();
52        core.spawn(move || Self::worker_entrypoint(path, rx, open_result_tx))
53            .await;
54        core.unwrap(open_result_rx.await)?;
55        Ok(Self {
56            core,
57            tx,
58            pos: range.offset(),
59            end_pos: range.size().map(|size| range.offset() + size),
60        })
61    }
62
63    /// entrypoint of worker task that runs in context of monoio
64    async fn worker_entrypoint(
65        path: PathBuf,
66        mut rx: mpsc::UnboundedReceiver<ReaderRequest>,
67        open_result_tx: oneshot::Sender<Result<()>>,
68    ) {
69        let result = OpenOptions::new().read(true).open(path).await;
70        // [`monoio::fs::File`] is non-Send, hence it is kept within
71        // worker thread
72        let file = match result {
73            Ok(file) => {
74                let Ok(()) = open_result_tx.send(Ok(())) else {
75                    // MonoiofsReader::new is cancelled, exit worker task
76                    return;
77                };
78                file
79            }
80            Err(e) => {
81                // discard the result if send failed due to MonoiofsReader::new
82                // cancelled since we are going to exit anyway
83                let _ = open_result_tx.send(Err(new_std_io_error(e)));
84                return;
85            }
86        };
87        // wait for read request and send back result to main thread
88        loop {
89            let Some(req) = rx.next().await else {
90                // MonoiofsReader is dropped, exit worker task
91                break;
92            };
93            match req {
94                ReaderRequest::Read { pos, buf, tx } => {
95                    let (result, buf) = file.read_at(buf, pos).await;
96                    // buf.len() will be set to n by monoio if read
97                    // successfully, so n is dropped
98                    let result = result.map(move |_| buf).map_err(new_std_io_error);
99                    // discard the result if send failed due to
100                    // MonoiofsReader::read cancelled
101                    let _ = tx.send(result);
102                }
103            }
104        }
105    }
106}
107
108impl oio::Read for MonoiofsReader {
109    /// Send read request to worker thread and wait for result. Actual
110    /// read happens in [`MonoiofsReader::worker_entrypoint`] running
111    /// on worker thread.
112    async fn read(&mut self) -> Result<Buffer> {
113        if let Some(end_pos) = self.end_pos {
114            if self.pos >= end_pos {
115                return Ok(Buffer::new());
116            }
117        }
118
119        // allocate and resize buffer
120        let mut buf = self.core.buf_pool.get();
121        let size = self
122            .end_pos
123            .map_or(BUFFER_SIZE, |end_pos| (end_pos - self.pos) as usize);
124        // set capacity of buf to exact size to avoid excessive read
125        buf.reserve(size);
126        let _ = buf.split_off(size);
127
128        // send read request to worker thread and wait for result
129        let (tx, rx) = oneshot::channel();
130        self.core.unwrap(
131            self.tx
132                .send(ReaderRequest::Read {
133                    pos: self.pos,
134                    buf,
135                    tx,
136                })
137                .await,
138        );
139        let mut buf = self.core.unwrap(rx.await)?;
140
141        // advance cursor if read successfully
142        self.pos += buf.len() as u64;
143        let buffer = Buffer::from(buf.split().freeze());
144        self.core.buf_pool.put(buf);
145        Ok(buffer)
146    }
147}