opendal/services/compfs/
reader.rs1use std::sync::Arc;
19
20use compio::buf::buf_try;
21use compio::buf::IntoInner;
22use compio::buf::IoBuf;
23use compio::io::AsyncReadAt;
24
25use super::core::CompfsCore;
26use crate::raw::*;
27use crate::*;
28
29#[derive(Debug)]
30pub struct CompfsReader {
31 core: Arc<CompfsCore>,
32 file: compio::fs::File,
33 offset: u64,
34 end: Option<u64>,
35}
36
37impl CompfsReader {
38 pub(super) fn new(core: Arc<CompfsCore>, file: compio::fs::File, range: BytesRange) -> Self {
39 Self {
40 core,
41 file,
42 offset: range.offset(),
43 end: range.size().map(|v| v + range.offset()),
44 }
45 }
46}
47
48impl oio::Read for CompfsReader {
49 async fn read(&mut self) -> Result<Buffer> {
50 let pos = self.offset;
51 if let Some(end) = self.end {
52 if end <= pos {
53 return Ok(Buffer::new());
54 }
55 }
56
57 let mut bs = self.core.buf_pool.get();
58 let max_len = if let Some(end) = self.end {
60 (end - pos) as usize
61 } else {
62 64 * 1024
63 };
64 bs.reserve(max_len);
65 let f = self.file.clone();
66 let (n, mut bs) = self
67 .core
68 .exec(move || async move {
69 let (n, bs) = buf_try!(@try f.read_at(bs.slice(..max_len), pos).await);
71 Ok((n, bs.into_inner()))
72 })
73 .await?;
74 let frozen = bs.split_to(n).freeze();
75 self.offset += frozen.len() as u64;
76 self.core.buf_pool.put(bs);
77 Ok(Buffer::from(frozen))
78 }
79}