opendal/types/read/
buffer_stream.rs1use std::ops::RangeBounds;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::Context;
22use std::task::Poll;
23
24use futures::ready;
25use futures::Stream;
26
27use crate::raw::oio::Read as _;
28use crate::raw::*;
29use crate::*;
30
31pub struct StreamingReader {
36 generator: ReadGenerator,
37 reader: Option<oio::Reader>,
38}
39
40impl StreamingReader {
41 #[inline]
43 fn new(ctx: Arc<ReadContext>, range: BytesRange) -> Self {
44 let generator = ReadGenerator::new(ctx.clone(), range.offset(), range.size());
45 Self {
46 generator,
47 reader: None,
48 }
49 }
50}
51
52impl oio::Read for StreamingReader {
53 async fn read(&mut self) -> Result<Buffer> {
54 loop {
55 if self.reader.is_none() {
56 self.reader = self.generator.next_reader().await?;
57 }
58 let Some(r) = self.reader.as_mut() else {
59 return Ok(Buffer::new());
60 };
61
62 let buf = r.read().await?;
63 if buf.is_empty() {
65 self.reader = None;
66 continue;
67 } else {
68 return Ok(buf);
69 }
70 }
71 }
72}
73
74pub struct ChunkedReader {
78 generator: ReadGenerator,
79 tasks: ConcurrentTasks<oio::Reader, Buffer>,
80 done: bool,
81}
82
83impl ChunkedReader {
84 fn new(ctx: Arc<ReadContext>, range: BytesRange) -> Self {
90 let tasks = ConcurrentTasks::new(
91 ctx.accessor().info().executor(),
92 ctx.options().concurrent(),
93 |mut r: oio::Reader| {
94 Box::pin(async {
95 match r.read_all().await {
96 Ok(buf) => (r, Ok(buf)),
97 Err(err) => (r, Err(err)),
98 }
99 })
100 },
101 );
102 let generator = ReadGenerator::new(ctx, range.offset(), range.size());
103 Self {
104 generator,
105 tasks,
106 done: false,
107 }
108 }
109}
110
111impl oio::Read for ChunkedReader {
112 async fn read(&mut self) -> Result<Buffer> {
113 while self.tasks.has_remaining() && !self.done {
114 if let Some(r) = self.generator.next_reader().await? {
115 self.tasks.execute(r).await?;
116 } else {
117 self.done = true;
118 break;
119 }
120 if self.tasks.has_result() {
121 break;
122 }
123 }
124 Ok(self.tasks.next().await.transpose()?.unwrap_or_default())
125 }
126}
127
128pub struct BufferStream {
132 state: State,
140}
141
142enum State {
143 Idle(Option<TwoWays<StreamingReader, ChunkedReader>>),
144 Reading(BoxedStaticFuture<(TwoWays<StreamingReader, ChunkedReader>, Result<Buffer>)>),
145}
146
147impl BufferStream {
148 pub(crate) fn new(ctx: Arc<ReadContext>, offset: u64, size: Option<u64>) -> Self {
150 debug_assert!(
151 size.is_some() || ctx.options().chunk().is_none(),
152 "size must be known if chunk is set"
153 );
154
155 let reader = if ctx.options().chunk().is_some() {
156 TwoWays::Two(ChunkedReader::new(ctx, BytesRange::new(offset, size)))
157 } else {
158 TwoWays::One(StreamingReader::new(ctx, BytesRange::new(offset, size)))
159 };
160
161 Self {
162 state: State::Idle(Some(reader)),
163 }
164 }
165
166 pub(crate) async fn create(
171 ctx: Arc<ReadContext>,
172 range: impl RangeBounds<u64>,
173 ) -> Result<Self> {
174 let reader = if ctx.options().chunk().is_some() {
175 let range = ctx.parse_into_range(range).await?;
176 TwoWays::Two(ChunkedReader::new(ctx, range.into()))
177 } else {
178 TwoWays::One(StreamingReader::new(ctx, range.into()))
179 };
180
181 Ok(Self {
182 state: State::Idle(Some(reader)),
183 })
184 }
185}
186
187impl Stream for BufferStream {
188 type Item = Result<Buffer>;
189
190 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
191 let this = self.get_mut();
192 loop {
193 match &mut this.state {
194 State::Idle(reader) => {
195 let mut reader = reader.take().unwrap();
196 let fut = async move {
197 let ret = reader.read().await;
198 (reader, ret)
199 };
200 this.state = State::Reading(Box::pin(fut));
201 }
202 State::Reading(fut) => {
203 let fut = fut.as_mut();
204 let (reader, buf) = ready!(fut.poll(cx));
205 this.state = State::Idle(Some(reader));
206 return match buf {
207 Ok(buf) if buf.is_empty() => Poll::Ready(None),
208 Ok(buf) => Poll::Ready(Some(Ok(buf))),
209 Err(err) => Poll::Ready(Some(Err(err))),
210 };
211 }
212 }
213 }
214 }
215}
216
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytes::Buf;
    use bytes::Bytes;
    use futures::TryStreamExt;
    use pretty_assertions::assert_eq;

    use super::*;

    /// `BufferStream` must stay `Unpin + MaybeSend + 'static` so that callers
    /// can box it as a trait object.
    #[tokio::test]
    async fn test_trait() -> Result<()> {
        let accessor = Operator::via_iter(Scheme::Memory, [])?.into_inner();
        let ctx = Arc::new(ReadContext::new(
            accessor,
            "test".to_string(),
            OpRead::new(),
            OpReader::new(),
        ));

        let stream = BufferStream::create(ctx, 4..8).await?;
        let _: Box<dyn Unpin + MaybeSend + 'static> = Box::new(stream);

        Ok(())
    }

    /// Reading `4..8` of "HelloWorld" (written as two separate `Bytes`) with
    /// default reader options must produce "oWor".
    #[tokio::test]
    async fn test_buffer_stream() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let content = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
        op.write("test", content).await?;

        let ctx = Arc::new(ReadContext::new(
            op.into_inner(),
            "test".to_string(),
            OpRead::new(),
            OpReader::new(),
        ));

        let stream = BufferStream::create(ctx, 4..8).await?;
        let chunks: Vec<_> = stream.try_collect().await.unwrap();
        // The whole range arrives as one `Buffer` whose first contiguous slice
        // is the tail of "Hello".
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].chunk(), "o".as_bytes());

        let merged: Buffer = chunks.into_iter().flatten().collect();
        assert_eq!(merged.len(), 4);
        assert_eq!(&merged.to_vec(), "oWor".as_bytes());

        Ok(())
    }
}