// opendal_core/types/read/reader.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ops::Range;
19use std::ops::RangeBounds;
20use std::sync::Arc;
21
22use bytes::BufMut;
23use futures::TryStreamExt;
24
25use crate::raw::Access;
26use crate::raw::ConcurrentTasks;
27use crate::*;
28
29/// Reader is designed to read data from given path in an asynchronous
30/// manner.
31///
32/// # Usage
33///
34/// [`Reader`] provides multiple ways to read data from given reader.
35///
/// `Reader` implements `Clone` so you can clone it and store it wherever you want.
37///
38/// ## Direct
39///
40/// [`Reader`] provides public API including [`Reader::read`]. You can use those APIs directly without extra copy.
41///
42/// ```
43/// use opendal_core::Operator;
44/// use opendal_core::Result;
45///
46/// async fn test(op: Operator) -> Result<()> {
47/// let r = op.reader("path/to/file").await?;
48/// let bs = r.read(0..1024).await?;
49/// Ok(())
50/// }
51/// ```
52///
53/// ## Read like `Stream`
54///
55/// ```
56/// use anyhow::Result;
57/// use bytes::Bytes;
58/// use futures::TryStreamExt;
59/// use opendal_core::Operator;
60///
61/// async fn test(op: Operator) -> Result<()> {
62/// let s = op
63/// .reader("path/to/file")
64/// .await?
65/// .into_bytes_stream(1024..2048)
66/// .await?;
67/// let bs: Vec<Bytes> = s.try_collect().await?;
68/// Ok(())
69/// }
70/// ```
71///
72/// ## Read like `AsyncRead` and `AsyncBufRead`
73///
74/// ```
75/// use anyhow::Result;
76/// use bytes::Bytes;
77/// use futures::AsyncReadExt;
78/// use opendal_core::Operator;
79///
80/// async fn test(op: Operator) -> Result<()> {
81/// let mut r = op
82/// .reader("path/to/file")
83/// .await?
84/// .into_futures_async_read(1024..2048)
85/// .await?;
86/// let mut bs = vec![];
87/// let n = r.read_to_end(&mut bs).await?;
88/// Ok(())
89/// }
90/// ```
#[derive(Clone)]
pub struct Reader {
    /// Shared read context (accessor, path, options). Wrapping it in an
    /// `Arc` is what makes cloning a `Reader` cheap.
    ctx: Arc<ReadContext>,
}
95
96impl Reader {
97 /// Create a new reader.
98 ///
99 /// Create will use internal information to decide the most suitable
100 /// implementation for users.
101 ///
102 /// We don't want to expose those details to users so keep this function
103 /// in crate only.
104 pub(crate) fn new(ctx: ReadContext) -> Self {
105 Reader { ctx: Arc::new(ctx) }
106 }
107
108 /// Read give range from reader into [`Buffer`].
109 ///
110 /// This operation is zero-copy, which means it keeps the [`bytes::Bytes`] returned by underlying
111 /// storage services without any extra copy or intensive memory allocations.
112 pub async fn read(&self, range: impl RangeBounds<u64>) -> Result<Buffer> {
113 let bufs: Vec<_> = self.clone().into_stream(range).await?.try_collect().await?;
114 Ok(bufs.into_iter().flatten().collect())
115 }
116
117 /// Read all data from reader into given [`BufMut`].
118 ///
119 /// This operation will copy and write bytes into given [`BufMut`]. Allocation happens while
120 /// [`BufMut`] doesn't have enough space.
121 pub async fn read_into(
122 &self,
123 buf: &mut impl BufMut,
124 range: impl RangeBounds<u64>,
125 ) -> Result<usize> {
126 let mut stream = self.clone().into_stream(range).await?;
127
128 let mut read = 0;
129 loop {
130 let Some(bs) = stream.try_next().await? else {
131 return Ok(read);
132 };
133 read += bs.len();
134 buf.put(bs);
135 }
136 }
137
138 /// Fetch specific ranges from reader.
139 ///
140 /// This operation try to merge given ranges into a list of
141 /// non-overlapping ranges. Users may also specify a `gap` to merge
142 /// close ranges.
143 ///
144 /// The returning `Buffer` may share the same underlying memory without
145 /// any extra copy.
146 pub async fn fetch(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Buffer>> {
147 if ranges.is_empty() {
148 return Ok(vec![]);
149 }
150
151 let merged_ranges = self.merge_ranges(ranges.clone());
152
153 #[derive(Clone)]
154 struct FetchInput {
155 reader: Reader,
156 range: Range<u64>,
157 }
158
159 let mut tasks = ConcurrentTasks::new(
160 self.ctx.accessor().info().executor(),
161 self.ctx.options().concurrent(),
162 self.ctx.options().prefetch(),
163 |input: FetchInput| {
164 Box::pin(async move {
165 let FetchInput { range, reader } = input.clone();
166 (input, reader.read(range).await)
167 })
168 },
169 );
170
171 for range in merged_ranges.clone() {
172 let reader = self.clone();
173 tasks.execute(FetchInput { reader, range }).await?;
174 }
175
176 let mut merged_bufs = vec![];
177 while let Some(b) = tasks.next().await {
178 merged_bufs.push(b?);
179 }
180
181 let mut bufs = Vec::with_capacity(ranges.len());
182 for range in ranges {
183 let idx = merged_ranges.partition_point(|v| v.start <= range.start) - 1;
184 let start = range.start - merged_ranges[idx].start;
185 let end = range.end - merged_ranges[idx].start;
186 bufs.push(merged_bufs[idx].slice(start as usize..end as usize));
187 }
188
189 Ok(bufs)
190 }
191
192 /// Merge given ranges into a list of non-overlapping ranges.
193 fn merge_ranges(&self, mut ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
194 let gap = self.ctx.options().gap().unwrap_or(1024 * 1024) as u64;
195 // We don't care about the order of range with same start, they
196 // will be merged in the next step.
197 ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));
198
199 // We know that this vector will have at most element
200 let mut merged = Vec::with_capacity(ranges.len());
201 let mut cur = ranges[0].clone();
202
203 for range in ranges.into_iter().skip(1) {
204 if range.start <= cur.end + gap {
205 // There is an overlap or the gap is small enough to merge
206 cur.end = cur.end.max(range.end);
207 } else {
208 // No overlap and the gap is too large, push the current range to the list and start a new one
209 merged.push(cur);
210 cur = range;
211 }
212 }
213
214 // Push the last range
215 merged.push(cur);
216
217 merged
218 }
219
220 /// Create a buffer stream to read specific range from given reader.
221 ///
222 /// # Notes
223 ///
224 /// BufferStream is a zero-cost abstraction. It doesn't involve extra copy of data.
225 /// It will return underlying [`Buffer`] directly.
226 ///
227 /// The [`Buffer`] this stream yields can be seen as an iterator of [`Bytes`].
228 ///
229 /// # Inputs
230 ///
231 /// - `range`: The range of data to read. range like `..` it will read all data from reader.
232 ///
233 /// # Examples
234 ///
235 /// ## Basic Usage
236 ///
237 /// ```
238 /// use std::io;
239 ///
240 /// use bytes::Bytes;
241 /// use futures::TryStreamExt;
242 /// use opendal_core::Buffer;
243 /// use opendal_core::Operator;
244 /// use opendal_core::Result;
245 ///
246 /// async fn test(op: Operator) -> io::Result<()> {
247 /// let mut s = op
248 /// .reader("hello.txt")
249 /// .await?
250 /// .into_stream(1024..2048)
251 /// .await?;
252 ///
253 /// let bs: Vec<Buffer> = s.try_collect().await?;
254 /// // We can use those buffer as bytes if we want.
255 /// let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
256 /// // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
257 /// let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
258 ///
259 /// Ok(())
260 /// }
261 /// ```
262 ///
263 /// ## Concurrent Read
264 ///
265 /// The following example reads data in 256B chunks with 8 concurrent.
266 ///
267 /// ```
268 /// use std::io;
269 ///
270 /// use bytes::Bytes;
271 /// use futures::TryStreamExt;
272 /// use opendal_core::Buffer;
273 /// use opendal_core::Operator;
274 /// use opendal_core::Result;
275 ///
276 /// async fn test(op: Operator) -> io::Result<()> {
277 /// let s = op
278 /// .reader_with("hello.txt")
279 /// .concurrent(8)
280 /// .chunk(256)
281 /// .await?
282 /// .into_stream(1024..2048)
283 /// .await?;
284 ///
285 /// // Every buffer except the last one in the stream will be 256B.
286 /// let bs: Vec<Buffer> = s.try_collect().await?;
287 /// // We can use those buffer as bytes if we want.
288 /// let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
289 /// // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
290 /// let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
291 ///
292 /// Ok(())
293 /// }
294 /// ```
295 pub async fn into_stream(self, range: impl RangeBounds<u64>) -> Result<BufferStream> {
296 BufferStream::create(self.ctx, range).await
297 }
298
299 /// Convert reader into [`FuturesAsyncReader`] which implements [`futures::AsyncRead`],
300 /// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`].
301 ///
302 /// # Notes
303 ///
304 /// FuturesAsyncReader is not a zero-cost abstraction. The underlying reader
305 /// returns an owned [`Buffer`], which involves an extra copy operation.
306 ///
307 /// # Inputs
308 ///
309 /// - `range`: The range of data to read. range like `..` it will read all data from reader.
310 ///
311 /// # Examples
312 ///
313 /// ## Basic Usage
314 ///
315 /// ```
316 /// use std::io;
317 ///
318 /// use futures::io::AsyncReadExt;
319 /// use opendal_core::Operator;
320 /// use opendal_core::Result;
321 ///
322 /// async fn test(op: Operator) -> io::Result<()> {
323 /// let mut r = op
324 /// .reader("hello.txt")
325 /// .await?
326 /// .into_futures_async_read(1024..2048)
327 /// .await?;
328 /// let mut bs = Vec::new();
329 /// r.read_to_end(&mut bs).await?;
330 ///
331 /// Ok(())
332 /// }
333 /// ```
334 ///
335 /// ## Concurrent Read
336 ///
337 /// The following example reads data in 256B chunks with 8 concurrent.
338 ///
339 /// ```
340 /// use std::io;
341 ///
342 /// use futures::io::AsyncReadExt;
343 /// use opendal_core::Operator;
344 /// use opendal_core::Result;
345 ///
346 /// async fn test(op: Operator) -> io::Result<()> {
347 /// let mut r = op
348 /// .reader_with("hello.txt")
349 /// .concurrent(8)
350 /// .chunk(256)
351 /// .await?
352 /// .into_futures_async_read(1024..2048)
353 /// .await?;
354 /// let mut bs = Vec::new();
355 /// r.read_to_end(&mut bs).await?;
356 ///
357 /// Ok(())
358 /// }
359 /// ```
360 #[inline]
361 pub async fn into_futures_async_read(
362 self,
363 range: impl RangeBounds<u64>,
364 ) -> Result<FuturesAsyncReader> {
365 let range = self.ctx.parse_into_range(range).await?;
366 Ok(FuturesAsyncReader::new(self.ctx, range))
367 }
368
369 /// Convert reader into [`FuturesBytesStream`] which implements [`futures::Stream`].
370 ///
371 /// # Inputs
372 ///
373 /// - `range`: The range of data to read. range like `..` it will read all data from reader.
374 ///
375 /// # Examples
376 ///
377 /// ## Basic Usage
378 ///
379 /// ```
380 /// use std::io;
381 ///
382 /// use bytes::Bytes;
383 /// use futures::TryStreamExt;
384 /// use opendal_core::Operator;
385 /// use opendal_core::Result;
386 ///
387 /// async fn test(op: Operator) -> io::Result<()> {
388 /// let mut s = op
389 /// .reader("hello.txt")
390 /// .await?
391 /// .into_bytes_stream(1024..2048)
392 /// .await?;
393 /// let bs: Vec<Bytes> = s.try_collect().await?;
394 ///
395 /// Ok(())
396 /// }
397 /// ```
398 ///
399 /// ## Concurrent Read
400 ///
401 /// The following example reads data in 256B chunks with 8 concurrent.
402 ///
403 /// ```
404 /// use std::io;
405 ///
406 /// use bytes::Bytes;
407 /// use futures::TryStreamExt;
408 /// use opendal_core::Operator;
409 /// use opendal_core::Result;
410 ///
411 /// async fn test(op: Operator) -> io::Result<()> {
412 /// let mut s = op
413 /// .reader_with("hello.txt")
414 /// .concurrent(8)
415 /// .chunk(256)
416 /// .await?
417 /// .into_bytes_stream(1024..2048)
418 /// .await?;
419 /// let bs: Vec<Bytes> = s.try_collect().await?;
420 ///
421 /// Ok(())
422 /// }
423 /// ```
424 #[inline]
425 pub async fn into_bytes_stream(
426 self,
427 range: impl RangeBounds<u64>,
428 ) -> Result<FuturesBytesStream> {
429 FuturesBytesStream::new(self.ctx, range).await
430 }
431}
432
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use rand::Rng;
    use rand::RngCore;
    use rand::rngs::ThreadRng;

    use super::*;
    use crate::Operator;
    use crate::raw::*;
    use crate::services;

    #[tokio::test]
    async fn test_trait() -> Result<()> {
        let op = Operator::via_iter(services::MEMORY_SCHEME, [])?;
        let payload = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
        op.write("test", payload).await?;

        let accessor = op.into_inner();
        let context = ReadContext::new(accessor, "test".to_string(), OpRead::new(), OpReader::new());

        // `Reader` must stay usable as a boxed, sendable, sync value.
        let _: Box<dyn Unpin + MaybeSend + Sync + 'static> = Box::new(Reader::new(context));

        Ok(())
    }

    /// Produce a randomly sized (1B..16MB) blob of random bytes.
    fn gen_random_bytes() -> Vec<u8> {
        let mut rng = ThreadRng::default();
        let len = rng.gen_range(1..16 * 1024 * 1024);
        let mut data = vec![0u8; len];
        rng.fill_bytes(&mut data);
        data
    }

    /// Produce exactly `size` random bytes.
    fn gen_fixed_bytes(size: usize) -> Vec<u8> {
        let mut data = vec![0u8; size];
        ThreadRng::default().fill_bytes(&mut data);
        data
    }

    #[tokio::test]
    async fn test_reader_read() -> Result<()> {
        let op = Operator::via_iter(services::MEMORY_SCHEME, [])?;
        let path = "test_file";
        let expected = gen_random_bytes();

        op.write(path, expected.clone())
            .await
            .expect("write must succeed");

        let buf = op
            .reader(path)
            .await
            .unwrap()
            .read(..)
            .await
            .expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), expected);
        Ok(())
    }

    #[tokio::test]
    async fn test_reader_read_with_chunk() -> Result<()> {
        let op = Operator::via_iter(services::MEMORY_SCHEME, [])?;
        let path = "test_file";
        let expected = gen_random_bytes();

        op.write(path, expected.clone())
            .await
            .expect("write must succeed");

        // Force small chunks so multiple internal reads happen.
        let r = op.reader_with(path).chunk(16).await.unwrap();
        let buf = r.read(..).await.expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), expected);
        Ok(())
    }

    #[tokio::test]
    async fn test_reader_read_with_concurrent() -> Result<()> {
        let op = Operator::via_iter(services::MEMORY_SCHEME, [])?;
        let path = "test_file";
        let expected = gen_random_bytes();

        op.write(path, expected.clone())
            .await
            .expect("write must succeed");

        // Small chunks plus high concurrency exercises the parallel path.
        let r = op
            .reader_with(path)
            .chunk(128)
            .concurrent(16)
            .await
            .unwrap();
        let buf = r.read(..).await.expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), expected);
        Ok(())
    }

    #[tokio::test]
    async fn test_reader_read_into() -> Result<()> {
        let op = Operator::via_iter(services::MEMORY_SCHEME, [])?;
        let path = "test_file";
        let expected = gen_random_bytes();

        op.write(path, expected.clone())
            .await
            .expect("write must succeed");

        let r = op.reader(path).await.unwrap();
        let mut sink = Vec::new();
        r.read_into(&mut sink, ..)
            .await
            .expect("read to end must succeed");

        assert_eq!(sink, expected);
        Ok(())
    }

    #[tokio::test]
    async fn test_merge_ranges() -> Result<()> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "test_file";
        let data = gen_random_bytes();

        op.write(path, data.clone())
            .await
            .expect("write must succeed");

        // gap(1): only ranges at most 1 byte apart may be merged.
        let reader = op.reader_with(path).gap(1).await.unwrap();

        let input = vec![0..10, 10..20, 21..30, 40..50, 40..60, 45..59];
        assert_eq!(reader.merge_ranges(input), vec![0..30, 40..60]);
        Ok(())
    }

    #[tokio::test]
    async fn test_fetch() -> Result<()> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "test_file";
        let data = gen_fixed_bytes(1024);

        op.write(path, data.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader_with(path).gap(1).await.unwrap();

        // Unsorted, overlapping and duplicated on purpose.
        let ranges = vec![
            0..10,
            40..50,
            45..59,
            10..20,
            21..30,
            40..50,
            40..60,
            45..59,
        ];
        let bufs = reader
            .fetch(ranges.clone())
            .await
            .expect("fetch must succeed");

        // Each returned buffer must match the corresponding slice of the data.
        for (range, buf) in ranges.iter().zip(bufs.iter()) {
            assert_eq!(
                buf.to_bytes(),
                data[range.start as usize..range.end as usize]
            );
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_fetch_empty_ranges() -> Result<()> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "test_file";

        op.write(path, gen_fixed_bytes(1024))
            .await
            .expect("write must succeed");

        let r = op.reader(path).await.unwrap();
        let bufs = r
            .fetch(vec![])
            .await
            .expect("fetch with empty ranges must not panic");
        assert!(bufs.is_empty());

        Ok(())
    }
}
628}