// opendal_core/types/read/reader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::ops::Range;
19use std::ops::RangeBounds;
20use std::sync::Arc;
21
22use bytes::BufMut;
23use futures::TryStreamExt;
24
25use crate::raw::Access;
26use crate::raw::ConcurrentTasks;
27use crate::*;
28
29/// Reader is designed to read data from given path in an asynchronous
30/// manner.
31///
32/// # Usage
33///
34/// [`Reader`] provides multiple ways to read data from given reader.
35///
/// `Reader` implements `Clone` so you can clone it and store it wherever you want.
37///
38/// ## Direct
39///
40/// [`Reader`] provides public API including [`Reader::read`]. You can use those APIs directly without extra copy.
41///
42/// ```
43/// use opendal_core::Operator;
44/// use opendal_core::Result;
45///
46/// async fn test(op: Operator) -> Result<()> {
47///     let r = op.reader("path/to/file").await?;
48///     let bs = r.read(0..1024).await?;
49///     Ok(())
50/// }
51/// ```
52///
53/// ## Read like `Stream`
54///
55/// ```
56/// use anyhow::Result;
57/// use bytes::Bytes;
58/// use futures::TryStreamExt;
59/// use opendal_core::Operator;
60///
61/// async fn test(op: Operator) -> Result<()> {
62///     let s = op
63///         .reader("path/to/file")
64///         .await?
65///         .into_bytes_stream(1024..2048)
66///         .await?;
67///     let bs: Vec<Bytes> = s.try_collect().await?;
68///     Ok(())
69/// }
70/// ```
71///
72/// ## Read like `AsyncRead` and `AsyncBufRead`
73///
74/// ```
75/// use anyhow::Result;
76/// use bytes::Bytes;
77/// use futures::AsyncReadExt;
78/// use opendal_core::Operator;
79///
80/// async fn test(op: Operator) -> Result<()> {
81///     let mut r = op
82///         .reader("path/to/file")
83///         .await?
84///         .into_futures_async_read(1024..2048)
85///         .await?;
86///     let mut bs = vec![];
87///     let n = r.read_to_end(&mut bs).await?;
88///     Ok(())
89/// }
90/// ```
#[derive(Clone)]
pub struct Reader {
    // Shared read context (accessor, path, read options). Wrapped in `Arc`
    // so cloning a `Reader` is cheap and all clones read the same source.
    ctx: Arc<ReadContext>,
}
95
96impl Reader {
97    /// Create a new reader.
98    ///
99    /// Create will use internal information to decide the most suitable
100    /// implementation for users.
101    ///
102    /// We don't want to expose those details to users so keep this function
103    /// in crate only.
104    pub(crate) fn new(ctx: ReadContext) -> Self {
105        Reader { ctx: Arc::new(ctx) }
106    }
107
108    /// Read give range from reader into [`Buffer`].
109    ///
110    /// This operation is zero-copy, which means it keeps the [`bytes::Bytes`] returned by underlying
111    /// storage services without any extra copy or intensive memory allocations.
112    pub async fn read(&self, range: impl RangeBounds<u64>) -> Result<Buffer> {
113        let bufs: Vec<_> = self.clone().into_stream(range).await?.try_collect().await?;
114        Ok(bufs.into_iter().flatten().collect())
115    }
116
117    /// Read all data from reader into given [`BufMut`].
118    ///
119    /// This operation will copy and write bytes into given [`BufMut`]. Allocation happens while
120    /// [`BufMut`] doesn't have enough space.
121    pub async fn read_into(
122        &self,
123        buf: &mut impl BufMut,
124        range: impl RangeBounds<u64>,
125    ) -> Result<usize> {
126        let mut stream = self.clone().into_stream(range).await?;
127
128        let mut read = 0;
129        loop {
130            let Some(bs) = stream.try_next().await? else {
131                return Ok(read);
132            };
133            read += bs.len();
134            buf.put(bs);
135        }
136    }
137
    /// Fetch specific ranges from reader.
    ///
    /// This operation try to merge given ranges into a list of
    /// non-overlapping ranges. Users may also specify a `gap` to merge
    /// close ranges.
    ///
    /// The returning `Buffer` may share the same underlying memory without
    /// any extra copy.
    pub async fn fetch(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Buffer>> {
        if ranges.is_empty() {
            return Ok(vec![]);
        }

        // Merge the requested ranges so each merged range needs only one read.
        let merged_ranges = self.merge_ranges(ranges.clone());

        // Per-task input: the reader clone plus the merged range it serves.
        #[derive(Clone)]
        struct FetchInput {
            reader: Reader,
            range: Range<u64>,
        }

        // Run one read per merged range, bounded by the configured
        // concurrency and prefetch options.
        let mut tasks = ConcurrentTasks::new(
            self.ctx.accessor().info().executor(),
            self.ctx.options().concurrent(),
            self.ctx.options().prefetch(),
            |input: FetchInput| {
                Box::pin(async move {
                    // Clone so the original input can be handed back alongside
                    // the result, as the task closure must return both.
                    let FetchInput { range, reader } = input.clone();
                    (input, reader.read(range).await)
                })
            },
        );

        for range in merged_ranges.clone() {
            let reader = self.clone();
            tasks.execute(FetchInput { reader, range }).await?;
        }

        // NOTE(review): this assumes `ConcurrentTasks` yields results in
        // submission order, so `merged_bufs[i]` matches `merged_ranges[i]`
        // — confirm against ConcurrentTasks' contract.
        let mut merged_bufs = vec![];
        while let Some(b) = tasks.next().await {
            merged_bufs.push(b?);
        }

        // Slice each originally requested range back out of the merged buffer
        // that contains it. `partition_point` locates the last merged range
        // whose start is <= range.start; the merged list is sorted, so that
        // range fully covers `range`.
        let mut bufs = Vec::with_capacity(ranges.len());
        for range in ranges {
            let idx = merged_ranges.partition_point(|v| v.start <= range.start) - 1;
            let start = range.start - merged_ranges[idx].start;
            let end = range.end - merged_ranges[idx].start;
            // `slice` shares the underlying memory; no copy happens here.
            bufs.push(merged_bufs[idx].slice(start as usize..end as usize));
        }

        Ok(bufs)
    }
191
192    /// Merge given ranges into a list of non-overlapping ranges.
193    fn merge_ranges(&self, mut ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
194        let gap = self.ctx.options().gap().unwrap_or(1024 * 1024) as u64;
195        // We don't care about the order of range with same start, they
196        // will be merged in the next step.
197        ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));
198
199        // We know that this vector will have at most element
200        let mut merged = Vec::with_capacity(ranges.len());
201        let mut cur = ranges[0].clone();
202
203        for range in ranges.into_iter().skip(1) {
204            if range.start <= cur.end + gap {
205                // There is an overlap or the gap is small enough to merge
206                cur.end = cur.end.max(range.end);
207            } else {
208                // No overlap and the gap is too large, push the current range to the list and start a new one
209                merged.push(cur);
210                cur = range;
211            }
212        }
213
214        // Push the last range
215        merged.push(cur);
216
217        merged
218    }
219
    /// Create a buffer stream to read specific range from given reader.
    ///
    /// # Notes
    ///
    /// BufferStream is a zero-cost abstraction. It doesn't involve extra copy of data.
    /// It will return underlying [`Buffer`] directly.
    ///
    /// The [`Buffer`] this stream yields can be seen as an iterator of [`Bytes`].
    ///
    /// # Inputs
    ///
    /// - `range`: The range of data to read. A range like `..` will read all data from the reader.
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// ```
    /// use std::io;
    ///
    /// use bytes::Bytes;
    /// use futures::TryStreamExt;
    /// use opendal_core::Buffer;
    /// use opendal_core::Operator;
    /// use opendal_core::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut s = op
    ///         .reader("hello.txt")
    ///         .await?
    ///         .into_stream(1024..2048)
    ///         .await?;
    ///
    ///     let bs: Vec<Buffer> = s.try_collect().await?;
    ///     // We can use those buffer as bytes if we want.
    ///     let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
    ///     // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
    ///     let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// ## Concurrent Read
    ///
    /// The following example reads data in 256B chunks with 8 concurrent.
    ///
    /// ```
    /// use std::io;
    ///
    /// use bytes::Bytes;
    /// use futures::TryStreamExt;
    /// use opendal_core::Buffer;
    /// use opendal_core::Operator;
    /// use opendal_core::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let s = op
    ///         .reader_with("hello.txt")
    ///         .concurrent(8)
    ///         .chunk(256)
    ///         .await?
    ///         .into_stream(1024..2048)
    ///         .await?;
    ///
    ///     // Every buffer except the last one in the stream will be 256B.
    ///     let bs: Vec<Buffer> = s.try_collect().await?;
    ///     // We can use those buffer as bytes if we want.
    ///     let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
    ///     // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
    ///     let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
    ///
    ///     Ok(())
    /// }
    /// ```
    pub async fn into_stream(self, range: impl RangeBounds<u64>) -> Result<BufferStream> {
        // Delegate stream construction to `BufferStream`, handing over the
        // shared read context (which carries chunk/concurrent/gap options).
        BufferStream::create(self.ctx, range).await
    }
