opendal/types/read/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ops::Range;
19use std::ops::RangeBounds;
20use std::sync::Arc;
21
22use bytes::BufMut;
23use futures::stream;
24use futures::StreamExt;
25use futures::TryStreamExt;
26
27use crate::*;
28
/// Reader is designed to read data from given path in an asynchronous
/// manner.
///
/// # Usage
///
/// [`Reader`] provides multiple ways to read data from given reader.
///
/// `Reader` implements `Clone` so you can clone it and store it wherever you want.
///
/// ## Direct
///
/// [`Reader`] provides public API including [`Reader::read`]. You can use those APIs directly without extra copy.
///
/// ```
/// use opendal::Operator;
/// use opendal::Result;
///
/// async fn test(op: Operator) -> Result<()> {
///     let r = op.reader("path/to/file").await?;
///     let bs = r.read(0..1024).await?;
///     Ok(())
/// }
/// ```
///
/// ## Read like `Stream`
///
/// ```
/// use anyhow::Result;
/// use bytes::Bytes;
/// use futures::TryStreamExt;
/// use opendal::Operator;
///
/// async fn test(op: Operator) -> Result<()> {
///     let s = op
///         .reader("path/to/file")
///         .await?
///         .into_bytes_stream(1024..2048)
///         .await?;
///     let bs: Vec<Bytes> = s.try_collect().await?;
///     Ok(())
/// }
/// ```
///
/// ## Read like `AsyncRead` and `AsyncBufRead`
///
/// ```
/// use anyhow::Result;
/// use bytes::Bytes;
/// use futures::AsyncReadExt;
/// use opendal::Operator;
///
/// async fn test(op: Operator) -> Result<()> {
///     let mut r = op
///         .reader("path/to/file")
///         .await?
///         .into_futures_async_read(1024..2048)
///         .await?;
///     let mut bs = vec![];
///     let n = r.read_to_end(&mut bs).await?;
///     Ok(())
/// }
/// ```
#[derive(Clone)]
pub struct Reader {
    /// Shared read context (accessor, path, read options).
    /// Kept behind an `Arc` so cloning the reader is cheap.
    ctx: Arc<ReadContext>,
}
95
96impl Reader {
97    /// Create a new reader.
98    ///
99    /// Create will use internal information to decide the most suitable
100    /// implementation for users.
101    ///
102    /// We don't want to expose those details to users so keep this function
103    /// in crate only.
104    pub(crate) fn new(ctx: ReadContext) -> Self {
105        Reader { ctx: Arc::new(ctx) }
106    }
107
108    /// Read give range from reader into [`Buffer`].
109    ///
110    /// This operation is zero-copy, which means it keeps the [`bytes::Bytes`] returned by underlying
111    /// storage services without any extra copy or intensive memory allocations.
112    pub async fn read(&self, range: impl RangeBounds<u64>) -> Result<Buffer> {
113        let bufs: Vec<_> = self.clone().into_stream(range).await?.try_collect().await?;
114        Ok(bufs.into_iter().flatten().collect())
115    }
116
117    /// Read all data from reader into given [`BufMut`].
118    ///
119    /// This operation will copy and write bytes into given [`BufMut`]. Allocation happens while
120    /// [`BufMut`] doesn't have enough space.
121    pub async fn read_into(
122        &self,
123        buf: &mut impl BufMut,
124        range: impl RangeBounds<u64>,
125    ) -> Result<usize> {
126        let mut stream = self.clone().into_stream(range).await?;
127
128        let mut read = 0;
129        loop {
130            let Some(bs) = stream.try_next().await? else {
131                return Ok(read);
132            };
133            read += bs.len();
134            buf.put(bs);
135        }
136    }
137
    /// Fetch specific ranges from reader.
    ///
    /// This operation tries to merge the given ranges into a list of
    /// non-overlapping ranges. Users may also specify a `gap` to merge
    /// close ranges.
    ///
    /// The returned `Buffer`s may share the same underlying memory without
    /// any extra copy.
    pub async fn fetch(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Buffer>> {
        let merged_ranges = self.merge_ranges(ranges.clone());

        // Issue one read per merged range, keeping at most `concurrent`
        // reads in flight at a time.
        let merged_bufs: Vec<_> =
            stream::iter(merged_ranges.clone().into_iter().map(|v| self.read(v)))
                .buffered(self.ctx.options().concurrent())
                .try_collect()
                .await?;

        // Slice every originally-requested range back out of the merged
        // buffer that covers it. `Buffer::slice` shares the underlying
        // bytes, so no data is copied here.
        let mut bufs = Vec::with_capacity(ranges.len());
        for range in ranges {
            // `merged_ranges` is sorted by `start`, so `partition_point`
            // locates the (unique) merged range that contains `range.start`.
            let idx = merged_ranges.partition_point(|v| v.start <= range.start) - 1;
            // Translate absolute offsets into offsets within the merged buffer.
            let start = range.start - merged_ranges[idx].start;
            let end = range.end - merged_ranges[idx].start;
            bufs.push(merged_bufs[idx].slice(start as usize..end as usize));
        }

        Ok(bufs)
    }
165
166    /// Merge given ranges into a list of non-overlapping ranges.
167    fn merge_ranges(&self, mut ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
168        let gap = self.ctx.options().gap().unwrap_or(1024 * 1024) as u64;
169        // We don't care about the order of range with same start, they
170        // will be merged in the next step.
171        ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));
172
173        // We know that this vector will have at most element
174        let mut merged = Vec::with_capacity(ranges.len());
175        let mut cur = ranges[0].clone();
176
177        for range in ranges.into_iter().skip(1) {
178            if range.start <= cur.end + gap {
179                // There is an overlap or the gap is small enough to merge
180                cur.end = cur.end.max(range.end);
181            } else {
182                // No overlap and the gap is too large, push the current range to the list and start a new one
183                merged.push(cur);
184                cur = range;
185            }
186        }
187
188        // Push the last range
189        merged.push(cur);
190
191        merged
192    }
193
    /// Create a buffer stream to read a specific range from the given reader.
    ///
    /// # Notes
    ///
    /// BufferStream is a zero-cost abstraction. It doesn't involve extra copy of data.
    /// It will return underlying [`Buffer`] directly.
    ///
    /// The [`Buffer`] this stream yields can be seen as an iterator of [`Bytes`].
    ///
    /// # Inputs
    ///
    /// - `range`: The range of data to read. A range like `..` will read all data from the reader.
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// ```
    /// use std::io;
    ///
    /// use futures::TryStreamExt;
    /// use opendal::{Buffer, Operator};
    /// use opendal::Result;
    /// use bytes::Bytes;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut s = op
    ///         .reader("hello.txt")
    ///         .await?
    ///         .into_stream(1024..2048)
    ///         .await?;
    ///
    ///     let bs: Vec<Buffer> = s.try_collect().await?;
    ///     // We can use those buffer as bytes if we want.
    ///     let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
    ///     // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
    ///     let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// ## Concurrent Read
    ///
    /// The following example reads data in 256B chunks with 8 concurrent.
    ///
    /// ```
    /// use std::io;
    /// use bytes::Bytes;
    ///
    /// use futures::TryStreamExt;
    /// use opendal::{Buffer, Operator};
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let s = op
    ///         .reader_with("hello.txt")
    ///         .concurrent(8)
    ///         .chunk(256)
    ///         .await?
    ///         .into_stream(1024..2048)
    ///         .await?;
    ///
    ///     // Every buffer except the last one in the stream will be 256B.
    ///     let bs: Vec<Buffer> = s.try_collect().await?;
    ///     // We can use those buffer as bytes if we want.
    ///     let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
    ///     // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
    ///     let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
    ///
    ///     Ok(())
    /// }
    /// ```
    pub async fn into_stream(self, range: impl RangeBounds<u64>) -> Result<BufferStream> {
        BufferStream::create(self.ctx, range).await
    }
