opendal/types/read/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ops::Range;
19use std::ops::RangeBounds;
20use std::sync::Arc;
21
22use bytes::BufMut;
23use futures::TryStreamExt;
24
25use crate::raw::Access;
26use crate::raw::ConcurrentTasks;
27use crate::*;
28
29/// Reader is designed to read data from given path in an asynchronous
30/// manner.
31///
32/// # Usage
33///
34/// [`Reader`] provides multiple ways to read data from given reader.
35///
/// `Reader` implements `Clone`, so you can clone it and store it wherever you want.
37///
38/// ## Direct
39///
40/// [`Reader`] provides public API including [`Reader::read`]. You can use those APIs directly without extra copy.
41///
42/// ```
43/// use opendal::Operator;
44/// use opendal::Result;
45///
46/// async fn test(op: Operator) -> Result<()> {
47///     let r = op.reader("path/to/file").await?;
48///     let bs = r.read(0..1024).await?;
49///     Ok(())
50/// }
51/// ```
52///
53/// ## Read like `Stream`
54///
55/// ```
56/// use anyhow::Result;
57/// use bytes::Bytes;
58/// use futures::TryStreamExt;
59/// use opendal::Operator;
60///
61/// async fn test(op: Operator) -> Result<()> {
62///     let s = op
63///         .reader("path/to/file")
64///         .await?
65///         .into_bytes_stream(1024..2048)
66///         .await?;
67///     let bs: Vec<Bytes> = s.try_collect().await?;
68///     Ok(())
69/// }
70/// ```
71///
72/// ## Read like `AsyncRead` and `AsyncBufRead`
73///
74/// ```
75/// use anyhow::Result;
76/// use bytes::Bytes;
77/// use futures::AsyncReadExt;
78/// use opendal::Operator;
79///
80/// async fn test(op: Operator) -> Result<()> {
81///     let mut r = op
82///         .reader("path/to/file")
83///         .await?
84///         .into_futures_async_read(1024..2048)
85///         .await?;
86///     let mut bs = vec![];
87///     let n = r.read_to_end(&mut bs).await?;
88///     Ok(())
89/// }
90/// ```
#[derive(Clone)]
pub struct Reader {
    // Shared read context. `Reader::new` wraps it in an `Arc`, so cloning a
    // `Reader` clones the `Arc` handle rather than the context itself.
    ctx: Arc<ReadContext>,
}
95
96impl Reader {
97    /// Create a new reader.
98    ///
99    /// Create will use internal information to decide the most suitable
100    /// implementation for users.
101    ///
102    /// We don't want to expose those details to users so keep this function
103    /// in crate only.
104    pub(crate) fn new(ctx: ReadContext) -> Self {
105        Reader { ctx: Arc::new(ctx) }
106    }
107
108    /// Read give range from reader into [`Buffer`].
109    ///
110    /// This operation is zero-copy, which means it keeps the [`bytes::Bytes`] returned by underlying
111    /// storage services without any extra copy or intensive memory allocations.
112    pub async fn read(&self, range: impl RangeBounds<u64>) -> Result<Buffer> {
113        let bufs: Vec<_> = self.clone().into_stream(range).await?.try_collect().await?;
114        Ok(bufs.into_iter().flatten().collect())
115    }
116
117    /// Read all data from reader into given [`BufMut`].
118    ///
119    /// This operation will copy and write bytes into given [`BufMut`]. Allocation happens while
120    /// [`BufMut`] doesn't have enough space.
121    pub async fn read_into(
122        &self,
123        buf: &mut impl BufMut,
124        range: impl RangeBounds<u64>,
125    ) -> Result<usize> {
126        let mut stream = self.clone().into_stream(range).await?;
127
128        let mut read = 0;
129        loop {
130            let Some(bs) = stream.try_next().await? else {
131                return Ok(read);
132            };
133            read += bs.len();
134            buf.put(bs);
135        }
136    }
137
138    /// Fetch specific ranges from reader.
139    ///
140    /// This operation try to merge given ranges into a list of
141    /// non-overlapping ranges. Users may also specify a `gap` to merge
142    /// close ranges.
143    ///
144    /// The returning `Buffer` may share the same underlying memory without
145    /// any extra copy.
146    pub async fn fetch(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Buffer>> {
147        let merged_ranges = self.merge_ranges(ranges.clone());
148
149        #[derive(Clone)]
150        struct FetchInput {
151            reader: Reader,
152            range: Range<u64>,
153        }
154
155        let mut tasks = ConcurrentTasks::new(
156            self.ctx.accessor().info().executor(),
157            self.ctx.options().concurrent(),
158            |input: FetchInput| {
159                Box::pin(async move {
160                    let FetchInput { range, reader } = input.clone();
161                    (input, reader.read(range).await)
162                })
163            },
164        );
165
166        for range in merged_ranges.clone() {
167            let reader = self.clone();
168            tasks.execute(FetchInput { reader, range }).await?;
169        }
170
171        let mut merged_bufs = vec![];
172        while let Some(b) = tasks.next().await {
173            merged_bufs.push(b?);
174        }
175
176        let mut bufs = Vec::with_capacity(ranges.len());
177        for range in ranges {
178            let idx = merged_ranges.partition_point(|v| v.start <= range.start) - 1;
179            let start = range.start - merged_ranges[idx].start;
180            let end = range.end - merged_ranges[idx].start;
181            bufs.push(merged_bufs[idx].slice(start as usize..end as usize));
182        }
183
184        Ok(bufs)
185    }
186
187    /// Merge given ranges into a list of non-overlapping ranges.
188    fn merge_ranges(&self, mut ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
189        let gap = self.ctx.options().gap().unwrap_or(1024 * 1024) as u64;
190        // We don't care about the order of range with same start, they
191        // will be merged in the next step.
192        ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));
193
194        // We know that this vector will have at most element
195        let mut merged = Vec::with_capacity(ranges.len());
196        let mut cur = ranges[0].clone();
197
198        for range in ranges.into_iter().skip(1) {
199            if range.start <= cur.end + gap {
200                // There is an overlap or the gap is small enough to merge
201                cur.end = cur.end.max(range.end);
202            } else {
203                // No overlap and the gap is too large, push the current range to the list and start a new one
204                merged.push(cur);
205                cur = range;
206            }
207        }
208
209        // Push the last range
210        merged.push(cur);
211
212        merged
213    }
214
215    /// Create a buffer stream to read specific range from given reader.
216    ///
217    /// # Notes
218    ///
219    /// BufferStream is a zero-cost abstraction. It doesn't involve extra copy of data.
220    /// It will return underlying [`Buffer`] directly.
221    ///
222    /// The [`Buffer`] this stream yields can be seen as an iterator of [`Bytes`].
223    ///
224    /// # Inputs
225    ///
    /// - `range`: The range of data to read. An unbounded range such as `..` reads all data from the reader.
