opendal/types/context/
read.rs1use std::ops::Bound;
19use std::ops::Range;
20use std::ops::RangeBounds;
21use std::sync::Arc;
22
23use crate::raw::*;
24use crate::*;
25
26pub struct ReadContext {
28 acc: Accessor,
30 path: String,
32 args: OpRead,
34 options: OpReader,
36}
37
38impl ReadContext {
39 #[inline]
41 pub fn new(acc: Accessor, path: String, args: OpRead, options: OpReader) -> Self {
42 Self {
43 acc,
44 path,
45 args,
46 options,
47 }
48 }
49
50 #[inline]
52 pub fn accessor(&self) -> &Accessor {
53 &self.acc
54 }
55
56 #[inline]
58 pub fn path(&self) -> &str {
59 &self.path
60 }
61
62 #[inline]
64 pub fn args(&self) -> &OpRead {
65 &self.args
66 }
67
68 #[inline]
70 pub fn options(&self) -> &OpReader {
71 &self.options
72 }
73
74 pub(crate) async fn parse_into_range(
76 &self,
77 range: impl RangeBounds<u64>,
78 ) -> Result<Range<u64>> {
79 let start = match range.start_bound() {
80 Bound::Included(v) => *v,
81 Bound::Excluded(v) => v + 1,
82 Bound::Unbounded => 0,
83 };
84
85 let end = match range.end_bound() {
86 Bound::Included(v) => v + 1,
87 Bound::Excluded(v) => *v,
88 Bound::Unbounded => {
89 let mut op_stat = OpStat::new();
90
91 if let Some(v) = self.args().version() {
92 op_stat = op_stat.with_version(v);
93 }
94
95 self.accessor()
96 .stat(self.path(), op_stat)
97 .await?
98 .into_metadata()
99 .content_length()
100 }
101 };
102
103 Ok(start..end)
104 }
105}
106
107pub struct ReadGenerator {
117 ctx: Arc<ReadContext>,
118
119 offset: u64,
120 size: Option<u64>,
121}
122
123impl ReadGenerator {
124 #[inline]
126 pub fn new(ctx: Arc<ReadContext>, offset: u64, size: Option<u64>) -> Self {
127 Self { ctx, offset, size }
128 }
129
130 fn next_range(&mut self) -> Option<BytesRange> {
132 if self.size == Some(0) {
133 return None;
134 }
135
136 let next_offset = self.offset;
137 let next_size = match self.size {
138 None => {
140 self.size = Some(0);
142 None
143 }
144 Some(remaining) => {
145 let read_size = self
147 .ctx
148 .options
149 .chunk()
150 .map_or(remaining, |chunk| remaining.min(chunk as u64));
151 self.offset += read_size;
153 self.size = Some(remaining - read_size);
154 Some(read_size)
155 }
156 };
157
158 Some(BytesRange::new(next_offset, next_size))
159 }
160
161 pub async fn next_reader(&mut self) -> Result<Option<oio::Reader>> {
163 let Some(range) = self.next_range() else {
164 return Ok(None);
165 };
166
167 let args = self.ctx.args.clone().with_range(range);
168 let (_, r) = self.ctx.acc.read(&self.ctx.path, args).await?;
169 Ok(Some(r))
170 }
171}
172
173#[cfg(test)]
174mod tests {
175
176 use bytes::Bytes;
177
178 use super::*;
179
180 #[tokio::test]
181 async fn test_next_reader() -> Result<()> {
182 let op = Operator::via_iter(Scheme::Memory, [])?;
183 op.write(
184 "test",
185 Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
186 )
187 .await?;
188
189 let acc = op.into_inner();
190 let ctx = Arc::new(ReadContext::new(
191 acc,
192 "test".to_string(),
193 OpRead::new(),
194 OpReader::new().with_chunk(3),
195 ));
196 let mut generator = ReadGenerator::new(ctx, 0, Some(10));
197 let mut readers = vec![];
198 while let Some(r) = generator.next_reader().await? {
199 readers.push(r);
200 }
201
202 pretty_assertions::assert_eq!(readers.len(), 4);
203 Ok(())
204 }
205
206 #[tokio::test]
207 async fn test_next_reader_without_size() -> Result<()> {
208 let op = Operator::via_iter(Scheme::Memory, [])?;
209 op.write(
210 "test",
211 Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
212 )
213 .await?;
214
215 let acc = op.into_inner();
216 let ctx = Arc::new(ReadContext::new(
217 acc,
218 "test".to_string(),
219 OpRead::new(),
220 OpReader::new().with_chunk(3),
221 ));
222 let mut generator = ReadGenerator::new(ctx, 0, None);
223 let mut readers = vec![];
224 while let Some(r) = generator.next_reader().await? {
225 readers.push(r);
226 }
227
228 pretty_assertions::assert_eq!(readers.len(), 1);
229 Ok(())
230 }
231}