298
    /// Convert reader into [`FuturesAsyncReader`] which implements [`futures::AsyncRead`],
    /// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`].
    ///
    /// # Notes
    ///
    /// FuturesAsyncReader is not a zero-cost abstraction. The underlying reader
    /// returns an owned [`Buffer`], which involves an extra copy operation.
    ///
    /// # Inputs
    ///
    /// - `range`: The range of data to read. A range like `..` will read all data from the reader.
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// ```
    /// use std::io;
    ///
    /// use futures::io::AsyncReadExt;
    /// use opendal_core::Operator;
    /// use opendal_core::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut r = op
    ///         .reader("hello.txt")
    ///         .await?
    ///         .into_futures_async_read(1024..2048)
    ///         .await?;
    ///     let mut bs = Vec::new();
    ///     r.read_to_end(&mut bs).await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// ## Concurrent Read
    ///
    /// The following example reads data in 256B chunks with 8 concurrent.
    ///
    /// ```
    /// use std::io;
    ///
    /// use futures::io::AsyncReadExt;
    /// use opendal_core::Operator;
    /// use opendal_core::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut r = op
    ///         .reader_with("hello.txt")
    ///         .concurrent(8)
    ///         .chunk(256)
    ///         .await?
    ///         .into_futures_async_read(1024..2048)
    ///         .await?;
    ///     let mut bs = Vec::new();
    ///     r.read_to_end(&mut bs).await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    #[inline]
    pub async fn into_futures_async_read(
        self,
        range: impl RangeBounds<u64>,
    ) -> Result<FuturesAsyncReader> {
        // Resolve the user-supplied range bounds into a concrete range before
        // constructing the reader, since `FuturesAsyncReader::new` takes one.
        let range = self.ctx.parse_into_range(range).await?;
        Ok(FuturesAsyncReader::new(self.ctx, range))
    }