227    ///
228    /// # Examples
229    ///
230    /// ## Basic Usage
231    ///
232    /// ```
233    /// use std::io;
234    ///
235    /// use bytes::Bytes;
236    /// use futures::TryStreamExt;
237    /// use opendal::Buffer;
238    /// use opendal::Operator;
239    /// use opendal::Result;
240    ///
241    /// async fn test(op: Operator) -> io::Result<()> {
242    ///     let mut s = op
243    ///         .reader("hello.txt")
244    ///         .await?
245    ///         .into_stream(1024..2048)
246    ///         .await?;
247    ///
248    ///     let bs: Vec<Buffer> = s.try_collect().await?;
249    ///     // We can use those buffer as bytes if we want.
250    ///     let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
251    ///     // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
252    ///     let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
253    ///
254    ///     Ok(())
255    /// }
256    /// ```
257    ///
258    /// ## Concurrent Read
259    ///
    /// The following example reads data in 256B chunks with a concurrency of 8.
261    ///
262    /// ```
263    /// use std::io;
264    ///
265    /// use bytes::Bytes;
266    /// use futures::TryStreamExt;
267    /// use opendal::Buffer;
268    /// use opendal::Operator;
269    /// use opendal::Result;
270    ///
271    /// async fn test(op: Operator) -> io::Result<()> {
272    ///     let s = op
273    ///         .reader_with("hello.txt")
274    ///         .concurrent(8)
275    ///         .chunk(256)
276    ///         .await?
277    ///         .into_stream(1024..2048)
278    ///         .await?;
279    ///
280    ///     // Every buffer except the last one in the stream will be 256B.
281    ///     let bs: Vec<Buffer> = s.try_collect().await?;
282    ///     // We can use those buffer as bytes if we want.
283    ///     let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
284    ///     // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
285    ///     let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
286    ///
287    ///     Ok(())
288    /// }
289    /// ```
290    pub async fn into_stream(self, range: impl RangeBounds<u64>) -> Result<BufferStream> {
291        BufferStream::create(self.ctx, range).await
292    }
293
294    /// Convert reader into [`FuturesAsyncReader`] which implements [`futures::AsyncRead`],
295    /// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`].
296    ///
297    /// # Notes
298    ///
299    /// FuturesAsyncReader is not a zero-cost abstraction. The underlying reader
300    /// returns an owned [`Buffer`], which involves an extra copy operation.
301    ///
302    /// # Inputs
303    ///
    /// - `range`: The range of data to read. An unbounded range such as `..` reads all data from the reader.
