opendal/types/read/reader.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ops::Range;
19use std::ops::RangeBounds;
20use std::sync::Arc;
21
22use bytes::BufMut;
23use futures::stream;
24use futures::StreamExt;
25use futures::TryStreamExt;
26
27use crate::*;
28
29/// Reader is designed to read data from given path in an asynchronous
30/// manner.
31///
32/// # Usage
33///
34/// [`Reader`] provides multiple ways to read data from given reader.
35///
36/// `Reader` implements `Clone` so you can clone it and store in place where ever you want.
37///
38/// ## Direct
39///
40/// [`Reader`] provides public API including [`Reader::read`]. You can use those APIs directly without extra copy.
41///
42/// ```
43/// use opendal::Operator;
44/// use opendal::Result;
45///
46/// async fn test(op: Operator) -> Result<()> {
47/// let r = op.reader("path/to/file").await?;
48/// let bs = r.read(0..1024).await?;
49/// Ok(())
50/// }
51/// ```
52///
53/// ## Read like `Stream`
54///
55/// ```
56/// use anyhow::Result;
57/// use bytes::Bytes;
58/// use futures::TryStreamExt;
59/// use opendal::Operator;
60///
61/// async fn test(op: Operator) -> Result<()> {
62/// let s = op
63/// .reader("path/to/file")
64/// .await?
65/// .into_bytes_stream(1024..2048)
66/// .await?;
67/// let bs: Vec<Bytes> = s.try_collect().await?;
68/// Ok(())
69/// }
70/// ```
71///
72/// ## Read like `AsyncRead` and `AsyncBufRead`
73///
74/// ```
75/// use anyhow::Result;
76/// use bytes::Bytes;
77/// use futures::AsyncReadExt;
78/// use opendal::Operator;
79///
80/// async fn test(op: Operator) -> Result<()> {
81/// let mut r = op
82/// .reader("path/to/file")
83/// .await?
84/// .into_futures_async_read(1024..2048)
85/// .await?;
86/// let mut bs = vec![];
87/// let n = r.read_to_end(&mut bs).await?;
88/// Ok(())
89/// }
90/// ```
91#[derive(Clone)]
92pub struct Reader {
93 ctx: Arc<ReadContext>,
94}
95
96impl Reader {
97 /// Create a new reader.
98 ///
99 /// Create will use internal information to decide the most suitable
100 /// implementation for users.
101 ///
102 /// We don't want to expose those details to users so keep this function
103 /// in crate only.
104 pub(crate) fn new(ctx: ReadContext) -> Self {
105 Reader { ctx: Arc::new(ctx) }
106 }
107
108 /// Read give range from reader into [`Buffer`].
109 ///
110 /// This operation is zero-copy, which means it keeps the [`bytes::Bytes`] returned by underlying
111 /// storage services without any extra copy or intensive memory allocations.
112 pub async fn read(&self, range: impl RangeBounds<u64>) -> Result<Buffer> {
113 let bufs: Vec<_> = self.clone().into_stream(range).await?.try_collect().await?;
114 Ok(bufs.into_iter().flatten().collect())
115 }
116
117 /// Read all data from reader into given [`BufMut`].
118 ///
119 /// This operation will copy and write bytes into given [`BufMut`]. Allocation happens while
120 /// [`BufMut`] doesn't have enough space.
121 pub async fn read_into(
122 &self,
123 buf: &mut impl BufMut,
124 range: impl RangeBounds<u64>,
125 ) -> Result<usize> {
126 let mut stream = self.clone().into_stream(range).await?;
127
128 let mut read = 0;
129 loop {
130 let Some(bs) = stream.try_next().await? else {
131 return Ok(read);
132 };
133 read += bs.len();
134 buf.put(bs);
135 }
136 }
137
138 /// Fetch specific ranges from reader.
139 ///
140 /// This operation try to merge given ranges into a list of
141 /// non-overlapping ranges. Users may also specify a `gap` to merge
142 /// close ranges.
143 ///
144 /// The returning `Buffer` may share the same underlying memory without
145 /// any extra copy.
146 pub async fn fetch(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Buffer>> {
147 let merged_ranges = self.merge_ranges(ranges.clone());
148
149 let merged_bufs: Vec<_> =
150 stream::iter(merged_ranges.clone().into_iter().map(|v| self.read(v)))
151 .buffered(self.ctx.options().concurrent())
152 .try_collect()
153 .await?;
154
155 let mut bufs = Vec::with_capacity(ranges.len());
156 for range in ranges {
157 let idx = merged_ranges.partition_point(|v| v.start <= range.start) - 1;
158 let start = range.start - merged_ranges[idx].start;
159 let end = range.end - merged_ranges[idx].start;
160 bufs.push(merged_bufs[idx].slice(start as usize..end as usize));
161 }
162
163 Ok(bufs)
164 }
165
166 /// Merge given ranges into a list of non-overlapping ranges.
167 fn merge_ranges(&self, mut ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
168 let gap = self.ctx.options().gap().unwrap_or(1024 * 1024) as u64;
169 // We don't care about the order of range with same start, they
170 // will be merged in the next step.
171 ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));
172
173 // We know that this vector will have at most element
174 let mut merged = Vec::with_capacity(ranges.len());
175 let mut cur = ranges[0].clone();
176
177 for range in ranges.into_iter().skip(1) {
178 if range.start <= cur.end + gap {
179 // There is an overlap or the gap is small enough to merge
180 cur.end = cur.end.max(range.end);
181 } else {
182 // No overlap and the gap is too large, push the current range to the list and start a new one
183 merged.push(cur);
184 cur = range;
185 }
186 }
187
188 // Push the last range
189 merged.push(cur);
190
191 merged
192 }
193
194 /// Create a buffer stream to read specific range from given reader.
195 ///
196 /// # Notes
197 ///
198 /// BufferStream is a zero-cost abstraction. It doesn't involve extra copy of data.
199 /// It will return underlying [`Buffer`] directly.
200 ///
201 /// The [`Buffer`] this stream yields can be seen as an iterator of [`Bytes`].
202 ///
203 /// # Inputs
204 ///
205 /// - `range`: The range of data to read. range like `..` it will read all data from reader.
206 ///
207 /// # Examples
208 ///
209 /// ## Basic Usage
210 ///
211 /// ```
212 /// use std::io;
213 ///
214 /// use bytes::Bytes;
215 /// use futures::TryStreamExt;
216 /// use opendal::Buffer;
217 /// use opendal::Operator;
218 /// use opendal::Result;
219 ///
220 /// async fn test(op: Operator) -> io::Result<()> {
221 /// let mut s = op
222 /// .reader("hello.txt")
223 /// .await?
224 /// .into_stream(1024..2048)
225 /// .await?;
226 ///
227 /// let bs: Vec<Buffer> = s.try_collect().await?;
228 /// // We can use those buffer as bytes if we want.
229 /// let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
230 /// // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
231 /// let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
232 ///
233 /// Ok(())
234 /// }
235 /// ```
236 ///
237 /// ## Concurrent Read
238 ///
239 /// The following example reads data in 256B chunks with 8 concurrent.
240 ///
241 /// ```
242 /// use std::io;
243 ///
244 /// use bytes::Bytes;
245 /// use futures::TryStreamExt;
246 /// use opendal::Buffer;
247 /// use opendal::Operator;
248 /// use opendal::Result;
249 ///
250 /// async fn test(op: Operator) -> io::Result<()> {
251 /// let s = op
252 /// .reader_with("hello.txt")
253 /// .concurrent(8)
254 /// .chunk(256)
255 /// .await?
256 /// .into_stream(1024..2048)
257 /// .await?;
258 ///
259 /// // Every buffer except the last one in the stream will be 256B.
260 /// let bs: Vec<Buffer> = s.try_collect().await?;
261 /// // We can use those buffer as bytes if we want.
262 /// let bytes_vec: Vec<Bytes> = bs.clone().into_iter().flatten().collect();
263 /// // Or we can merge them into a single [`Buffer`] and later use it as [`bytes::Buf`].
264 /// let new_buffer: Buffer = bs.into_iter().flatten().collect::<Buffer>();
265 ///
266 /// Ok(())
267 /// }
268 /// ```
269 pub async fn into_stream(self, range: impl RangeBounds<u64>) -> Result<BufferStream> {
270 BufferStream::create(self.ctx, range).await
271 }
272
273 /// Convert reader into [`FuturesAsyncReader`] which implements [`futures::AsyncRead`],
274 /// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`].
275 ///
276 /// # Notes
277 ///
278 /// FuturesAsyncReader is not a zero-cost abstraction. The underlying reader
279 /// returns an owned [`Buffer`], which involves an extra copy operation.
280 ///
281 /// # Inputs
282 ///
283 /// - `range`: The range of data to read. range like `..` it will read all data from reader.
284 ///
285 /// # Examples
286 ///
287 /// ## Basic Usage
288 ///
289 /// ```
290 /// use std::io;
291 ///
292 /// use futures::io::AsyncReadExt;
293 /// use opendal::Operator;
294 /// use opendal::Result;
295 ///
296 /// async fn test(op: Operator) -> io::Result<()> {
297 /// let mut r = op
298 /// .reader("hello.txt")
299 /// .await?
300 /// .into_futures_async_read(1024..2048)
301 /// .await?;
302 /// let mut bs = Vec::new();
303 /// r.read_to_end(&mut bs).await?;
304 ///
305 /// Ok(())
306 /// }
307 /// ```
308 ///
309 /// ## Concurrent Read
310 ///
311 /// The following example reads data in 256B chunks with 8 concurrent.
312 ///
313 /// ```
314 /// use std::io;
315 ///
316 /// use futures::io::AsyncReadExt;
317 /// use opendal::Operator;
318 /// use opendal::Result;
319 ///
320 /// async fn test(op: Operator) -> io::Result<()> {
321 /// let mut r = op
322 /// .reader_with("hello.txt")
323 /// .concurrent(8)
324 /// .chunk(256)
325 /// .await?
326 /// .into_futures_async_read(1024..2048)
327 /// .await?;
328 /// let mut bs = Vec::new();
329 /// r.read_to_end(&mut bs).await?;
330 ///
331 /// Ok(())
332 /// }
333 /// ```
334 #[inline]
335 pub async fn into_futures_async_read(
336 self,
337 range: impl RangeBounds<u64>,
338 ) -> Result<FuturesAsyncReader> {
339 let range = self.ctx.parse_into_range(range).await?;
340 Ok(FuturesAsyncReader::new(self.ctx, range))
341 }
342
343 /// Convert reader into [`FuturesBytesStream`] which implements [`futures::Stream`].
344 ///
345 /// # Inputs
346 ///
347 /// - `range`: The range of data to read. range like `..` it will read all data from reader.
348 ///
349 /// # Examples
350 ///
351 /// ## Basic Usage
352 ///
353 /// ```
354 /// use std::io;
355 ///
356 /// use bytes::Bytes;
357 /// use futures::TryStreamExt;
358 /// use opendal::Operator;
359 /// use opendal::Result;
360 ///
361 /// async fn test(op: Operator) -> io::Result<()> {
362 /// let mut s = op
363 /// .reader("hello.txt")
364 /// .await?
365 /// .into_bytes_stream(1024..2048)
366 /// .await?;
367 /// let bs: Vec<Bytes> = s.try_collect().await?;
368 ///
369 /// Ok(())
370 /// }
371 /// ```
372 ///
373 /// ## Concurrent Read
374 ///
375 /// The following example reads data in 256B chunks with 8 concurrent.
376 ///
377 /// ```
378 /// use std::io;
379 ///
380 /// use bytes::Bytes;
381 /// use futures::TryStreamExt;
382 /// use opendal::Operator;
383 /// use opendal::Result;
384 ///
385 /// async fn test(op: Operator) -> io::Result<()> {
386 /// let mut s = op
387 /// .reader_with("hello.txt")
388 /// .concurrent(8)
389 /// .chunk(256)
390 /// .await?
391 /// .into_bytes_stream(1024..2048)
392 /// .await?;
393 /// let bs: Vec<Bytes> = s.try_collect().await?;
394 ///
395 /// Ok(())
396 /// }
397 /// ```
398 #[inline]
399 pub async fn into_bytes_stream(
400 self,
401 range: impl RangeBounds<u64>,
402 ) -> Result<FuturesBytesStream> {
403 FuturesBytesStream::new(self.ctx, range).await
404 }
405}
406
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use rand::rngs::ThreadRng;
    use rand::Rng;
    use rand::RngCore;

    use super::*;
    use crate::raw::*;
    use crate::services;
    use crate::Operator;

    /// Produces random content whose size is picked from 1B..16MB.
    fn gen_random_bytes() -> Vec<u8> {
        let size = ThreadRng::default().gen_range(1..16 * 1024 * 1024);
        gen_fixed_bytes(size)
    }

    /// Produces exactly `size` random bytes.
    fn gen_fixed_bytes(size: usize) -> Vec<u8> {
        let mut bytes = vec![0; size];
        ThreadRng::default().fill_bytes(&mut bytes);
        bytes
    }

    /// Writes `content` to `path` and hands it back for later comparison.
    async fn write_content(op: &Operator, path: &str, content: Vec<u8>) -> Vec<u8> {
        op.write(path, content.clone())
            .await
            .expect("write must succeed");
        content
    }

    /// `Reader` must stay `Unpin + MaybeSend + Sync + 'static`.
    #[tokio::test]
    async fn test_trait() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let chunks = vec![Bytes::from("Hello"), Bytes::from("World")];
        op.write("test", Buffer::from(chunks)).await?;

        let ctx = ReadContext::new(
            op.into_inner(),
            "test".to_string(),
            OpRead::new(),
            OpReader::new(),
        );

        let _: Box<dyn Unpin + MaybeSend + Sync + 'static> = Box::new(Reader::new(ctx));

        Ok(())
    }

    /// Reading `..` with default options returns the written content.
    #[tokio::test]
    async fn test_reader_read() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";
        let expected = write_content(&op, path, gen_random_bytes()).await;

        let reader = op.reader(path).await.unwrap();
        let actual = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(actual.to_bytes(), expected);
        Ok(())
    }

    /// Reading `..` in 16B chunks returns the written content.
    #[tokio::test]
    async fn test_reader_read_with_chunk() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";
        let expected = write_content(&op, path, gen_random_bytes()).await;

        let reader = op.reader_with(path).chunk(16).await.unwrap();
        let actual = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(actual.to_bytes(), expected);
        Ok(())
    }

    /// Reading `..` in 128B chunks with 16 concurrent returns the written content.
    #[tokio::test]
    async fn test_reader_read_with_concurrent() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";
        let expected = write_content(&op, path, gen_random_bytes()).await;

        let reader = op
            .reader_with(path)
            .chunk(128)
            .concurrent(16)
            .await
            .unwrap();
        let actual = reader.read(..).await.expect("read to end must succeed");

        assert_eq!(actual.to_bytes(), expected);
        Ok(())
    }

    /// `read_into` fills the given buffer with the written content.
    #[tokio::test]
    async fn test_reader_read_into() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let path = "test_file";
        let expected = write_content(&op, path, gen_random_bytes()).await;

        let reader = op.reader(path).await.unwrap();
        let mut sink = Vec::new();
        reader
            .read_into(&mut sink, ..)
            .await
            .expect("read to end must succeed");

        assert_eq!(sink, expected);
        Ok(())
    }

    /// With a gap of 1, touching and overlapping ranges collapse into two ranges.
    #[tokio::test]
    async fn test_merge_ranges() -> Result<()> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "test_file";
        write_content(&op, path, gen_random_bytes()).await;

        let reader = op.reader_with(path).gap(1).await.unwrap();

        let input = vec![0..10, 10..20, 21..30, 40..50, 40..60, 45..59];
        let merged = reader.merge_ranges(input.clone());
        assert_eq!(merged, vec![0..30, 40..60]);
        Ok(())
    }

    /// `fetch` returns one buffer per requested range, in request order.
    #[tokio::test]
    async fn test_fetch() -> Result<()> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "test_file";
        let content = write_content(&op, path, gen_fixed_bytes(1024)).await;

        let reader = op.reader_with(path).gap(1).await.unwrap();

        // Unsorted, duplicated and overlapping on purpose.
        let ranges = vec![
            0..10,
            40..50,
            45..59,
            10..20,
            21..30,
            40..50,
            40..60,
            45..59,
        ];
        let fetched = reader
            .fetch(ranges.clone())
            .await
            .expect("fetch must succeed");

        for (idx, range) in ranges.iter().enumerate() {
            let (lo, hi) = (range.start as usize, range.end as usize);
            assert_eq!(fetched[idx].to_bytes(), content[lo..hi]);
        }
        Ok(())
    }
}