368
    /// Convert reader into [`FuturesBytesStream`] which implements [`futures::Stream`].
    ///
    /// # Inputs
    ///
    /// - `range`: The range of data to read. A range like `..` will read all data from the reader.
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// ```
    /// use std::io;
    ///
    /// use bytes::Bytes;
    /// use futures::TryStreamExt;
    /// use opendal_core::Operator;
    /// use opendal_core::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut s = op
    ///         .reader("hello.txt")
    ///         .await?
    ///         .into_bytes_stream(1024..2048)
    ///         .await?;
    ///     let bs: Vec<Bytes> = s.try_collect().await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// ## Concurrent Read
    ///
    /// The following example reads data in 256B chunks with 8 concurrent.
    ///
    /// ```
    /// use std::io;
    ///
    /// use bytes::Bytes;
    /// use futures::TryStreamExt;
    /// use opendal_core::Operator;
    /// use opendal_core::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut s = op
    ///         .reader_with("hello.txt")
    ///         .concurrent(8)
    ///         .chunk(256)
    ///         .await?
    ///         .into_bytes_stream(1024..2048)
    ///         .await?;
    ///     let bs: Vec<Bytes> = s.try_collect().await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    #[inline]
    pub async fn into_bytes_stream(
        self,
        range: impl RangeBounds<u64>,
    ) -> Result<FuturesBytesStream> {
        // Delegate construction to `FuturesBytesStream`, which consumes the
        // shared read context together with the requested range.
        FuturesBytesStream::new(self.ctx, range).await
    }