270
    /// Convert reader into [`FuturesAsyncReader`] which implements [`futures::AsyncRead`],
    /// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`].
    ///
    /// # Notes
    ///
    /// FuturesAsyncReader is not a zero-cost abstraction. The underlying reader
    /// returns an owned [`Buffer`], which involves an extra copy operation.
    ///
    /// # Inputs
    ///
    /// - `range`: The range of data to read. A range like `..` will read all data from the reader.
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// ```
    /// use std::io;
    ///
    /// use futures::io::AsyncReadExt;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut r = op
    ///         .reader("hello.txt")
    ///         .await?
    ///         .into_futures_async_read(1024..2048)
    ///         .await?;
    ///     let mut bs = Vec::new();
    ///     r.read_to_end(&mut bs).await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// ## Concurrent Read
    ///
    /// The following example reads data in 256B chunks with 8 concurrent.
    ///
    /// ```
    /// use std::io;
    ///
    /// use futures::io::AsyncReadExt;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut r = op
    ///         .reader_with("hello.txt")
    ///         .concurrent(8)
    ///         .chunk(256)
    ///         .await?
    ///         .into_futures_async_read(1024..2048)
    ///         .await?;
    ///     let mut bs = Vec::new();
    ///     r.read_to_end(&mut bs).await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    #[inline]
    pub async fn into_futures_async_read(
        self,
        range: impl RangeBounds<u64>,
    ) -> Result<FuturesAsyncReader> {
        // Resolve the user-supplied bounds into a concrete range first
        // (e.g. `..` needs the total content length).
        let range = self.ctx.parse_into_range(range).await?;
        Ok(FuturesAsyncReader::new(self.ctx, range))
    }