305    ///
306    /// # Examples
307    ///
308    /// ## Basic Usage
309    ///
310    /// ```
311    /// use std::io;
312    ///
313    /// use futures::io::AsyncReadExt;
314    /// use opendal::Operator;
315    /// use opendal::Result;
316    ///
317    /// async fn test(op: Operator) -> io::Result<()> {
318    ///     let mut r = op
319    ///         .reader("hello.txt")
320    ///         .await?
321    ///         .into_futures_async_read(1024..2048)
322    ///         .await?;
323    ///     let mut bs = Vec::new();
324    ///     r.read_to_end(&mut bs).await?;
325    ///
326    ///     Ok(())
327    /// }
328    /// ```
329    ///
330    /// ## Concurrent Read
331    ///
    /// The following example reads data in 256B chunks with a concurrency of 8.
333    ///
334    /// ```
335    /// use std::io;
336    ///
337    /// use futures::io::AsyncReadExt;
338    /// use opendal::Operator;
339    /// use opendal::Result;
340    ///
341    /// async fn test(op: Operator) -> io::Result<()> {
342    ///     let mut r = op
343    ///         .reader_with("hello.txt")
344    ///         .concurrent(8)
345    ///         .chunk(256)
346    ///         .await?
347    ///         .into_futures_async_read(1024..2048)
348    ///         .await?;
349    ///     let mut bs = Vec::new();
350    ///     r.read_to_end(&mut bs).await?;
351    ///
352    ///     Ok(())
353    /// }
354    /// ```
355    #[inline]
356    pub async fn into_futures_async_read(
357        self,
358        range: impl RangeBounds<u64>,
359    ) -> Result<FuturesAsyncReader> {
360        let range = self.ctx.parse_into_range(range).await?;
361        Ok(FuturesAsyncReader::new(self.ctx, range))
362    }
363
364    /// Convert reader into [`FuturesBytesStream`] which implements [`futures::Stream`].
365    ///
366    /// # Inputs
367    ///
    /// - `range`: The range of data to read. An unbounded range such as `..` reads all data from the reader.
369    ///
370    /// # Examples
371    ///
372    /// ## Basic Usage
373    ///
374    /// ```
375    /// use std::io;
376    ///
377    /// use bytes::Bytes;
378    /// use futures::TryStreamExt;
379    /// use opendal::Operator;
380    /// use opendal::Result;
381    ///
382    /// async fn test(op: Operator) -> io::Result<()> {
383    ///     let mut s = op
384    ///         .reader("hello.txt")
385    ///         .await?
386    ///         .into_bytes_stream(1024..2048)
387    ///         .await?;
388    ///     let bs: Vec<Bytes> = s.try_collect().await?;
389    ///
390    ///     Ok(())
391    /// }
392    /// ```
393    ///
394    /// ## Concurrent Read
395    ///
    /// The following example reads data in 256B chunks with a concurrency of 8.
