opendal/types/context/
read.rs1use std::ops::{Bound, Range, RangeBounds};
19use std::sync::Arc;
20
21use crate::raw::*;
22use crate::*;
23
24pub struct ReadContext {
26 acc: Accessor,
28 path: String,
30 args: OpRead,
32 options: OpReader,
34}
35
36impl ReadContext {
37 #[inline]
39 pub fn new(acc: Accessor, path: String, args: OpRead, options: OpReader) -> Self {
40 Self {
41 acc,
42 path,
43 args,
44 options,
45 }
46 }
47
48 #[inline]
50 pub fn accessor(&self) -> &Accessor {
51 &self.acc
52 }
53
54 #[inline]
56 pub fn path(&self) -> &str {
57 &self.path
58 }
59
60 #[inline]
62 pub fn args(&self) -> &OpRead {
63 &self.args
64 }
65
66 #[inline]
68 pub fn options(&self) -> &OpReader {
69 &self.options
70 }
71
72 pub(crate) async fn parse_into_range(
74 &self,
75 range: impl RangeBounds<u64>,
76 ) -> Result<Range<u64>> {
77 let start = match range.start_bound() {
78 Bound::Included(v) => *v,
79 Bound::Excluded(v) => v + 1,
80 Bound::Unbounded => 0,
81 };
82
83 let end = match range.end_bound() {
84 Bound::Included(v) => v + 1,
85 Bound::Excluded(v) => *v,
86 Bound::Unbounded => {
87 let mut op_stat = OpStat::new();
88
89 if let Some(v) = self.args().version() {
90 op_stat = op_stat.with_version(v);
91 }
92
93 self.accessor()
94 .stat(self.path(), op_stat)
95 .await?
96 .into_metadata()
97 .content_length()
98 }
99 };
100
101 Ok(start..end)
102 }
103}
104
105pub struct ReadGenerator {
115 ctx: Arc<ReadContext>,
116
117 offset: u64,
118 size: Option<u64>,
119}
120
121impl ReadGenerator {
122 #[inline]
124 pub fn new(ctx: Arc<ReadContext>, offset: u64, size: Option<u64>) -> Self {
125 Self { ctx, offset, size }
126 }
127
128 fn next_range(&mut self) -> Option<BytesRange> {
130 if self.size == Some(0) {
131 return None;
132 }
133
134 let next_offset = self.offset;
135 let next_size = match self.size {
136 None => {
138 self.size = Some(0);
140 None
141 }
142 Some(remaining) => {
143 let read_size = self
145 .ctx
146 .options
147 .chunk()
148 .map_or(remaining, |chunk| remaining.min(chunk as u64));
149 self.offset += read_size;
151 self.size = Some(remaining - read_size);
152 Some(read_size)
153 }
154 };
155
156 Some(BytesRange::new(next_offset, next_size))
157 }
158
159 pub async fn next_reader(&mut self) -> Result<Option<oio::Reader>> {
161 let Some(range) = self.next_range() else {
162 return Ok(None);
163 };
164
165 let args = self.ctx.args.clone().with_range(range);
166 let (_, r) = self.ctx.acc.read(&self.ctx.path, args).await?;
167 Ok(Some(r))
168 }
169
170 pub fn next_blocking_reader(&mut self) -> Result<Option<oio::BlockingReader>> {
172 let Some(range) = self.next_range() else {
173 return Ok(None);
174 };
175
176 let args = self.ctx.args.clone().with_range(range);
177 let (_, r) = self.ctx.acc.blocking_read(&self.ctx.path, args)?;
178 Ok(Some(r))
179 }
180}
181
182#[cfg(test)]
183mod tests {
184
185 use bytes::Bytes;
186
187 use super::*;
188
189 #[tokio::test]
190 async fn test_next_reader() -> Result<()> {
191 let op = Operator::via_iter(Scheme::Memory, [])?;
192 op.write(
193 "test",
194 Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
195 )
196 .await?;
197
198 let acc = op.into_inner();
199 let ctx = Arc::new(ReadContext::new(
200 acc,
201 "test".to_string(),
202 OpRead::new(),
203 OpReader::new().with_chunk(3),
204 ));
205 let mut generator = ReadGenerator::new(ctx, 0, Some(10));
206 let mut readers = vec![];
207 while let Some(r) = generator.next_reader().await? {
208 readers.push(r);
209 }
210
211 pretty_assertions::assert_eq!(readers.len(), 4);
212 Ok(())
213 }
214
215 #[tokio::test]
216 async fn test_next_reader_without_size() -> Result<()> {
217 let op = Operator::via_iter(Scheme::Memory, [])?;
218 op.write(
219 "test",
220 Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
221 )
222 .await?;
223
224 let acc = op.into_inner();
225 let ctx = Arc::new(ReadContext::new(
226 acc,
227 "test".to_string(),
228 OpRead::new(),
229 OpReader::new().with_chunk(3),
230 ));
231 let mut generator = ReadGenerator::new(ctx, 0, None);
232 let mut readers = vec![];
233 while let Some(r) = generator.next_reader().await? {
234 readers.push(r);
235 }
236
237 pretty_assertions::assert_eq!(readers.len(), 1);
238 Ok(())
239 }
240
241 #[test]
242 fn test_next_blocking_reader() -> Result<()> {
243 let op = Operator::via_iter(Scheme::Memory, [])?;
244 op.blocking().write(
245 "test",
246 Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
247 )?;
248
249 let acc = op.into_inner();
250 let ctx = Arc::new(ReadContext::new(
251 acc,
252 "test".to_string(),
253 OpRead::new(),
254 OpReader::new().with_chunk(3),
255 ));
256 let mut generator = ReadGenerator::new(ctx, 0, Some(10));
257 let mut readers = vec![];
258 while let Some(r) = generator.next_blocking_reader()? {
259 readers.push(r);
260 }
261
262 pretty_assertions::assert_eq!(readers.len(), 4);
263 Ok(())
264 }
265}