opendal/types/read/reader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;
use std::ops::RangeBounds;
use std::sync::Arc;

use bytes::BufMut;
use futures::stream;
use futures::StreamExt;
use futures::TryStreamExt;

use crate::*;

/// Reader is designed to read data from a given path in an asynchronous
/// manner.
///
/// # Usage
///
/// [`Reader`] provides multiple ways to read data from a given reader.
///
/// `Reader` implements `Clone`, so you can clone it and store it wherever you want.
///
/// ## Direct
///
/// [`Reader`] provides public APIs, including [`Reader::read`]. You can use these APIs directly without an extra copy.
///
/// ```
/// use opendal::Operator;
/// use opendal::Result;
///
/// async fn test(op: Operator) -> Result<()> {
///     let r = op.reader("path/to/file").await?;
///     let bs = r.read(0..1024).await?;
///     Ok(())
/// }
/// ```
///
/// ## Read like `Stream`
///
/// ```
/// use anyhow::Result;
/// use bytes::Bytes;
/// use futures::TryStreamExt;
/// use opendal::Operator;
///
/// async fn test(op: Operator) -> Result<()> {
///     let s = op
///         .reader("path/to/file")
///         .await?
///         .into_bytes_stream(1024..2048)
///         .await?;
///     let bs: Vec<Bytes> = s.try_collect().await?;
///     Ok(())
/// }
/// ```
///
/// ## Read like `AsyncRead` and `AsyncBufRead`
///
/// ```
/// use anyhow::Result;
/// use bytes::Bytes;
/// use futures::AsyncReadExt;
/// use opendal::Operator;
///
/// async fn test(op: Operator) -> Result<()> {
///     let mut r = op
///         .reader("path/to/file")
///         .await?
///         .into_futures_async_read(1024..2048)
///         .await?;
///     let mut bs = vec![];
///     let n = r.read_to_end(&mut bs).await?;
///     Ok(())
/// }
/// ```
#[derive(Clone)]
pub struct Reader {
    ctx: Arc<ReadContext>,
}

impl Reader {
    /// Create a new reader.
    ///
    /// Create will use internal information to decide the most suitable
    /// implementation for users.
    ///
    /// We don't want to expose those details to users, so this function is
    /// kept crate-private.
    pub(crate) fn new(ctx: ReadContext) -> Self {
        Reader { ctx: Arc::new(ctx) }
    }

    /// Read the given range from reader into [`Buffer`].
    ///
    /// This operation is zero-copy, which means it keeps the [`bytes::Bytes`] returned by the
    /// underlying storage services without any extra copy or intensive memory allocations.
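    ///
    /// # Examples
    ///
    /// A minimal sketch of reading the first KiB of a file into a [`Buffer`];
    /// the path here is just a placeholder:
    ///
    /// ```
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> Result<()> {
    ///     let r = op.reader("path/to/file").await?;
    ///     // Zero-copy read of the requested range.
    ///     let buf = r.read(0..1024).await?;
    ///     let bytes = buf.to_bytes();
    ///     Ok(())
    /// }
    /// ```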
    pub async fn read(&self, range: impl RangeBounds<u64>) -> Result<Buffer> {
        let bufs: Vec<_> = self.clone().into_stream(range).await?.try_collect().await?;
        Ok(bufs.into_iter().flatten().collect())
    }

    /// Read the given range of data from reader into the given [`BufMut`].
    ///
    /// This operation will copy and write bytes into the given [`BufMut`]. Allocation happens
    /// when the [`BufMut`] doesn't have enough space.
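    ///
    /// # Examples
    ///
    /// A minimal sketch of reading a whole file into a `Vec<u8>`; any
    /// [`BufMut`] works, and the path is just a placeholder:
    ///
    /// ```
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> Result<()> {
    ///     let r = op.reader("path/to/file").await?;
    ///     let mut buf: Vec<u8> = Vec::new();
    ///     // Returns the number of bytes copied into `buf`.
    ///     let n = r.read_into(&mut buf, ..).await?;
    ///     Ok(())
    /// }
    /// ```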
    pub async fn read_into(
        &self,
        buf: &mut impl BufMut,
        range: impl RangeBounds<u64>,
    ) -> Result<usize> {
        let mut stream = self.clone().into_stream(range).await?;

        let mut read = 0;
        loop {
            let Some(bs) = stream.try_next().await? else {
                return Ok(read);
            };
            read += bs.len();
            buf.put(bs);
        }
    }

    /// Fetch specific ranges from reader.
    ///
    /// This operation tries to merge the given ranges into a list of
    /// non-overlapping ranges. Users may also specify a `gap` to merge
    /// close ranges.
    ///
    /// The returned `Buffer`s may share the same underlying memory without
    /// any extra copy.
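    ///
    /// # Examples
    ///
    /// A minimal sketch of fetching two ranges in a single call; nearby
    /// ranges may be merged into one request under the hood, and the path
    /// is just a placeholder:
    ///
    /// ```
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> Result<()> {
    ///     let r = op.reader("path/to/file").await?;
    ///     // One Buffer is returned for each requested range, in order.
    ///     let bufs = r.fetch(vec![0..10, 1024..2048]).await?;
    ///     Ok(())
    /// }
    /// ```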
    pub async fn fetch(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Buffer>> {
        let merged_ranges = self.merge_ranges(ranges.clone());

        let merged_bufs: Vec<_> =
            stream::iter(merged_ranges.clone().into_iter().map(|v| self.read(v)))
                .buffered(self.ctx.options().concurrent())
                .try_collect()
                .await?;

        let mut bufs = Vec::with_capacity(ranges.len());
        for range in ranges {
            let idx = merged_ranges.partition_point(|v| v.start <= range.start) - 1;
            let start = range.start - merged_ranges[idx].start;
            let end = range.end - merged_ranges[idx].start;
            bufs.push(merged_bufs[idx].slice(start as usize..end as usize));
        }

        Ok(bufs)
    }

    /// Merge given ranges into a list of non-overlapping ranges.
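    ///
    /// For example, with a gap of 1 the ranges `0..10, 10..20, 21..30, 40..50,
    /// 40..60, 45..59` merge into `0..30, 40..60` (see `test_merge_ranges`
    /// below).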
    fn merge_ranges(&self, mut ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
        let gap = self.ctx.options().gap().unwrap_or(1024 * 1024) as u64;
        // We don't care about the order of ranges with the same start; they
        // will be merged in the next step.
        ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));

        // We know that this vector will have at most `ranges.len()` elements.
        let mut merged = Vec::with_capacity(ranges.len());
        let mut cur = ranges[0].clone();

        for range in ranges.into_iter().skip(1) {
            if range.start <= cur.end + gap {
                // There is an overlap or the gap is small enough to merge.
                cur.end = cur.end.max(range.end);
            } else {
                // No overlap and the gap is too large; push the current range to the list and start a new one.
                merged.push(cur);
                cur = range;
            }
        }

        // Push the last range.
        merged.push(cur);

        merged
    }

    /// Create a buffer stream to read a specific range from the given reader.
    ///
    /// # Notes
    ///
    /// BufferStream is a zero-cost abstraction. It doesn't involve any extra copy of data.
    /// It will return the underlying [`Buffer`] directly.
    ///
    /// The [`Buffer`] this stream yields can be seen as an iterator of [`Bytes`].
    ///
    /// # Inputs
    ///
    /// - `range`: The range of data to read. A range like `..` will read all data from the reader.
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// ```
    /// use std::io;
    ///
    /// use bytes::Bytes;
    /// use futures::TryStreamExt;
    /// use opendal::Buffer;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut s = op
    ///         .reader("hello.txt")
    ///         .await?
    ///         .into_stream(1024..2048)
    ///         .await?;
    ///
    ///     let bs: Vec<Buffer> = s.try_collect().await?;
    ///     // We can use those buffers as bytes if we want.
    ///     let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
    ///     // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
    ///     let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// ## Concurrent Read
    ///
    /// The following example reads data in 256B chunks with a concurrency of 8.
    ///
    /// ```
    /// use std::io;
    ///
    /// use bytes::Bytes;
    /// use futures::TryStreamExt;
    /// use opendal::Buffer;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let s = op
    ///         .reader_with("hello.txt")
    ///         .concurrent(8)
    ///         .chunk(256)
    ///         .await?
    ///         .into_stream(1024..2048)
    ///         .await?;
    ///
    ///     // Every buffer except the last one in the stream will be 256B.
    ///     let bs: Vec<Buffer> = s.try_collect().await?;
    ///     // We can use those buffers as bytes if we want.
    ///     let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
    ///     // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
    ///     let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
    ///
    ///     Ok(())
    /// }
    /// ```
    pub async fn into_stream(self, range: impl RangeBounds<u64>) -> Result<BufferStream> {
        BufferStream::create(self.ctx, range).await
    }

    /// Convert reader into [`FuturesAsyncReader`] which implements [`futures::AsyncRead`],
    /// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`].
    ///
    /// # Notes
    ///
    /// FuturesAsyncReader is not a zero-cost abstraction. The underlying reader
    /// returns an owned [`Buffer`], which involves an extra copy operation.
    ///
    /// # Inputs
    ///
    /// - `range`: The range of data to read. A range like `..` will read all data from the reader.
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// ```
    /// use std::io;
    ///
    /// use futures::io::AsyncReadExt;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut r = op
    ///         .reader("hello.txt")
    ///         .await?
    ///         .into_futures_async_read(1024..2048)
    ///         .await?;
    ///     let mut bs = Vec::new();
    ///     r.read_to_end(&mut bs).await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// ## Concurrent Read
    ///
    /// The following example reads data in 256B chunks with a concurrency of 8.
    ///
    /// ```
    /// use std::io;
    ///
    /// use futures::io::AsyncReadExt;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut r = op
    ///         .reader_with("hello.txt")
    ///         .concurrent(8)
    ///         .chunk(256)
    ///         .await?
    ///         .into_futures_async_read(1024..2048)
    ///         .await?;
    ///     let mut bs = Vec::new();
    ///     r.read_to_end(&mut bs).await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    #[inline]
    pub async fn into_futures_async_read(
        self,
        range: impl RangeBounds<u64>,
    ) -> Result<FuturesAsyncReader> {
        let range = self.ctx.parse_into_range(range).await?;
        Ok(FuturesAsyncReader::new(self.ctx, range))
    }

    /// Convert reader into [`FuturesBytesStream`] which implements [`futures::Stream`].
    ///
    /// # Inputs
    ///
    /// - `range`: The range of data to read. A range like `..` will read all data from the reader.
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// ```
    /// use std::io;
    ///
    /// use bytes::Bytes;
    /// use futures::TryStreamExt;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut s = op
    ///         .reader("hello.txt")
    ///         .await?
    ///         .into_bytes_stream(1024..2048)
    ///         .await?;
    ///     let bs: Vec<Bytes> = s.try_collect().await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// ## Concurrent Read
    ///
    /// The following example reads data in 256B chunks with a concurrency of 8.
    ///
    /// ```
    /// use std::io;
    ///
    /// use bytes::Bytes;
    /// use futures::TryStreamExt;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut s = op
    ///         .reader_with("hello.txt")
    ///         .concurrent(8)
    ///         .chunk(256)
    ///         .await?
    ///         .into_bytes_stream(1024..2048)
    ///         .await?;
    ///     let bs: Vec<Bytes> = s.try_collect().await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    #[inline]
    pub async fn into_bytes_stream(
        self,
        range: impl RangeBounds<u64>,
    ) -> Result<FuturesBytesStream> {
        FuturesBytesStream::new(self.ctx, range).await
    }
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use rand::rngs::ThreadRng;
    use rand::Rng;
    use rand::RngCore;

    use super::*;
    use crate::raw::*;
    use crate::services;
    use crate::Operator;

    #[tokio::test]
    async fn test_trait() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        op.write(
            "test",
            Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
        )
        .await?;

        let acc = op.into_inner();
        let ctx = ReadContext::new(acc, "test".to_string(), OpRead::new(), OpReader::new());

        let _: Box<dyn Unpin + MaybeSend + Sync + 'static> = Box::new(Reader::new(ctx));

        Ok(())
    }

    fn gen_random_bytes() -> Vec<u8> {
        let mut rng = ThreadRng::default();
        // Generate size between 1B..16MB.
        let size = rng.gen_range(1..16 * 1024 * 1024);
        let mut content = vec![0; size];
        rng.fill_bytes(&mut content);
        content
    }

    fn gen_fixed_bytes(size: usize) -> Vec<u8> {
        let mut rng = ThreadRng::default();
        let mut content = vec![0; size];
        rng.fill_bytes(&mut content);
        content
    }

    #[tokio::test]
    async fn test_reader_read() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader(path).await.unwrap();
        let buf = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), content);
        Ok(())
    }

    #[tokio::test]
    async fn test_reader_read_with_chunk() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader_with(path).chunk(16).await.unwrap();
        let buf = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), content);
        Ok(())
    }

    #[tokio::test]
    async fn test_reader_read_with_concurrent() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op
            .reader_with(path)
            .chunk(128)
            .concurrent(16)
            .await
            .unwrap();
        let buf = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), content);
        Ok(())
    }

    #[tokio::test]
    async fn test_reader_read_into() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader(path).await.unwrap();
        let mut buf = Vec::new();
        reader
            .read_into(&mut buf, ..)
            .await
            .expect("read to end must succeed");

        assert_eq!(buf, content);
        Ok(())
    }

    #[tokio::test]
    async fn test_merge_ranges() -> Result<()> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader_with(path).gap(1).await.unwrap();

        let ranges = vec![0..10, 10..20, 21..30, 40..50, 40..60, 45..59];
        let merged = reader.merge_ranges(ranges.clone());
        assert_eq!(merged, vec![0..30, 40..60]);
        Ok(())
    }

    #[tokio::test]
    async fn test_fetch() -> Result<()> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "test_file";

        let content = gen_fixed_bytes(1024);
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader_with(path).gap(1).await.unwrap();

        let ranges = vec![
            0..10,
            40..50,
            45..59,
            10..20,
            21..30,
            40..50,
            40..60,
            45..59,
        ];
        let merged = reader
            .fetch(ranges.clone())
            .await
            .expect("fetch must succeed");

        for (i, range) in ranges.iter().enumerate() {
            assert_eq!(
                merged[i].to_bytes(),
                content[range.start as usize..range.end as usize]
            );
        }
        Ok(())
    }
}
582}