opendal/services/fs/
reader.rs1use std::io::Read;
19use std::sync::Arc;
20
21use tokio::io::AsyncReadExt;
22use tokio::io::ReadBuf;
23
24use super::core::*;
25use crate::raw::*;
26use crate::*;
27
28pub struct FsReader<F> {
29 core: Arc<FsCore>,
30 f: F,
31 read: usize,
32 size: usize,
33 buf_size: usize,
34}
35
36impl<F> FsReader<F> {
37 pub fn new(core: Arc<FsCore>, f: F, size: usize) -> Self {
38 Self {
39 core,
40 f,
41 read: 0,
42 size,
43 buf_size: 2 * 1024 * 1024,
45 }
46 }
47}
48
49impl oio::Read for FsReader<tokio::fs::File> {
50 async fn read(&mut self) -> Result<Buffer> {
51 if self.read >= self.size {
52 return Ok(Buffer::new());
53 }
54
55 let mut bs = self.core.buf_pool.get();
56 bs.reserve(self.buf_size);
57
58 let size = (self.size - self.read).min(self.buf_size);
59 let buf = &mut bs.spare_capacity_mut()[..size];
60 let mut read_buf: ReadBuf = ReadBuf::uninit(buf);
61
62 unsafe {
64 read_buf.assume_init(size);
65 }
66
67 let n = self
68 .f
69 .read_buf(&mut read_buf)
70 .await
71 .map_err(new_std_io_error)?;
72 self.read += n;
73
74 let filled = read_buf.filled().len();
76 unsafe { bs.set_len(filled) }
77
78 let frozen = bs.split().freeze();
79 self.core.buf_pool.put(bs);
81
82 Ok(Buffer::from(frozen))
83 }
84}
85
86impl oio::BlockingRead for FsReader<std::fs::File> {
87 fn read(&mut self) -> Result<Buffer> {
88 if self.read >= self.size {
89 return Ok(Buffer::new());
90 }
91
92 let mut bs = self.core.buf_pool.get();
93 bs.reserve(self.buf_size);
94
95 let size = (self.size - self.read).min(self.buf_size);
96 let buf = &mut bs.spare_capacity_mut()[..size];
97 let mut read_buf: ReadBuf = ReadBuf::uninit(buf);
98
99 unsafe {
101 read_buf.assume_init(size);
102 }
103
104 let n = self
105 .f
106 .read(read_buf.initialize_unfilled())
107 .map_err(new_std_io_error)?;
108 read_buf.advance(n);
109 self.read += n;
110
111 let filled = read_buf.filled().len();
113 unsafe { bs.set_len(filled) }
114
115 let frozen = bs.split().freeze();
116 self.core.buf_pool.put(bs);
118
119 Ok(Buffer::from(frozen))
120 }
121}