431}
432
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use rand::Rng;
    use rand::RngCore;
    use rand::rngs::ThreadRng;

    use super::*;
    use crate::Operator;
    use crate::raw::*;
    use crate::services;

    /// Compile-time check: `Reader` must satisfy
    /// `Unpin + MaybeSend + Sync + 'static` so callers can store it anywhere
    /// and move it across tasks. The boxed coercion below fails to compile if
    /// any bound is lost.
    #[tokio::test]
    async fn test_trait() -> Result<()> {
        let op = Operator::via_iter(services::MEMORY_SCHEME, [])?;
        op.write(
            "test",
            Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
        )
        .await?;

        let acc = op.into_inner();
        let ctx = ReadContext::new(acc, "test".to_string(), OpRead::new(), OpReader::new());

        let _: Box<dyn Unpin + MaybeSend + Sync + 'static> = Box::new(Reader::new(ctx));

        Ok(())
    }

    /// Generate random content with a random size in 1B..16MB.
    fn gen_random_bytes() -> Vec<u8> {
        let mut rng = ThreadRng::default();
        // Generate size between 1B..16MB.
        let size = rng.gen_range(1..16 * 1024 * 1024);
        let mut content = vec![0; size];
        rng.fill_bytes(&mut content);
        content
    }

    /// Generate random content of exactly `size` bytes.
    fn gen_fixed_bytes(size: usize) -> Vec<u8> {
        let mut rng = ThreadRng::default();
        let mut content = vec![0; size];
        rng.fill_bytes(&mut content);
        content
    }

    /// Reading the full range (`..`) must return exactly what was written.
    #[tokio::test]
    async fn test_reader_read() -> Result<()> {
        let op = Operator::via_iter(services::MEMORY_SCHEME, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader(path).await.unwrap();
        let buf = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), content);
        Ok(())
    }

    /// Reading through a small (16B) chunk size must still reassemble the
    /// full content correctly.
    #[tokio::test]
    async fn test_reader_read_with_chunk() -> Result<()> {
        let op = Operator::via_iter(services::MEMORY_SCHEME, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader_with(path).chunk(16).await.unwrap();
        let buf = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), content);
        Ok(())
    }

    /// Concurrent chunked reads (16 tasks x 128B chunks) must preserve both
    /// content and ordering.
    #[tokio::test]
    async fn test_reader_read_with_concurrent() -> Result<()> {
        let op = Operator::via_iter(services::MEMORY_SCHEME, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op
            .reader_with(path)
            .chunk(128)
            .concurrent(16)
            .await
            .unwrap();
        let buf = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), content);
        Ok(())
    }

    /// `read_into` must copy the full content into a caller-provided buffer.
    #[tokio::test]
    async fn test_reader_read_into() -> Result<()> {
        let op = Operator::via_iter(services::MEMORY_SCHEME, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader(path).await.unwrap();
        let mut buf = Vec::new();
        reader
            .read_into(&mut buf, ..)
            .await
            .expect("read to end must succeed");

        assert_eq!(buf, content);
        Ok(())
    }

    /// With gap(1): adjacent/overlapping ranges merge (0..10, 10..20, 21..30
    /// collapse into 0..30 since their gaps are <= 1), while 40..60 stays
    /// separate because the 30 -> 40 gap is too large.
    #[tokio::test]
    async fn test_merge_ranges() -> Result<()> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader_with(path).gap(1).await.unwrap();

        let ranges = vec![0..10, 10..20, 21..30, 40..50, 40..60, 45..59];
        let merged = reader.merge_ranges(ranges);
        assert_eq!(merged, vec![0..30, 40..60]);
        Ok(())
    }

    /// `fetch` must return one buffer per requested range, in request order,
    /// even when ranges are unsorted, duplicated, or overlapping.
    #[tokio::test]
    async fn test_fetch() -> Result<()> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "test_file";

        let content = gen_fixed_bytes(1024);
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader_with(path).gap(1).await.unwrap();

        let ranges = vec![
            0..10,
            40..50,
            45..59,
            10..20,
            21..30,
            40..50,
            40..60,
            45..59,
        ];
        let merged = reader
            .fetch(ranges.clone())
            .await
            .expect("fetch must succeed");

        for (i, range) in ranges.iter().enumerate() {
            assert_eq!(
                merged[i].to_bytes(),
                content[range.start as usize..range.end as usize]
            );
        }
        Ok(())
    }

    /// `fetch` with an empty range list must short-circuit to an empty
    /// result instead of panicking.
    #[tokio::test]
    async fn test_fetch_empty_ranges() -> Result<()> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "test_file";

        let content = gen_fixed_bytes(1024);
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader(path).await.unwrap();
        let result = reader
            .fetch(vec![])
            .await
            .expect("fetch with empty ranges must not panic");
        assert!(result.is_empty());

        Ok(())
    }
}