opendal/types/read/reader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;
use std::ops::RangeBounds;
use std::sync::Arc;

use bytes::BufMut;
use futures::TryStreamExt;

use crate::raw::Access;
use crate::raw::ConcurrentTasks;
use crate::*;

/// Reader is designed to read data from the given path in an asynchronous
/// manner.
///
/// # Usage
///
/// [`Reader`] provides multiple ways to read data from the given reader.
///
/// `Reader` implements `Clone` so you can clone it and store it wherever you want.
///
/// ## Direct
///
/// [`Reader`] provides public APIs such as [`Reader::read`]. You can use those APIs directly without an extra copy.
///
/// ```
/// use opendal::Operator;
/// use opendal::Result;
///
/// async fn test(op: Operator) -> Result<()> {
///     let r = op.reader("path/to/file").await?;
///     let bs = r.read(0..1024).await?;
///     Ok(())
/// }
/// ```
///
/// ## Read like `Stream`
///
/// ```
/// use anyhow::Result;
/// use bytes::Bytes;
/// use futures::TryStreamExt;
/// use opendal::Operator;
///
/// async fn test(op: Operator) -> Result<()> {
///     let s = op
///         .reader("path/to/file")
///         .await?
///         .into_bytes_stream(1024..2048)
///         .await?;
///     let bs: Vec<Bytes> = s.try_collect().await?;
///     Ok(())
/// }
/// ```
///
/// ## Read like `AsyncRead` and `AsyncBufRead`
///
/// ```
/// use anyhow::Result;
/// use bytes::Bytes;
/// use futures::AsyncReadExt;
/// use opendal::Operator;
///
/// async fn test(op: Operator) -> Result<()> {
///     let mut r = op
///         .reader("path/to/file")
///         .await?
///         .into_futures_async_read(1024..2048)
///         .await?;
///     let mut bs = vec![];
///     let n = r.read_to_end(&mut bs).await?;
///     Ok(())
/// }
/// ```
#[derive(Clone)]
pub struct Reader {
    ctx: Arc<ReadContext>,
}

impl Reader {
    /// Create a new reader.
    ///
    /// Create will use internal information to decide the most suitable
    /// implementation for users.
    ///
    /// We don't want to expose those details to users, so this function is
    /// kept crate-private.
    pub(crate) fn new(ctx: ReadContext) -> Self {
        Reader { ctx: Arc::new(ctx) }
    }

    /// Read the given range from reader into [`Buffer`].
    ///
    /// This operation is zero-copy, which means it keeps the [`bytes::Bytes`] returned by the underlying
    /// storage services without any extra copy or intensive memory allocations.
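    ///
    /// # Examples
    ///
    /// A minimal sketch of reading a sub-range and then the whole content; the path
    /// here is just a placeholder.
    ///
    /// ```
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn example(op: Operator) -> Result<()> {
    ///     let r = op.reader("path/to/file").await?;
    ///     // Read only the first KiB.
    ///     let head = r.read(0..1024).await?;
    ///     // The same reader can be reused for further reads.
    ///     let all = r.read(..).await?;
    ///     Ok(())
    /// }
    /// ```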
    pub async fn read(&self, range: impl RangeBounds<u64>) -> Result<Buffer> {
        let bufs: Vec<_> = self.clone().into_stream(range).await?.try_collect().await?;
        Ok(bufs.into_iter().flatten().collect())
    }

    /// Read all data from reader into the given [`BufMut`].
    ///
    /// This operation will copy and write bytes into the given [`BufMut`]. Allocation happens when
    /// [`BufMut`] doesn't have enough space.
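    ///
    /// # Examples
    ///
    /// A minimal sketch of reading everything into a `Vec<u8>`; the path here is just
    /// a placeholder.
    ///
    /// ```
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn example(op: Operator) -> Result<()> {
    ///     let r = op.reader("path/to/file").await?;
    ///     let mut buf = Vec::new();
    ///     // `read_into` returns how many bytes were written into `buf`.
    ///     let n = r.read_into(&mut buf, ..).await?;
    ///     assert_eq!(n, buf.len());
    ///     Ok(())
    /// }
    /// ```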
    pub async fn read_into(
        &self,
        buf: &mut impl BufMut,
        range: impl RangeBounds<u64>,
    ) -> Result<usize> {
        let mut stream = self.clone().into_stream(range).await?;

        let mut read = 0;
        loop {
            let Some(bs) = stream.try_next().await? else {
                return Ok(read);
            };
            read += bs.len();
            buf.put(bs);
        }
    }

    /// Fetch specific ranges from reader.
    ///
    /// This operation tries to merge the given ranges into a list of
    /// non-overlapping ranges. Users may also specify a `gap` to merge
    /// close ranges.
    ///
    /// The returned `Buffer`s may share the same underlying memory without
    /// any extra copy.
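    ///
    /// # Examples
    ///
    /// A minimal sketch of fetching two ranges at once; the returned buffers follow the
    /// order of the requested ranges, and the path here is just a placeholder.
    ///
    /// ```
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn example(op: Operator) -> Result<()> {
    ///     let r = op.reader("path/to/file").await?;
    ///     // Close ranges may be merged into a single request internally.
    ///     let bufs = r.fetch(vec![0..16, 64..128]).await?;
    ///     assert_eq!(bufs.len(), 2);
    ///     Ok(())
    /// }
    /// ```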
    pub async fn fetch(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Buffer>> {
        let merged_ranges = self.merge_ranges(ranges.clone());

        #[derive(Clone)]
        struct FetchInput {
            reader: Reader,
            range: Range<u64>,
        }

        let mut tasks = ConcurrentTasks::new(
            self.ctx.accessor().info().executor(),
            self.ctx.options().concurrent(),
            self.ctx.options().prefetch(),
            |input: FetchInput| {
                Box::pin(async move {
                    let FetchInput { range, reader } = input.clone();
                    (input, reader.read(range).await)
                })
            },
        );

        for range in merged_ranges.clone() {
            let reader = self.clone();
            tasks.execute(FetchInput { reader, range }).await?;
        }

        let mut merged_bufs = vec![];
        while let Some(b) = tasks.next().await {
            merged_bufs.push(b?);
        }

        let mut bufs = Vec::with_capacity(ranges.len());
        for range in ranges {
            let idx = merged_ranges.partition_point(|v| v.start <= range.start) - 1;
            let start = range.start - merged_ranges[idx].start;
            let end = range.end - merged_ranges[idx].start;
            bufs.push(merged_bufs[idx].slice(start as usize..end as usize));
        }

        Ok(bufs)
    }

    /// Merge the given ranges into a list of non-overlapping ranges.
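    ///
    /// For example, with `gap = 1`, the ranges `[0..10, 10..20, 21..30, 40..50, 45..59]`
    /// are sorted by start and merged into `[0..30, 40..59]`.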
    fn merge_ranges(&self, mut ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
        let gap = self.ctx.options().gap().unwrap_or(1024 * 1024) as u64;
        // We don't care about the order of ranges with the same start; they
        // will be merged in the next step.
        ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));

        // We know that this vector will have at most `ranges.len()` elements.
        let mut merged = Vec::with_capacity(ranges.len());
        let mut cur = ranges[0].clone();

        for range in ranges.into_iter().skip(1) {
            if range.start <= cur.end + gap {
                // There is an overlap or the gap is small enough to merge
                cur.end = cur.end.max(range.end);
            } else {
                // No overlap and the gap is too large, push the current range to the list and start a new one
                merged.push(cur);
                cur = range;
            }
        }

        // Push the last range
        merged.push(cur);

        merged
    }

    /// Create a buffer stream to read a specific range from the given reader.
    ///
    /// # Notes
    ///
    /// BufferStream is a zero-cost abstraction. It doesn't involve an extra copy of data.
    /// It will return the underlying [`Buffer`] directly.
    ///
    /// The [`Buffer`] this stream yields can be seen as an iterator of [`Bytes`].
    ///
    /// # Inputs
    ///
    /// - `range`: The range of data to read. A range like `..` will read all data from the reader.
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// ```
    /// use std::io;
    ///
    /// use bytes::Bytes;
    /// use futures::TryStreamExt;
    /// use opendal::Buffer;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut s = op
    ///         .reader("hello.txt")
    ///         .await?
    ///         .into_stream(1024..2048)
    ///         .await?;
    ///
    ///     let bs: Vec<Buffer> = s.try_collect().await?;
    ///     // We can use those buffers as bytes if we want.
    ///     let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
    ///     // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
    ///     let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// ## Concurrent Read
    ///
    /// The following example reads data in 256B chunks with 8 concurrent tasks.
    ///
    /// ```
    /// use std::io;
    ///
    /// use bytes::Bytes;
    /// use futures::TryStreamExt;
    /// use opendal::Buffer;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let s = op
    ///         .reader_with("hello.txt")
    ///         .concurrent(8)
    ///         .chunk(256)
    ///         .await?
    ///         .into_stream(1024..2048)
    ///         .await?;
    ///
    ///     // Every buffer except the last one in the stream will be 256B.
    ///     let bs: Vec<Buffer> = s.try_collect().await?;
    ///     // We can use those buffers as bytes if we want.
    ///     let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
    ///     // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
    ///     let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
    ///
    ///     Ok(())
    /// }
    /// ```
    pub async fn into_stream(self, range: impl RangeBounds<u64>) -> Result<BufferStream> {
        BufferStream::create(self.ctx, range).await
    }

    /// Convert reader into [`FuturesAsyncReader`] which implements [`futures::AsyncRead`],
    /// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`].
    ///
    /// # Notes
    ///
    /// FuturesAsyncReader is not a zero-cost abstraction. The underlying reader
    /// returns an owned [`Buffer`], which involves an extra copy operation.
    ///
    /// # Inputs
    ///
    /// - `range`: The range of data to read. A range like `..` will read all data from the reader.
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// ```
    /// use std::io;
    ///
    /// use futures::io::AsyncReadExt;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut r = op
    ///         .reader("hello.txt")
    ///         .await?
    ///         .into_futures_async_read(1024..2048)
    ///         .await?;
    ///     let mut bs = Vec::new();
    ///     r.read_to_end(&mut bs).await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// ## Concurrent Read
    ///
    /// The following example reads data in 256B chunks with 8 concurrent tasks.
    ///
    /// ```
    /// use std::io;
    ///
    /// use futures::io::AsyncReadExt;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut r = op
    ///         .reader_with("hello.txt")
    ///         .concurrent(8)
    ///         .chunk(256)
    ///         .await?
    ///         .into_futures_async_read(1024..2048)
    ///         .await?;
    ///     let mut bs = Vec::new();
    ///     r.read_to_end(&mut bs).await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    #[inline]
    pub async fn into_futures_async_read(
        self,
        range: impl RangeBounds<u64>,
    ) -> Result<FuturesAsyncReader> {
        let range = self.ctx.parse_into_range(range).await?;
        Ok(FuturesAsyncReader::new(self.ctx, range))
    }

    /// Convert reader into [`FuturesBytesStream`] which implements [`futures::Stream`].
    ///
    /// # Inputs
    ///
    /// - `range`: The range of data to read. A range like `..` will read all data from the reader.
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// ```
    /// use std::io;
    ///
    /// use bytes::Bytes;
    /// use futures::TryStreamExt;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut s = op
    ///         .reader("hello.txt")
    ///         .await?
    ///         .into_bytes_stream(1024..2048)
    ///         .await?;
    ///     let bs: Vec<Bytes> = s.try_collect().await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// ## Concurrent Read
    ///
    /// The following example reads data in 256B chunks with 8 concurrent tasks.
    ///
    /// ```
    /// use std::io;
    ///
    /// use bytes::Bytes;
    /// use futures::TryStreamExt;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut s = op
    ///         .reader_with("hello.txt")
    ///         .concurrent(8)
    ///         .chunk(256)
    ///         .await?
    ///         .into_bytes_stream(1024..2048)
    ///         .await?;
    ///     let bs: Vec<Bytes> = s.try_collect().await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    #[inline]
    pub async fn into_bytes_stream(
        self,
        range: impl RangeBounds<u64>,
    ) -> Result<FuturesBytesStream> {
        FuturesBytesStream::new(self.ctx, range).await
    }
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use rand::rngs::ThreadRng;
    use rand::Rng;
    use rand::RngCore;

    use super::*;
    use crate::raw::*;
    use crate::services;
    use crate::Operator;

    #[tokio::test]
    async fn test_trait() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        op.write(
            "test",
            Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
        )
        .await?;

        let acc = op.into_inner();
        let ctx = ReadContext::new(acc, "test".to_string(), OpRead::new(), OpReader::new());

        let _: Box<dyn Unpin + MaybeSend + Sync + 'static> = Box::new(Reader::new(ctx));

        Ok(())
    }

    fn gen_random_bytes() -> Vec<u8> {
        let mut rng = ThreadRng::default();
        // Generate size between 1B..16MB.
        let size = rng.gen_range(1..16 * 1024 * 1024);
        let mut content = vec![0; size];
        rng.fill_bytes(&mut content);
        content
    }

    fn gen_fixed_bytes(size: usize) -> Vec<u8> {
        let mut rng = ThreadRng::default();
        let mut content = vec![0; size];
        rng.fill_bytes(&mut content);
        content
    }

    #[tokio::test]
    async fn test_reader_read() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader(path).await.unwrap();
        let buf = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), content);
        Ok(())
    }

    #[tokio::test]
    async fn test_reader_read_with_chunk() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader_with(path).chunk(16).await.unwrap();
        let buf = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), content);
        Ok(())
    }

    #[tokio::test]
    async fn test_reader_read_with_concurrent() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op
            .reader_with(path)
            .chunk(128)
            .concurrent(16)
            .await
            .unwrap();
        let buf = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), content);
        Ok(())
    }

    #[tokio::test]
    async fn test_reader_read_into() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader(path).await.unwrap();
        let mut buf = Vec::new();
        reader
            .read_into(&mut buf, ..)
            .await
            .expect("read to end must succeed");

        assert_eq!(buf, content);
        Ok(())
    }

    #[tokio::test]
    async fn test_merge_ranges() -> Result<()> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader_with(path).gap(1).await.unwrap();

        let ranges = vec![0..10, 10..20, 21..30, 40..50, 40..60, 45..59];
        let merged = reader.merge_ranges(ranges.clone());
        assert_eq!(merged, vec![0..30, 40..60]);
        Ok(())
    }

    #[tokio::test]
    async fn test_fetch() -> Result<()> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "test_file";

        let content = gen_fixed_bytes(1024);
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader_with(path).gap(1).await.unwrap();

        let ranges = vec![
            0..10,
            40..50,
            45..59,
            10..20,
            21..30,
            40..50,
            40..60,
            45..59,
        ];
        let merged = reader
            .fetch(ranges.clone())
            .await
            .expect("fetch must succeed");

        for (i, range) in ranges.iter().enumerate() {
            assert_eq!(
                merged[i].to_bytes(),
                content[range.start as usize..range.end as usize]
            );
        }
        Ok(())
    }
}