opendal/types/read/
buffer_stream.rs1use std::ops::RangeBounds;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::Context;
22use std::task::Poll;
23
24use futures::ready;
25use futures::Stream;
26
27use crate::raw::oio::Read as _;
28use crate::raw::*;
29use crate::*;
30
31pub struct StreamingReader {
36 generator: ReadGenerator,
37 reader: Option<oio::Reader>,
38}
39
40impl StreamingReader {
41 #[inline]
43 fn new(ctx: Arc<ReadContext>, range: BytesRange) -> Self {
44 let generator = ReadGenerator::new(ctx.clone(), range.offset(), range.size());
45 Self {
46 generator,
47 reader: None,
48 }
49 }
50}
51
52impl oio::Read for StreamingReader {
53 async fn read(&mut self) -> Result<Buffer> {
54 loop {
55 if self.reader.is_none() {
56 self.reader = self.generator.next_reader().await?;
57 }
58 let Some(r) = self.reader.as_mut() else {
59 return Ok(Buffer::new());
60 };
61
62 let buf = r.read().await?;
63 if buf.is_empty() {
65 self.reader = None;
66 continue;
67 } else {
68 return Ok(buf);
69 }
70 }
71 }
72}
73
74pub struct ChunkedReader {
78 generator: ReadGenerator,
79 tasks: ConcurrentTasks<oio::Reader, Buffer>,
80 done: bool,
81}
82
83impl ChunkedReader {
84 fn new(ctx: Arc<ReadContext>, range: BytesRange) -> Self {
90 let tasks = ConcurrentTasks::new(
91 ctx.accessor().info().executor(),
92 ctx.options().concurrent(),
93 ctx.options().prefetch(),
94 |mut r: oio::Reader| {
95 Box::pin(async {
96 match r.read_all().await {
97 Ok(buf) => (r, Ok(buf)),
98 Err(err) => (r, Err(err)),
99 }
100 })
101 },
102 );
103 let generator = ReadGenerator::new(ctx, range.offset(), range.size());
104 Self {
105 generator,
106 tasks,
107 done: false,
108 }
109 }
110}
111
112impl oio::Read for ChunkedReader {
113 async fn read(&mut self) -> Result<Buffer> {
114 while self.tasks.has_remaining() && !self.done {
115 if let Some(r) = self.generator.next_reader().await? {
116 self.tasks.execute(r).await?;
117 } else {
118 self.done = true;
119 break;
120 }
121 if self.tasks.has_result() {
122 break;
123 }
124 }
125 Ok(self.tasks.next().await.transpose()?.unwrap_or_default())
126 }
127}
128
129pub struct BufferStream {
133 state: State,
141}
142
143enum State {
144 Idle(Option<TwoWays<StreamingReader, ChunkedReader>>),
145 Reading(BoxedStaticFuture<(TwoWays<StreamingReader, ChunkedReader>, Result<Buffer>)>),
146}
147
148impl BufferStream {
149 pub(crate) fn new(ctx: Arc<ReadContext>, offset: u64, size: Option<u64>) -> Self {
151 debug_assert!(
152 size.is_some() || ctx.options().chunk().is_none(),
153 "size must be known if chunk is set"
154 );
155
156 let reader = if ctx.options().chunk().is_some() {
157 TwoWays::Two(ChunkedReader::new(ctx, BytesRange::new(offset, size)))
158 } else {
159 TwoWays::One(StreamingReader::new(ctx, BytesRange::new(offset, size)))
160 };
161
162 Self {
163 state: State::Idle(Some(reader)),
164 }
165 }
166
167 pub(crate) async fn create(
172 ctx: Arc<ReadContext>,
173 range: impl RangeBounds<u64>,
174 ) -> Result<Self> {
175 let reader = if ctx.options().chunk().is_some() {
176 let range = ctx.parse_into_range(range).await?;
177 TwoWays::Two(ChunkedReader::new(ctx, range.into()))
178 } else {
179 TwoWays::One(StreamingReader::new(ctx, range.into()))
180 };
181
182 Ok(Self {
183 state: State::Idle(Some(reader)),
184 })
185 }
186}
187
188impl Stream for BufferStream {
189 type Item = Result<Buffer>;
190
191 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
192 let this = self.get_mut();
193 loop {
194 match &mut this.state {
195 State::Idle(reader) => {
196 let mut reader = reader.take().unwrap();
197 let fut = async move {
198 let ret = reader.read().await;
199 (reader, ret)
200 };
201 this.state = State::Reading(Box::pin(fut));
202 }
203 State::Reading(fut) => {
204 let fut = fut.as_mut();
205 let (reader, buf) = ready!(fut.poll(cx));
206 this.state = State::Idle(Some(reader));
207 return match buf {
208 Ok(buf) if buf.is_empty() => Poll::Ready(None),
209 Ok(buf) => Poll::Ready(Some(Ok(buf))),
210 Err(err) => Poll::Ready(Some(Err(err))),
211 };
212 }
213 }
214 }
215 }
216}
217
218#[cfg(test)]
219mod tests {
220 use std::sync::Arc;
221
222 use bytes::Buf;
223 use bytes::Bytes;
224 use futures::TryStreamExt;
225 use pretty_assertions::assert_eq;
226
227 use super::*;
228
229 #[tokio::test]
230 async fn test_trait() -> Result<()> {
231 let acc = Operator::via_iter(Scheme::Memory, [])?.into_inner();
232 let ctx = Arc::new(ReadContext::new(
233 acc,
234 "test".to_string(),
235 OpRead::new(),
236 OpReader::new(),
237 ));
238 let v = BufferStream::create(ctx, 4..8).await?;
239
240 let _: Box<dyn Unpin + MaybeSend + 'static> = Box::new(v);
241
242 Ok(())
243 }
244
245 #[tokio::test]
246 async fn test_buffer_stream() -> Result<()> {
247 let op = Operator::via_iter(Scheme::Memory, [])?;
248 op.write(
249 "test",
250 Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
251 )
252 .await?;
253
254 let acc = op.into_inner();
255 let ctx = Arc::new(ReadContext::new(
256 acc,
257 "test".to_string(),
258 OpRead::new(),
259 OpReader::new(),
260 ));
261
262 let s = BufferStream::create(ctx, 4..8).await?;
263 let bufs: Vec<_> = s.try_collect().await.unwrap();
264 assert_eq!(bufs.len(), 1);
265 assert_eq!(bufs[0].chunk(), "o".as_bytes());
266
267 let buf: Buffer = bufs.into_iter().flatten().collect();
268 assert_eq!(buf.len(), 4);
269 assert_eq!(&buf.to_vec(), "oWor".as_bytes());
270
271 Ok(())
272 }
273}