opendal/types/read/reader.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;
use std::ops::RangeBounds;
use std::sync::Arc;

use bytes::BufMut;
use futures::stream;
use futures::StreamExt;
use futures::TryStreamExt;

use crate::*;

/// Reader is designed to read data from a given path in an asynchronous
/// manner.
///
/// # Usage
///
/// [`Reader`] provides multiple ways to read data from the given reader.
///
/// `Reader` implements `Clone`, so you can clone it and store it wherever you want.
///
/// ## Direct
///
/// [`Reader`] provides public APIs such as [`Reader::read`]. You can use those APIs directly without extra copy.
///
/// ```
/// use opendal::Operator;
/// use opendal::Result;
///
/// async fn test(op: Operator) -> Result<()> {
///     let r = op.reader("path/to/file").await?;
///     let bs = r.read(0..1024).await?;
///     Ok(())
/// }
/// ```
///
/// ## Read like `Stream`
///
/// ```
/// use anyhow::Result;
/// use bytes::Bytes;
/// use futures::TryStreamExt;
/// use opendal::Operator;
///
/// async fn test(op: Operator) -> Result<()> {
///     let s = op
///         .reader("path/to/file")
///         .await?
///         .into_bytes_stream(1024..2048)
///         .await?;
///     let bs: Vec<Bytes> = s.try_collect().await?;
///     Ok(())
/// }
/// ```
///
/// ## Read like `AsyncRead` and `AsyncBufRead`
///
/// ```
/// use anyhow::Result;
/// use bytes::Bytes;
/// use futures::AsyncReadExt;
/// use opendal::Operator;
///
/// async fn test(op: Operator) -> Result<()> {
///     let mut r = op
///         .reader("path/to/file")
///         .await?
///         .into_futures_async_read(1024..2048)
///         .await?;
///     let mut bs = vec![];
///     let n = r.read_to_end(&mut bs).await?;
///     Ok(())
/// }
/// ```
#[derive(Clone)]
pub struct Reader {
    ctx: Arc<ReadContext>,
}

impl Reader {
    /// Create a new reader.
    ///
    /// Creation uses internal information to decide the most suitable
    /// implementation for users.
    ///
    /// We don't want to expose those details to users, so we keep this
    /// function crate-private.
    pub(crate) fn new(ctx: ReadContext) -> Self {
        Reader { ctx: Arc::new(ctx) }
    }

    /// Read the given range from the reader into a [`Buffer`].
    ///
    /// This operation is zero-copy, which means it keeps the [`bytes::Bytes`] returned by the underlying
    /// storage services without any extra copy or intensive memory allocations.
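    ///
    /// # Examples
    ///
    /// A minimal sketch (the path below is just a placeholder) reading the
    /// first KiB of an object:
    ///
    /// ```
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> Result<()> {
    ///     let r = op.reader("path/to/file").await?;
    ///     let buf = r.read(0..1024).await?;
    ///     // The buffer can never be larger than the requested range.
    ///     assert!(buf.len() <= 1024);
    ///     Ok(())
    /// }
    /// ```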
    pub async fn read(&self, range: impl RangeBounds<u64>) -> Result<Buffer> {
        let bufs: Vec<_> = self.clone().into_stream(range).await?.try_collect().await?;
        Ok(bufs.into_iter().flatten().collect())
    }

    /// Read the given range from the reader into the given [`BufMut`].
    ///
    /// This operation will copy and write bytes into the given [`BufMut`]. Allocation happens when
    /// the [`BufMut`] doesn't have enough space.
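    ///
    /// # Examples
    ///
    /// A minimal sketch (placeholder path) copying a whole object into a
    /// `Vec<u8>`:
    ///
    /// ```
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> Result<()> {
    ///     let r = op.reader("path/to/file").await?;
    ///     let mut buf = Vec::new();
    ///     // `..` reads the whole object; the returned count equals what was appended.
    ///     let n = r.read_into(&mut buf, ..).await?;
    ///     assert_eq!(n, buf.len());
    ///     Ok(())
    /// }
    /// ```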
    pub async fn read_into(
        &self,
        buf: &mut impl BufMut,
        range: impl RangeBounds<u64>,
    ) -> Result<usize> {
        let mut stream = self.clone().into_stream(range).await?;

        let mut read = 0;
        loop {
            let Some(bs) = stream.try_next().await? else {
                return Ok(read);
            };
            read += bs.len();
            buf.put(bs);
        }
    }

    /// Fetch specific ranges from the reader.
    ///
    /// This operation tries to merge the given ranges into a list of
    /// non-overlapping ranges. Users may also specify a `gap` to merge
    /// close ranges.
    ///
    /// The returned `Buffer`s may share the same underlying memory without
    /// any extra copy.
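    ///
    /// # Examples
    ///
    /// A minimal sketch (placeholder path): fetch three small ranges while
    /// allowing ranges within 512 bytes of each other to be merged into one
    /// request:
    ///
    /// ```
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> Result<()> {
    ///     let r = op.reader_with("path/to/file").gap(512).await?;
    ///     let bufs = r.fetch(vec![0..64, 96..128, 4096..4160]).await?;
    ///     // One buffer per requested range, even if requests were merged internally.
    ///     assert_eq!(bufs.len(), 3);
    ///     Ok(())
    /// }
    /// ```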
    pub async fn fetch(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Buffer>> {
        let merged_ranges = self.merge_ranges(ranges.clone());

        let merged_bufs: Vec<_> =
            stream::iter(merged_ranges.clone().into_iter().map(|v| self.read(v)))
                .buffered(self.ctx.options().concurrent())
                .try_collect()
                .await?;

        let mut bufs = Vec::with_capacity(ranges.len());
        for range in ranges {
            let idx = merged_ranges.partition_point(|v| v.start <= range.start) - 1;
            let start = range.start - merged_ranges[idx].start;
            let end = range.end - merged_ranges[idx].start;
            bufs.push(merged_bufs[idx].slice(start as usize..end as usize));
        }

        Ok(bufs)
    }

    /// Merge given ranges into a list of non-overlapping ranges.
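    ///
    /// For illustration: with a 16 byte gap, `[0..10, 5..20, 40..50]` merges
    /// into `[0..20, 40..50]`, since `40` starts more than 16 bytes after `20`.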
    fn merge_ranges(&self, mut ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
        let gap = self.ctx.options().gap().unwrap_or(1024 * 1024) as u64;
        // We don't care about the order of ranges with the same start; they
        // will be merged in the next step.
        ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));

        // We know that this vector will have at most `ranges.len()` elements.
        let mut merged = Vec::with_capacity(ranges.len());
        let mut cur = ranges[0].clone();

        for range in ranges.into_iter().skip(1) {
            if range.start <= cur.end + gap {
                // There is an overlap, or the gap is small enough to merge.
                cur.end = cur.end.max(range.end);
            } else {
                // No overlap and the gap is too large: push the current range to the list and start a new one.
                merged.push(cur);
                cur = range;
            }
        }

        // Push the last range.
        merged.push(cur);

        merged
    }

    /// Create a buffer stream to read a specific range from the given reader.
    ///
    /// # Notes
    ///
    /// BufferStream is a zero-cost abstraction. It doesn't involve an extra copy of data.
    /// It will return the underlying [`Buffer`] directly.
    ///
    /// The [`Buffer`] this stream yields can be seen as an iterator of [`Bytes`].
    ///
    /// # Inputs
    ///
    /// - `range`: The range of data to read. A range like `..` will read all data from the reader.
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// ```
    /// use std::io;
    ///
    /// use futures::TryStreamExt;
    /// use opendal::{Buffer, Operator};
    /// use opendal::Result;
    /// use bytes::Bytes;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut s = op
    ///         .reader("hello.txt")
    ///         .await?
    ///         .into_stream(1024..2048)
    ///         .await?;
    ///
    ///     let bs: Vec<Buffer> = s.try_collect().await?;
    ///     // We can use those buffers as bytes if we want.
    ///     let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
    ///     // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
    ///     let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// ## Concurrent Read
    ///
    /// The following example reads data in 256B chunks with a concurrency of 8.
    ///
    /// ```
    /// use std::io;
    /// use bytes::Bytes;
    ///
    /// use futures::TryStreamExt;
    /// use opendal::{Buffer, Operator};
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let s = op
    ///         .reader_with("hello.txt")
    ///         .concurrent(8)
    ///         .chunk(256)
    ///         .await?
    ///         .into_stream(1024..2048)
    ///         .await?;
    ///
    ///     // Every buffer except the last one in the stream will be 256B.
    ///     let bs: Vec<Buffer> = s.try_collect().await?;
    ///     // We can use those buffers as bytes if we want.
    ///     let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
    ///     // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
    ///     let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
    ///
    ///     Ok(())
    /// }
    /// ```
    pub async fn into_stream(self, range: impl RangeBounds<u64>) -> Result<BufferStream> {
        BufferStream::create(self.ctx, range).await
    }

    /// Convert the reader into a [`FuturesAsyncReader`] which implements [`futures::AsyncRead`],
    /// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`].
    ///
    /// # Notes
    ///
    /// FuturesAsyncReader is not a zero-cost abstraction. The underlying reader
    /// returns an owned [`Buffer`], which involves an extra copy operation.
    ///
    /// # Inputs
    ///
    /// - `range`: The range of data to read. A range like `..` will read all data from the reader.
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// ```
    /// use std::io;
    ///
    /// use futures::io::AsyncReadExt;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut r = op
    ///         .reader("hello.txt")
    ///         .await?
    ///         .into_futures_async_read(1024..2048)
    ///         .await?;
    ///     let mut bs = Vec::new();
    ///     r.read_to_end(&mut bs).await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// ## Concurrent Read
    ///
    /// The following example reads data in 256B chunks with a concurrency of 8.
    ///
    /// ```
    /// use std::io;
    ///
    /// use futures::io::AsyncReadExt;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut r = op
    ///         .reader_with("hello.txt")
    ///         .concurrent(8)
    ///         .chunk(256)
    ///         .await?
    ///         .into_futures_async_read(1024..2048)
    ///         .await?;
    ///     let mut bs = Vec::new();
    ///     r.read_to_end(&mut bs).await?;
    ///
    ///     Ok(())
    /// }
    /// ```
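    ///
    /// ## Seek Then Read
    ///
    /// A minimal sketch (placeholder path) using the [`futures::AsyncSeek`]
    /// implementation to skip part of the selected range before reading:
    ///
    /// ```
    /// use std::io;
    /// use std::io::SeekFrom;
    ///
    /// use futures::io::AsyncReadExt;
    /// use futures::io::AsyncSeekExt;
    /// use opendal::Operator;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut r = op
    ///         .reader("hello.txt")
    ///         .await?
    ///         .into_futures_async_read(1024..2048)
    ///         .await?;
    ///     // Skip ahead within the selected range before reading the rest.
    ///     r.seek(SeekFrom::Start(512)).await?;
    ///     let mut bs = Vec::new();
    ///     r.read_to_end(&mut bs).await?;
    ///
    ///     Ok(())
    /// }
    /// ```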
    #[inline]
    pub async fn into_futures_async_read(
        self,
        range: impl RangeBounds<u64>,
    ) -> Result<FuturesAsyncReader> {
        let range = self.ctx.parse_into_range(range).await?;
        Ok(FuturesAsyncReader::new(self.ctx, range))
    }

    /// Convert the reader into a [`FuturesBytesStream`] which implements [`futures::Stream`].
    ///
    /// # Inputs
    ///
    /// - `range`: The range of data to read. A range like `..` will read all data from the reader.
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// ```
    /// use std::io;
    ///
    /// use bytes::Bytes;
    /// use futures::TryStreamExt;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut s = op
    ///         .reader("hello.txt")
    ///         .await?
    ///         .into_bytes_stream(1024..2048)
    ///         .await?;
    ///     let bs: Vec<Bytes> = s.try_collect().await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// ## Concurrent Read
    ///
    /// The following example reads data in 256B chunks with a concurrency of 8.
    ///
    /// ```
    /// use std::io;
    ///
    /// use bytes::Bytes;
    /// use futures::TryStreamExt;
    /// use opendal::Operator;
    /// use opendal::Result;
    ///
    /// async fn test(op: Operator) -> io::Result<()> {
    ///     let mut s = op
    ///         .reader_with("hello.txt")
    ///         .concurrent(8)
    ///         .chunk(256)
    ///         .await?
    ///         .into_bytes_stream(1024..2048)
    ///         .await?;
    ///     let bs: Vec<Bytes> = s.try_collect().await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    #[inline]
    pub async fn into_bytes_stream(
        self,
        range: impl RangeBounds<u64>,
    ) -> Result<FuturesBytesStream> {
        FuturesBytesStream::new(self.ctx, range).await
    }
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use rand::rngs::ThreadRng;
    use rand::Rng;
    use rand::RngCore;

    use super::*;
    use crate::raw::*;
    use crate::services;
    use crate::Operator;

    #[tokio::test]
    async fn test_trait() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        op.write(
            "test",
            Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
        )
        .await?;

        let acc = op.into_inner();
        let ctx = ReadContext::new(acc, "test".to_string(), OpRead::new(), OpReader::new());

        let _: Box<dyn Unpin + MaybeSend + Sync + 'static> = Box::new(Reader::new(ctx));

        Ok(())
    }

    fn gen_random_bytes() -> Vec<u8> {
        let mut rng = ThreadRng::default();
        // Generate size between 1B..16MB.
        let size = rng.gen_range(1..16 * 1024 * 1024);
        let mut content = vec![0; size];
        rng.fill_bytes(&mut content);
        content
    }

    fn gen_fixed_bytes(size: usize) -> Vec<u8> {
        let mut rng = ThreadRng::default();
        let mut content = vec![0; size];
        rng.fill_bytes(&mut content);
        content
    }

    #[tokio::test]
    async fn test_reader_read() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader(path).await.unwrap();
        let buf = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), content);
        Ok(())
    }

    #[tokio::test]
    async fn test_reader_read_with_chunk() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader_with(path).chunk(16).await.unwrap();
        let buf = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), content);
        Ok(())
    }

    #[tokio::test]
    async fn test_reader_read_with_concurrent() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op
            .reader_with(path)
            .chunk(128)
            .concurrent(16)
            .await
            .unwrap();
        let buf = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(buf.to_bytes(), content);
        Ok(())
    }

    #[tokio::test]
    async fn test_reader_read_into() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader(path).await.unwrap();
        let mut buf = Vec::new();
        reader
            .read_into(&mut buf, ..)
            .await
            .expect("read to end must succeed");

        assert_eq!(buf, content);
        Ok(())
    }

    #[tokio::test]
    async fn test_merge_ranges() -> Result<()> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "test_file";

        let content = gen_random_bytes();
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader_with(path).gap(1).await.unwrap();

        let ranges = vec![0..10, 10..20, 21..30, 40..50, 40..60, 45..59];
        let merged = reader.merge_ranges(ranges.clone());
        assert_eq!(merged, vec![0..30, 40..60]);
        Ok(())
    }

    #[tokio::test]
    async fn test_fetch() -> Result<()> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "test_file";

        let content = gen_fixed_bytes(1024);
        op.write(path, content.clone())
            .await
            .expect("write must succeed");

        let reader = op.reader_with(path).gap(1).await.unwrap();

        let ranges = vec![
            0..10,
            40..50,
            45..59,
            10..20,
            21..30,
            40..50,
            40..60,
            45..59,
        ];
        let merged = reader
            .fetch(ranges.clone())
            .await
            .expect("fetch must succeed");

        for (i, range) in ranges.iter().enumerate() {
            assert_eq!(
                merged[i].to_bytes(),
                content[range.start as usize..range.end as usize]
            );
        }
        Ok(())
    }
}