397    ///
398    /// ```
399    /// use std::io;
400    ///
401    /// use bytes::Bytes;
402    /// use futures::TryStreamExt;
403    /// use opendal::Operator;
404    /// use opendal::Result;
405    ///
406    /// async fn test(op: Operator) -> io::Result<()> {
407    ///     let mut s = op
408    ///         .reader_with("hello.txt")
409    ///         .concurrent(8)
410    ///         .chunk(256)
411    ///         .await?
412    ///         .into_bytes_stream(1024..2048)
413    ///         .await?;
414    ///     let bs: Vec<Bytes> = s.try_collect().await?;
415    ///
416    ///     Ok(())
417    /// }
418    /// ```
419    #[inline]
420    pub async fn into_bytes_stream(
421        self,
422        range: impl RangeBounds<u64>,
423    ) -> Result<FuturesBytesStream> {
424        FuturesBytesStream::new(self.ctx, range).await
425    }
426}
427
428#[cfg(test)]
429mod tests {
430    use bytes::Bytes;
431    use rand::rngs::ThreadRng;
432    use rand::Rng;
433    use rand::RngCore;
434
435    use super::*;
436    use crate::raw::*;
437    use crate::services;
438    use crate::Operator;
439
440    #[tokio::test]
441    async fn test_trait() -> Result<()> {
442        let op = Operator::via_iter(Scheme::Memory, [])?;
443        op.write(
444            "test",
445            Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
446        )
447        .await?;
448
449        let acc = op.into_inner();
450        let ctx = ReadContext::new(acc, "test".to_string(), OpRead::new(), OpReader::new());
451
452        let _: Box<dyn Unpin + MaybeSend + Sync + 'static> = Box::new(Reader::new(ctx));
453
454        Ok(())
455    }
456
457    fn gen_random_bytes() -> Vec<u8> {
458        let mut rng = ThreadRng::default();
459        // Generate size between 1B..16MB.
460        let size = rng.gen_range(1..16 * 1024 * 1024);
461        let mut content = vec![0; size];
462        rng.fill_bytes(&mut content);
463        content
464    }
465
466    fn gen_fixed_bytes(size: usize) -> Vec<u8> {
467        let mut rng = ThreadRng::default();
468        let mut content = vec![0; size];
469        rng.fill_bytes(&mut content);
470        content
471    }
472
473    #[tokio::test]
474    async fn test_reader_read() -> Result<()> {
475        let op = Operator::via_iter(Scheme::Memory, [])?;
476        let path = "test_file";
477
478        let content = gen_random_bytes();
479        op.write(path, content.clone())
480            .await
481            .expect("write must succeed");
482
483        let reader = op.reader(path).await.unwrap();
484        let buf = reader.read(..).await.expect("read to end must succeed");
485
486        assert_eq!(buf.to_bytes(), content);
487        Ok(())
488    }
489
490    #[tokio::test]
491    async fn test_reader_read_with_chunk() -> Result<()> {
492        let op = Operator::via_iter(Scheme::Memory, [])?;
493        let path = "test_file";
494
495        let content = gen_random_bytes();
496        op.write(path, content.clone())
497            .await
498            .expect("write must succeed");
499
500        let reader = op.reader_with(path).chunk(16).await.unwrap();
501        let buf = reader.read(..).await.expect("read to end must succeed");
502
503        assert_eq!(buf.to_bytes(), content);
504        Ok(())
505    }
506
507    #[tokio::test]
508    async fn test_reader_read_with_concurrent() -> Result<()> {
509        let op = Operator::via_iter(Scheme::Memory, [])?;
510        let path = "test_file";
511
512        let content = gen_random_bytes();
513        op.write(path, content.clone())
514            .await
515            .expect("write must succeed");
516
517        let reader = op
518            .reader_with(path)
519            .chunk(128)
520            .concurrent(16)
521            .await
522            .unwrap();
523        let buf = reader.read(..).await.expect("read to end must succeed");
524
525        assert_eq!(buf.to_bytes(), content);
526        Ok(())
527    }
528
529    #[tokio::test]
530    async fn test_reader_read_into() -> Result<()> {
531        let op = Operator::via_iter(Scheme::Memory, [])?;
532        let path = "test_file";
533
534        let content = gen_random_bytes();
535        op.write(path, content.clone())
536            .await
537            .expect("write must succeed");
538
539        let reader = op.reader(path).await.unwrap();
540        let mut buf = Vec::new();
541        reader
542            .read_into(&mut buf, ..)
543            .await
544            .expect("read to end must succeed");
545
546        assert_eq!(buf, content);
547        Ok(())
548    }
549
550    #[tokio::test]
551    async fn test_merge_ranges() -> Result<()> {
552        let op = Operator::new(services::Memory::default()).unwrap().finish();
553        let path = "test_file";
554
555        let content = gen_random_bytes();
556        op.write(path, content.clone())
557            .await
558            .expect("write must succeed");
559
560        let reader = op.reader_with(path).gap(1).await.unwrap();
561
562        let ranges = vec![0..10, 10..20, 21..30, 40..50, 40..60, 45..59];
563        let merged = reader.merge_ranges(ranges.clone());
564        assert_eq!(merged, vec![0..30, 40..60]);
565        Ok(())
566    }
567
568    #[tokio::test]
569    async fn test_fetch() -> Result<()> {
570        let op = Operator::new(services::Memory::default()).unwrap().finish();
571        let path = "test_file";
572
573        let content = gen_fixed_bytes(1024);
574        op.write(path, content.clone())
575            .await
576            .expect("write must succeed");
577
578        let reader = op.reader_with(path).gap(1).await.unwrap();
579
580        let ranges = vec![
581            0..10,
582            40..50,
583            45..59,
584            10..20,
585            21..30,
586            40..50,
587            40..60,
588            45..59,
589        ];
590        let merged = reader
591            .fetch(ranges.clone())
592            .await
593            .expect("fetch must succeed");
594
595        for (i, range) in ranges.iter().enumerate() {
596            assert_eq!(
597                merged[i].to_bytes(),
598                content[range.start as usize..range.end as usize]
599            );
600        }
601        Ok(())
602    }
603}