opendal/types/read/reader.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ops::Range;
19use std::ops::RangeBounds;
20use std::sync::Arc;
21
22use bytes::BufMut;
23use futures::TryStreamExt;
24
25use crate::raw::Access;
26use crate::raw::ConcurrentTasks;
27use crate::*;
28
/// Reader is designed to read data from a given path in an asynchronous
/// manner.
///
/// # Usage
///
/// [`Reader`] provides multiple ways to read data from the underlying path.
///
/// `Reader` implements `Clone`, so you can clone it and store it wherever you want.
/// A clone shares the same read context through an [`Arc`] instead of duplicating it.
///
/// ## Direct
///
/// [`Reader`] provides public API including [`Reader::read`]. You can use those APIs directly without extra copy.
///
/// ```
/// use opendal::Operator;
/// use opendal::Result;
///
/// async fn test(op: Operator) -> Result<()> {
///     let r = op.reader("path/to/file").await?;
///     let bs = r.read(0..1024).await?;
///     Ok(())
/// }
/// ```
///
/// ## Read like `Stream`
///
/// ```
/// use anyhow::Result;
/// use bytes::Bytes;
/// use futures::TryStreamExt;
/// use opendal::Operator;
///
/// async fn test(op: Operator) -> Result<()> {
///     let s = op
///         .reader("path/to/file")
///         .await?
///         .into_bytes_stream(1024..2048)
///         .await?;
///     let bs: Vec<Bytes> = s.try_collect().await?;
///     Ok(())
/// }
/// ```
///
/// ## Read like `AsyncRead` and `AsyncBufRead`
///
/// ```
/// use anyhow::Result;
/// use futures::AsyncReadExt;
/// use opendal::Operator;
///
/// async fn test(op: Operator) -> Result<()> {
///     let mut r = op
///         .reader("path/to/file")
///         .await?
///         .into_futures_async_read(1024..2048)
///         .await?;
///     let mut bs = vec![];
///     let n = r.read_to_end(&mut bs).await?;
///     Ok(())
/// }
/// ```
#[derive(Clone)]
pub struct Reader {
    // Read context shared by every clone of this reader; the `into_*`
    // conversions hand this `Arc` over to the stream/reader they build.
    ctx: Arc<ReadContext>,
}
95
96impl Reader {
97 /// Create a new reader.
98 ///
99 /// Create will use internal information to decide the most suitable
100 /// implementation for users.
101 ///
102 /// We don't want to expose those details to users so keep this function
103 /// in crate only.
104 pub(crate) fn new(ctx: ReadContext) -> Self {
105 Reader { ctx: Arc::new(ctx) }
106 }
107
108 /// Read give range from reader into [`Buffer`].
109 ///
110 /// This operation is zero-copy, which means it keeps the [`bytes::Bytes`] returned by underlying
111 /// storage services without any extra copy or intensive memory allocations.
112 pub async fn read(&self, range: impl RangeBounds<u64>) -> Result<Buffer> {
113 let bufs: Vec<_> = self.clone().into_stream(range).await?.try_collect().await?;
114 Ok(bufs.into_iter().flatten().collect())
115 }
116
117 /// Read all data from reader into given [`BufMut`].
118 ///
119 /// This operation will copy and write bytes into given [`BufMut`]. Allocation happens while
120 /// [`BufMut`] doesn't have enough space.
121 pub async fn read_into(
122 &self,
123 buf: &mut impl BufMut,
124 range: impl RangeBounds<u64>,
125 ) -> Result<usize> {
126 let mut stream = self.clone().into_stream(range).await?;
127
128 let mut read = 0;
129 loop {
130 let Some(bs) = stream.try_next().await? else {
131 return Ok(read);
132 };
133 read += bs.len();
134 buf.put(bs);
135 }
136 }
137
138 /// Fetch specific ranges from reader.
139 ///
140 /// This operation try to merge given ranges into a list of
141 /// non-overlapping ranges. Users may also specify a `gap` to merge
142 /// close ranges.
143 ///
144 /// The returning `Buffer` may share the same underlying memory without
145 /// any extra copy.
146 pub async fn fetch(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Buffer>> {
147 let merged_ranges = self.merge_ranges(ranges.clone());
148
149 #[derive(Clone)]
150 struct FetchInput {
151 reader: Reader,
152 range: Range<u64>,
153 }
154
155 let mut tasks = ConcurrentTasks::new(
156 self.ctx.accessor().info().executor(),
157 self.ctx.options().concurrent(),
158 |input: FetchInput| {
159 Box::pin(async move {
160 let FetchInput { range, reader } = input.clone();
161 (input, reader.read(range).await)
162 })
163 },
164 );
165
166 for range in merged_ranges.clone() {
167 let reader = self.clone();
168 tasks.execute(FetchInput { reader, range }).await?;
169 }
170
171 let mut merged_bufs = vec![];
172 while let Some(b) = tasks.next().await {
173 merged_bufs.push(b?);
174 }
175
176 let mut bufs = Vec::with_capacity(ranges.len());
177 for range in ranges {
178 let idx = merged_ranges.partition_point(|v| v.start <= range.start) - 1;
179 let start = range.start - merged_ranges[idx].start;
180 let end = range.end - merged_ranges[idx].start;
181 bufs.push(merged_bufs[idx].slice(start as usize..end as usize));
182 }
183
184 Ok(bufs)
185 }
186
187 /// Merge given ranges into a list of non-overlapping ranges.
188 fn merge_ranges(&self, mut ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
189 let gap = self.ctx.options().gap().unwrap_or(1024 * 1024) as u64;
190 // We don't care about the order of range with same start, they
191 // will be merged in the next step.
192 ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));
193
194 // We know that this vector will have at most element
195 let mut merged = Vec::with_capacity(ranges.len());
196 let mut cur = ranges[0].clone();
197
198 for range in ranges.into_iter().skip(1) {
199 if range.start <= cur.end + gap {
200 // There is an overlap or the gap is small enough to merge
201 cur.end = cur.end.max(range.end);
202 } else {
203 // No overlap and the gap is too large, push the current range to the list and start a new one
204 merged.push(cur);
205 cur = range;
206 }
207 }
208
209 // Push the last range
210 merged.push(cur);
211
212 merged
213 }
214
215 /// Create a buffer stream to read specific range from given reader.
216 ///
217 /// # Notes
218 ///
219 /// BufferStream is a zero-cost abstraction. It doesn't involve extra copy of data.
220 /// It will return underlying [`Buffer`] directly.
221 ///
222 /// The [`Buffer`] this stream yields can be seen as an iterator of [`Bytes`].
223 ///
224 /// # Inputs
225 ///
226 /// - `range`: The range of data to read. range like `..` it will read all data from reader.
227 ///
228 /// # Examples
229 ///
230 /// ## Basic Usage
231 ///
232 /// ```
233 /// use std::io;
234 ///
235 /// use bytes::Bytes;
236 /// use futures::TryStreamExt;
237 /// use opendal::Buffer;
238 /// use opendal::Operator;
239 /// use opendal::Result;
240 ///
241 /// async fn test(op: Operator) -> io::Result<()> {
242 /// let mut s = op
243 /// .reader("hello.txt")
244 /// .await?
245 /// .into_stream(1024..2048)
246 /// .await?;
247 ///
248 /// let bs: Vec<Buffer> = s.try_collect().await?;
249 /// // We can use those buffer as bytes if we want.
250 /// let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
251 /// // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
252 /// let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
253 ///
254 /// Ok(())
255 /// }
256 /// ```
257 ///
258 /// ## Concurrent Read
259 ///
260 /// The following example reads data in 256B chunks with 8 concurrent.
261 ///
262 /// ```
263 /// use std::io;
264 ///
265 /// use bytes::Bytes;
266 /// use futures::TryStreamExt;
267 /// use opendal::Buffer;
268 /// use opendal::Operator;
269 /// use opendal::Result;
270 ///
271 /// async fn test(op: Operator) -> io::Result<()> {
272 /// let s = op
273 /// .reader_with("hello.txt")
274 /// .concurrent(8)
275 /// .chunk(256)
276 /// .await?
277 /// .into_stream(1024..2048)
278 /// .await?;
279 ///
280 /// // Every buffer except the last one in the stream will be 256B.
281 /// let bs: Vec<Buffer> = s.try_collect().await?;
282 /// // We can use those buffer as bytes if we want.
283 /// let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
284 /// // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
285 /// let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
286 ///
287 /// Ok(())
288 /// }
289 /// ```
290 pub async fn into_stream(self, range: impl RangeBounds<u64>) -> Result<BufferStream> {
291 BufferStream::create(self.ctx, range).await
292 }
293
294 /// Convert reader into [`FuturesAsyncReader`] which implements [`futures::AsyncRead`],
295 /// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`].
296 ///
297 /// # Notes
298 ///
299 /// FuturesAsyncReader is not a zero-cost abstraction. The underlying reader
300 /// returns an owned [`Buffer`], which involves an extra copy operation.
301 ///
302 /// # Inputs
303 ///
304 /// - `range`: The range of data to read. range like `..` it will read all data from reader.
305 ///
306 /// # Examples
307 ///
308 /// ## Basic Usage
309 ///
310 /// ```
311 /// use std::io;
312 ///
313 /// use futures::io::AsyncReadExt;
314 /// use opendal::Operator;
315 /// use opendal::Result;
316 ///
317 /// async fn test(op: Operator) -> io::Result<()> {
318 /// let mut r = op
319 /// .reader("hello.txt")
320 /// .await?
321 /// .into_futures_async_read(1024..2048)
322 /// .await?;
323 /// let mut bs = Vec::new();
324 /// r.read_to_end(&mut bs).await?;
325 ///
326 /// Ok(())
327 /// }
328 /// ```
329 ///
330 /// ## Concurrent Read
331 ///
332 /// The following example reads data in 256B chunks with 8 concurrent.
333 ///
334 /// ```
335 /// use std::io;
336 ///
337 /// use futures::io::AsyncReadExt;
338 /// use opendal::Operator;
339 /// use opendal::Result;
340 ///
341 /// async fn test(op: Operator) -> io::Result<()> {
342 /// let mut r = op
343 /// .reader_with("hello.txt")
344 /// .concurrent(8)
345 /// .chunk(256)
346 /// .await?
347 /// .into_futures_async_read(1024..2048)
348 /// .await?;
349 /// let mut bs = Vec::new();
350 /// r.read_to_end(&mut bs).await?;
351 ///
352 /// Ok(())
353 /// }
354 /// ```
355 #[inline]
356 pub async fn into_futures_async_read(
357 self,
358 range: impl RangeBounds<u64>,
359 ) -> Result<FuturesAsyncReader> {
360 let range = self.ctx.parse_into_range(range).await?;
361 Ok(FuturesAsyncReader::new(self.ctx, range))
362 }
363
364 /// Convert reader into [`FuturesBytesStream`] which implements [`futures::Stream`].
365 ///
366 /// # Inputs
367 ///
368 /// - `range`: The range of data to read. range like `..` it will read all data from reader.
369 ///
370 /// # Examples
371 ///
372 /// ## Basic Usage
373 ///
374 /// ```
375 /// use std::io;
376 ///
377 /// use bytes::Bytes;
378 /// use futures::TryStreamExt;
379 /// use opendal::Operator;
380 /// use opendal::Result;
381 ///
382 /// async fn test(op: Operator) -> io::Result<()> {
383 /// let mut s = op
384 /// .reader("hello.txt")
385 /// .await?
386 /// .into_bytes_stream(1024..2048)
387 /// .await?;
388 /// let bs: Vec<Bytes> = s.try_collect().await?;
389 ///
390 /// Ok(())
391 /// }
392 /// ```
393 ///
394 /// ## Concurrent Read
395 ///
396 /// The following example reads data in 256B chunks with 8 concurrent.
397 ///
398 /// ```
399 /// use std::io;
400 ///
401 /// use bytes::Bytes;
402 /// use futures::TryStreamExt;
403 /// use opendal::Operator;
404 /// use opendal::Result;
405 ///
406 /// async fn test(op: Operator) -> io::Result<()> {
407 /// let mut s = op
408 /// .reader_with("hello.txt")
409 /// .concurrent(8)
410 /// .chunk(256)
411 /// .await?
412 /// .into_bytes_stream(1024..2048)
413 /// .await?;
414 /// let bs: Vec<Bytes> = s.try_collect().await?;
415 ///
416 /// Ok(())
417 /// }
418 /// ```
419 #[inline]
420 pub async fn into_bytes_stream(
421 self,
422 range: impl RangeBounds<u64>,
423 ) -> Result<FuturesBytesStream> {
424 FuturesBytesStream::new(self.ctx, range).await
425 }
426}
427
428#[cfg(test)]
429mod tests {
430 use bytes::Bytes;
431 use rand::rngs::ThreadRng;
432 use rand::Rng;
433 use rand::RngCore;
434
435 use super::*;
436 use crate::raw::*;
437 use crate::services;
438 use crate::Operator;
439
440 #[tokio::test]
441 async fn test_trait() -> Result<()> {
442 let op = Operator::via_iter(Scheme::Memory, [])?;
443 op.write(
444 "test",
445 Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
446 )
447 .await?;
448
449 let acc = op.into_inner();
450 let ctx = ReadContext::new(acc, "test".to_string(), OpRead::new(), OpReader::new());
451
452 let _: Box<dyn Unpin + MaybeSend + Sync + 'static> = Box::new(Reader::new(ctx));
453
454 Ok(())
455 }
456
457 fn gen_random_bytes() -> Vec<u8> {
458 let mut rng = ThreadRng::default();
459 // Generate size between 1B..16MB.
460 let size = rng.gen_range(1..16 * 1024 * 1024);
461 let mut content = vec![0; size];
462 rng.fill_bytes(&mut content);
463 content
464 }
465
466 fn gen_fixed_bytes(size: usize) -> Vec<u8> {
467 let mut rng = ThreadRng::default();
468 let mut content = vec![0; size];
469 rng.fill_bytes(&mut content);
470 content
471 }
472
473 #[tokio::test]
474 async fn test_reader_read() -> Result<()> {
475 let op = Operator::via_iter(Scheme::Memory, [])?;
476 let path = "test_file";
477
478 let content = gen_random_bytes();
479 op.write(path, content.clone())
480 .await
481 .expect("write must succeed");
482
483 let reader = op.reader(path).await.unwrap();
484 let buf = reader.read(..).await.expect("read to end must succeed");
485
486 assert_eq!(buf.to_bytes(), content);
487 Ok(())
488 }
489
490 #[tokio::test]
491 async fn test_reader_read_with_chunk() -> Result<()> {
492 let op = Operator::via_iter(Scheme::Memory, [])?;
493 let path = "test_file";
494
495 let content = gen_random_bytes();
496 op.write(path, content.clone())
497 .await
498 .expect("write must succeed");
499
500 let reader = op.reader_with(path).chunk(16).await.unwrap();
501 let buf = reader.read(..).await.expect("read to end must succeed");
502
503 assert_eq!(buf.to_bytes(), content);
504 Ok(())
505 }
506
507 #[tokio::test]
508 async fn test_reader_read_with_concurrent() -> Result<()> {
509 let op = Operator::via_iter(Scheme::Memory, [])?;
510 let path = "test_file";
511
512 let content = gen_random_bytes();
513 op.write(path, content.clone())
514 .await
515 .expect("write must succeed");
516
517 let reader = op
518 .reader_with(path)
519 .chunk(128)
520 .concurrent(16)
521 .await
522 .unwrap();
523 let buf = reader.read(..).await.expect("read to end must succeed");
524
525 assert_eq!(buf.to_bytes(), content);
526 Ok(())
527 }
528
529 #[tokio::test]
530 async fn test_reader_read_into() -> Result<()> {
531 let op = Operator::via_iter(Scheme::Memory, [])?;
532 let path = "test_file";
533
534 let content = gen_random_bytes();
535 op.write(path, content.clone())
536 .await
537 .expect("write must succeed");
538
539 let reader = op.reader(path).await.unwrap();
540 let mut buf = Vec::new();
541 reader
542 .read_into(&mut buf, ..)
543 .await
544 .expect("read to end must succeed");
545
546 assert_eq!(buf, content);
547 Ok(())
548 }
549
550 #[tokio::test]
551 async fn test_merge_ranges() -> Result<()> {
552 let op = Operator::new(services::Memory::default()).unwrap().finish();
553 let path = "test_file";
554
555 let content = gen_random_bytes();
556 op.write(path, content.clone())
557 .await
558 .expect("write must succeed");
559
560 let reader = op.reader_with(path).gap(1).await.unwrap();
561
562 let ranges = vec![0..10, 10..20, 21..30, 40..50, 40..60, 45..59];
563 let merged = reader.merge_ranges(ranges.clone());
564 assert_eq!(merged, vec![0..30, 40..60]);
565 Ok(())
566 }
567
568 #[tokio::test]
569 async fn test_fetch() -> Result<()> {
570 let op = Operator::new(services::Memory::default()).unwrap().finish();
571 let path = "test_file";
572
573 let content = gen_fixed_bytes(1024);
574 op.write(path, content.clone())
575 .await
576 .expect("write must succeed");
577
578 let reader = op.reader_with(path).gap(1).await.unwrap();
579
580 let ranges = vec![
581 0..10,
582 40..50,
583 45..59,
584 10..20,
585 21..30,
586 40..50,
587 40..60,
588 45..59,
589 ];
590 let merged = reader
591 .fetch(ranges.clone())
592 .await
593 .expect("fetch must succeed");
594
595 for (i, range) in ranges.iter().enumerate() {
596 assert_eq!(
597 merged[i].to_bytes(),
598 content[range.start as usize..range.end as usize]
599 );
600 }
601 Ok(())
602 }
603}