opendal/types/read/
buffer_stream.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ops::RangeBounds;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::Context;
22use std::task::Poll;
23
24use futures::ready;
25use futures::Stream;
26
27use crate::raw::oio::Read as _;
28use crate::raw::*;
29use crate::*;
30
31/// StreamingReader will stream the content of the file without reading into
32/// memory first.
33///
34/// StreamingReader is good for small memory footprint and optimized for latency.
35pub struct StreamingReader {
36    generator: ReadGenerator,
37    reader: Option<oio::Reader>,
38}
39
40impl StreamingReader {
41    /// Create a new streaming reader.
42    #[inline]
43    fn new(ctx: Arc<ReadContext>, range: BytesRange) -> Self {
44        let generator = ReadGenerator::new(ctx.clone(), range.offset(), range.size());
45        Self {
46            generator,
47            reader: None,
48        }
49    }
50}
51
52impl oio::Read for StreamingReader {
53    async fn read(&mut self) -> Result<Buffer> {
54        loop {
55            if self.reader.is_none() {
56                self.reader = self.generator.next_reader().await?;
57            }
58            let Some(r) = self.reader.as_mut() else {
59                return Ok(Buffer::new());
60            };
61
62            let buf = r.read().await?;
63            // Reset reader to None if this reader returns empty buffer.
64            if buf.is_empty() {
65                self.reader = None;
66                continue;
67            } else {
68                return Ok(buf);
69            }
70        }
71    }
72}
73
74/// ChunkedReader will read the file in chunks.
75///
76/// ChunkedReader is good for concurrent read and optimized for throughput.
77pub struct ChunkedReader {
78    generator: ReadGenerator,
79    tasks: ConcurrentTasks<oio::Reader, Buffer>,
80    done: bool,
81}
82
83impl ChunkedReader {
84    /// Create a new chunked reader.
85    ///
86    /// # Notes
87    ///
88    /// We don't need to handle `Executor::timeout` since we are outside the layer.
89    fn new(ctx: Arc<ReadContext>, range: BytesRange) -> Self {
90        let tasks = ConcurrentTasks::new(
91            ctx.accessor().info().executor(),
92            ctx.options().concurrent(),
93            |mut r: oio::Reader| {
94                Box::pin(async {
95                    match r.read_all().await {
96                        Ok(buf) => (r, Ok(buf)),
97                        Err(err) => (r, Err(err)),
98                    }
99                })
100            },
101        );
102        let generator = ReadGenerator::new(ctx, range.offset(), range.size());
103        Self {
104            generator,
105            tasks,
106            done: false,
107        }
108    }
109}
110
111impl oio::Read for ChunkedReader {
112    async fn read(&mut self) -> Result<Buffer> {
113        while self.tasks.has_remaining() && !self.done {
114            if let Some(r) = self.generator.next_reader().await? {
115                self.tasks.execute(r).await?;
116            } else {
117                self.done = true;
118                break;
119            }
120            if self.tasks.has_result() {
121                break;
122            }
123        }
124        Ok(self.tasks.next().await.transpose()?.unwrap_or_default())
125    }
126}
127
128/// BufferStream is a stream of buffers, created by [`Reader::into_stream`]
129///
130/// `BufferStream` implements `Stream` trait.
131pub struct BufferStream {
132    /// # Notes to maintainers
133    ///
134    /// The underlying reader is either a StreamingReader or a ChunkedReader.
135    ///
136    /// - If chunk is None, BufferStream will use StreamingReader to iterate
137    ///   data in streaming way.
138    /// - Otherwise, BufferStream will use ChunkedReader to read data in chunks.
139    state: State,
140}
141
142enum State {
143    Idle(Option<TwoWays<StreamingReader, ChunkedReader>>),
144    Reading(BoxedStaticFuture<(TwoWays<StreamingReader, ChunkedReader>, Result<Buffer>)>),
145}
146
147impl BufferStream {
148    /// Create a new buffer stream with already calculated offset and size.
149    pub(crate) fn new(ctx: Arc<ReadContext>, offset: u64, size: Option<u64>) -> Self {
150        debug_assert!(
151            size.is_some() || ctx.options().chunk().is_none(),
152            "size must be known if chunk is set"
153        );
154
155        let reader = if ctx.options().chunk().is_some() {
156            TwoWays::Two(ChunkedReader::new(ctx, BytesRange::new(offset, size)))
157        } else {
158            TwoWays::One(StreamingReader::new(ctx, BytesRange::new(offset, size)))
159        };
160
161        Self {
162            state: State::Idle(Some(reader)),
163        }
164    }
165
166    /// Create a new buffer stream with given range bound.
167    ///
168    /// If users is going to perform chunked read but the read size is unknown, we will parse
169    /// into range first.
170    pub(crate) async fn create(
171        ctx: Arc<ReadContext>,
172        range: impl RangeBounds<u64>,
173    ) -> Result<Self> {
174        let reader = if ctx.options().chunk().is_some() {
175            let range = ctx.parse_into_range(range).await?;
176            TwoWays::Two(ChunkedReader::new(ctx, range.into()))
177        } else {
178            TwoWays::One(StreamingReader::new(ctx, range.into()))
179        };
180
181        Ok(Self {
182            state: State::Idle(Some(reader)),
183        })
184    }
185}
186
187impl Stream for BufferStream {
188    type Item = Result<Buffer>;
189
190    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
191        let this = self.get_mut();
192        loop {
193            match &mut this.state {
194                State::Idle(reader) => {
195                    let mut reader = reader.take().unwrap();
196                    let fut = async move {
197                        let ret = reader.read().await;
198                        (reader, ret)
199                    };
200                    this.state = State::Reading(Box::pin(fut));
201                }
202                State::Reading(fut) => {
203                    let fut = fut.as_mut();
204                    let (reader, buf) = ready!(fut.poll(cx));
205                    this.state = State::Idle(Some(reader));
206                    return match buf {
207                        Ok(buf) if buf.is_empty() => Poll::Ready(None),
208                        Ok(buf) => Poll::Ready(Some(Ok(buf))),
209                        Err(err) => Poll::Ready(Some(Err(err))),
210                    };
211                }
212            }
213        }
214    }
215}
216
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytes::Buf;
    use bytes::Bytes;
    use futures::TryStreamExt;
    use pretty_assertions::assert_eq;

    use super::*;

    /// Build a read context for the object `"test"` served by `op`, using default options.
    fn test_context(op: Operator) -> Arc<ReadContext> {
        Arc::new(ReadContext::new(
            op.into_inner(),
            "test".to_string(),
            OpRead::new(),
            OpReader::new(),
        ))
    }

    #[tokio::test]
    async fn test_trait() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let stream = BufferStream::create(test_context(op), 4..8).await?;

        // Compile-time check: the stream must be usable as an `Unpin + MaybeSend + 'static` object.
        let _: Box<dyn Unpin + MaybeSend + 'static> = Box::new(stream);

        Ok(())
    }

    #[tokio::test]
    async fn test_buffer_stream() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let content = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
        op.write("test", content).await?;

        let stream = BufferStream::create(test_context(op), 4..8).await?;
        let chunks: Vec<Buffer> = stream.try_collect().await.unwrap();
        assert_eq!(chunks.len(), 1);
        // Range 4..8 straddles the two written parts ("Hello" | "World"), so the first
        // contiguous chunk of the single buffer is just "o".
        assert_eq!(chunks[0].chunk(), "o".as_bytes());

        let merged: Buffer = chunks.into_iter().flatten().collect();
        assert_eq!(merged.len(), 4);
        assert_eq!(&merged.to_vec(), "oWor".as_bytes());

        Ok(())
    }
}