340
    /// Convert reader into [`FuturesBytesStream`] which implements [`futures::Stream`].
    ///
    /// # Inputs
    ///
    /// - `range`: The range of data to read. A range like `..` will read all data from the reader.
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// ```
    /// use std::io;
    ///
    /// use bytes::Bytes;
    /// use futures::TryStreamExt;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut s = op
    ///         .reader("hello.txt")
    ///         .await?
    ///         .into_bytes_stream(1024..2048)
    ///         .await?;
    ///     let bs: Vec<Bytes> = s.try_collect().await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// ## Concurrent Read
    ///
    /// The following example reads data in 256B chunks with 8 concurrent.
    ///
    /// ```
    /// use std::io;
    ///
    /// use bytes::Bytes;
    /// use futures::TryStreamExt;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut s = op
    ///         .reader_with("hello.txt")
    ///         .concurrent(8)
    ///         .chunk(256)
    ///         .await?
    ///         .into_bytes_stream(1024..2048)
    ///         .await?;
    ///     let bs: Vec<Bytes> = s.try_collect().await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    #[inline]
    pub async fn into_bytes_stream(
        self,
        range: impl RangeBounds<u64>,
    ) -> Result<FuturesBytesStream> {
        FuturesBytesStream::new(self.ctx, range).await
    }
403}
404
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use rand::rngs::ThreadRng;
    use rand::Rng;
    use rand::RngCore;

    use super::*;
    use crate::raw::*;
    use crate::services;
    use crate::Operator;

    // Compile-time trait check: `Reader` must be usable as a boxed
    // `Unpin + MaybeSend + Sync + 'static` trait object so callers can
    // store it and move it across tasks freely.
    #[tokio::test]
    async fn test_trait() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        op.write(
            "test",
            Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
        )
        .await?;

        let acc = op.into_inner();
        let ctx = ReadContext::new(acc, "test".to_string(), OpRead::new(), OpReader::new());

        let _: Box<dyn Unpin + MaybeSend + Sync + 'static> = Box::new(Reader::new(ctx));

        Ok(())
    }

    // Random content with a random size in 1B..16MB.
    fn gen_random_bytes() -> Vec<u8> {
        let mut rng = ThreadRng::default();
        // Generate size between 1B..16MB.
        let size = rng.gen_range(1..16 * 1024 * 1024);
        let mut content = vec![0; size];
        rng.fill_bytes(&mut content);
        content
    }

    // Random content of exactly `size` bytes.
    fn gen_fixed_bytes(size: usize) -> Vec<u8> {
        let mut rng = ThreadRng::default();
        let mut content = vec![0; size];
        rng.fill_bytes(&mut content);
        content
    }

    // `read(..)` over the whole file must return exactly what was written.
    #[tokio::test]
    async fn test_reader_read() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader(path).await.unwrap();
        let buf = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), content);
        Ok(())
    }

    // Same as `test_reader_read`, but forces a small (16B) chunk size so
    // the content is reassembled from many chunks.
    #[tokio::test]
    async fn test_reader_read_with_chunk() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader_with(path).chunk(16).await.unwrap();
        let buf = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), content);
        Ok(())
    }

    // Chunked + concurrent reads must still reassemble the content in order.
    #[tokio::test]
    async fn test_reader_read_with_concurrent() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op
            .reader_with(path)
            .chunk(128)
            .concurrent(16)
            .await
            .unwrap();
        let buf = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), content);
        Ok(())
    }

    // `read_into` must copy the full content into the caller's buffer.
    #[tokio::test]
    async fn test_reader_read_into() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader(path).await.unwrap();
        let mut buf = Vec::new();
        reader
            .read_into(&mut buf, ..)
            .await
            .expect("read to end must succeed");

        assert_eq!(buf, content);
        Ok(())
    }

    // With gap=1: 0..10, 10..20 and 21..30 collapse into 0..30 (the hole
    // between 20 and 21 is within the gap), while the 40..60 cluster stays
    // separate because 30..40 exceeds the gap.
    #[tokio::test]
    async fn test_merge_ranges() -> Result<()> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader_with(path).gap(1).await.unwrap();

        let ranges = vec![0..10, 10..20, 21..30, 40..50, 40..60, 45..59];
        let merged = reader.merge_ranges(ranges.clone());
        assert_eq!(merged, vec![0..30, 40..60]);
        Ok(())
    }

    // `fetch` must return one buffer per requested range — including
    // duplicate and out-of-order ranges — each matching the written content.
    #[tokio::test]
    async fn test_fetch() -> Result<()> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "test_file";

        let content = gen_fixed_bytes(1024);
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader_with(path).gap(1).await.unwrap();

        let ranges = vec![
            0..10,
            40..50,
            45..59,
            10..20,
            21..30,
            40..50,
            40..60,
            45..59,
        ];
        let merged = reader
            .fetch(ranges.clone())
            .await
            .expect("fetch must succeed");

        for (i, range) in ranges.iter().enumerate() {
            assert_eq!(
                merged[i].to_bytes(),
                content[range.start as usize..range.end as usize]
            );
        }
        Ok(())
    }
}