opendal/types/
buffer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::VecDeque;
19use std::convert::Infallible;
20use std::fmt::Debug;
21use std::fmt::Formatter;
22use std::io::IoSlice;
23use std::io::{self, BufRead, Read, Seek, SeekFrom};
24use std::mem;
25use std::ops::Bound;
26use std::ops::RangeBounds;
27use std::pin::Pin;
28use std::sync::Arc;
29use std::task::Context;
30use std::task::Poll;
31
32use bytes::Buf;
33use bytes::BufMut;
34use bytes::Bytes;
35use bytes::BytesMut;
36use futures::Stream;
37
38use crate::*;
39
40/// Buffer is a wrapper of contiguous `Bytes` and non-contiguous `[Bytes]`.
41///
42/// We designed buffer to allow underlying storage to return non-contiguous bytes. For example,
43/// http based storage like s3 could generate non-contiguous bytes by stream.
44///
45/// ## Features
46///
47/// - [`Buffer`] can be used as [`Buf`], [`Iterator`], [`Stream`] directly.
48/// - [`Buffer`] is cheap to clone like [`Bytes`], only update reference count, no allocation.
49/// - [`Buffer`] is vectorized write friendly, you can convert it to [`IoSlice`] for vectored write.
50///
51/// ## Examples
52///
53/// ### As `Buf`
54///
55/// `Buffer` implements `Buf` trait:
56///
57/// ```rust
58/// use bytes::Buf;
59/// use opendal::Buffer;
60/// use serde_json;
61///
62/// fn test(mut buf: Buffer) -> Vec<String> {
63///     serde_json::from_reader(buf.reader()).unwrap()
64/// }
65/// ```
66///
67/// ### As Bytes `Iterator`
68///
69/// `Buffer` implements `Iterator<Item=Bytes>` trait:
70///
71/// ```rust
72/// use bytes::Bytes;
73/// use opendal::Buffer;
74///
75/// fn test(mut buf: Buffer) -> Vec<Bytes> {
76///     buf.into_iter().collect()
77/// }
78/// ```
79///
80/// ### As Bytes `Stream`
81///
82/// `Buffer` implements `Stream<Item=Result<Bytes, Infallible>>` trait:
83///
84/// ```rust
85/// use bytes::Bytes;
86/// use futures::TryStreamExt;
87/// use opendal::Buffer;
88///
89/// async fn test(mut buf: Buffer) -> Vec<Bytes> {
90///     buf.into_iter().try_collect().await.unwrap()
91/// }
92/// ```
93///
94/// ### As one contiguous Bytes
95///
96/// `Buffer` can make contiguous by transform into `Bytes` or `Vec<u8>`.
97/// Please keep in mind that this operation involves new allocation and bytes copy, and we can't
98/// reuse the same memory region anymore.
99///
100/// ```rust
101/// use bytes::Bytes;
102/// use opendal::Buffer;
103///
104/// fn test_to_vec(buf: Buffer) -> Vec<u8> {
105///     buf.to_vec()
106/// }
107///
108/// fn test_to_bytes(buf: Buffer) -> Bytes {
109///     buf.to_bytes()
110/// }
111/// ```
112#[derive(Clone)]
113pub struct Buffer(Inner);
114
115#[derive(Clone)]
116enum Inner {
117    Contiguous(Bytes),
118    NonContiguous {
119        parts: Arc<[Bytes]>,
120        size: usize,
121        idx: usize,
122        offset: usize,
123    },
124}
125
126impl Debug for Buffer {
127    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
128        let mut b = f.debug_struct("Buffer");
129
130        match &self.0 {
131            Inner::Contiguous(bs) => {
132                b.field("type", &"contiguous");
133                b.field("size", &bs.len());
134            }
135            Inner::NonContiguous {
136                parts,
137                size,
138                idx,
139                offset,
140            } => {
141                b.field("type", &"non_contiguous");
142                b.field("parts", &parts);
143                b.field("size", &size);
144                b.field("idx", &idx);
145                b.field("offset", &offset);
146            }
147        }
148        b.finish_non_exhaustive()
149    }
150}
151
152impl Default for Buffer {
153    fn default() -> Self {
154        Self::new()
155    }
156}
157
158impl Buffer {
159    /// Create a new empty buffer.
160    ///
161    /// This operation is const and no allocation will be performed.
162    #[inline]
163    pub const fn new() -> Self {
164        Self(Inner::Contiguous(Bytes::new()))
165    }
166
167    /// Get the length of the buffer.
168    #[inline]
169    pub fn len(&self) -> usize {
170        match &self.0 {
171            Inner::Contiguous(b) => b.remaining(),
172            Inner::NonContiguous { size, .. } => *size,
173        }
174    }
175
176    /// Check if buffer is empty.
177    #[inline]
178    pub fn is_empty(&self) -> bool {
179        self.len() == 0
180    }
181
182    /// Number of [`Bytes`] in [`Buffer`].
183    ///
184    /// For contiguous buffer, it's always 1. For non-contiguous buffer, it's number of bytes
185    /// available for use.
186    pub fn count(&self) -> usize {
187        match &self.0 {
188            Inner::Contiguous(_) => 1,
189            Inner::NonContiguous {
190                parts,
191                idx,
192                size,
193                offset,
194            } => {
195                parts
196                    .iter()
197                    .skip(*idx)
198                    .fold((0, size + offset), |(count, size), bytes| {
199                        if size == 0 {
200                            (count, 0)
201                        } else {
202                            (count + 1, size.saturating_sub(bytes.len()))
203                        }
204                    })
205                    .0
206            }
207        }
208    }
209
210    /// Get current [`Bytes`].
211    pub fn current(&self) -> Bytes {
212        match &self.0 {
213            Inner::Contiguous(inner) => inner.clone(),
214            Inner::NonContiguous {
215                parts,
216                idx,
217                offset,
218                size,
219            } => {
220                let chunk = &parts[*idx];
221                let n = (chunk.len() - *offset).min(*size);
222                chunk.slice(*offset..*offset + n)
223            }
224        }
225    }
226
227    /// Shortens the buffer, keeping the first `len` bytes and dropping the rest.
228    ///
229    /// If `len` is greater than the buffer’s current length, this has no effect.
230    #[inline]
231    pub fn truncate(&mut self, len: usize) {
232        match &mut self.0 {
233            Inner::Contiguous(bs) => bs.truncate(len),
234            Inner::NonContiguous { size, .. } => {
235                *size = (*size).min(len);
236            }
237        }
238    }
239
240    /// Returns a slice of self for the provided range.
241    ///
242    /// This will increment the reference count for the underlying memory and return a new Buffer handle set to the slice.
243    ///
244    /// This operation is O(1).
245    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
246        let len = self.len();
247
248        let begin = match range.start_bound() {
249            Bound::Included(&n) => n,
250            Bound::Excluded(&n) => n.checked_add(1).expect("out of range"),
251            Bound::Unbounded => 0,
252        };
253
254        let end = match range.end_bound() {
255            Bound::Included(&n) => n.checked_add(1).expect("out of range"),
256            Bound::Excluded(&n) => n,
257            Bound::Unbounded => len,
258        };
259
260        assert!(
261            begin <= end,
262            "range start must not be greater than end: {:?} <= {:?}",
263            begin,
264            end,
265        );
266        assert!(
267            end <= len,
268            "range end out of bounds: {:?} <= {:?}",
269            end,
270            len,
271        );
272
273        if end == begin {
274            return Buffer::new();
275        }
276
277        let mut ret = self.clone();
278        ret.truncate(end);
279        ret.advance(begin);
280        ret
281    }
282
283    /// Combine all bytes together into one single [`Bytes`].
284    ///
285    /// This operation is zero copy if the underlying bytes are contiguous.
286    /// Otherwise, it will copy all bytes into one single [`Bytes`].
287    /// Please use API from [`Buf`], [`Iterator`] or [`Stream`] whenever possible.
288    #[inline]
289    pub fn to_bytes(&self) -> Bytes {
290        match &self.0 {
291            Inner::Contiguous(bytes) => bytes.clone(),
292            Inner::NonContiguous {
293                parts,
294                size,
295                idx: _,
296                offset,
297            } => {
298                if parts.len() == 1 {
299                    parts[0].slice(*offset..(*offset + *size))
300                } else {
301                    let mut ret = BytesMut::with_capacity(self.len());
302                    ret.put(self.clone());
303                    ret.freeze()
304                }
305            }
306        }
307    }
308
309    /// Combine all bytes together into one single [`Vec<u8>`].
310    ///
311    /// This operation is not zero copy, it will copy all bytes into one single [`Vec<u8>`].
312    /// Please use API from [`Buf`], [`Iterator`] or [`Stream`] whenever possible.
313    #[inline]
314    pub fn to_vec(&self) -> Vec<u8> {
315        let mut ret = Vec::with_capacity(self.len());
316        ret.put(self.clone());
317        ret
318    }
319
320    /// Convert buffer into a slice of [`IoSlice`] for vectored write.
321    #[inline]
322    pub fn to_io_slice(&self) -> Vec<IoSlice<'_>> {
323        match &self.0 {
324            Inner::Contiguous(bs) => vec![IoSlice::new(bs.chunk())],
325            Inner::NonContiguous {
326                parts, idx, offset, ..
327            } => {
328                let mut ret = Vec::with_capacity(parts.len() - *idx);
329                let mut new_offset = *offset;
330                for part in parts.iter().skip(*idx) {
331                    ret.push(IoSlice::new(&part[new_offset..]));
332                    new_offset = 0;
333                }
334                ret
335            }
336        }
337    }
338}
339
340impl From<Vec<u8>> for Buffer {
341    #[inline]
342    fn from(bs: Vec<u8>) -> Self {
343        Self(Inner::Contiguous(bs.into()))
344    }
345}
346
347impl From<Bytes> for Buffer {
348    #[inline]
349    fn from(bs: Bytes) -> Self {
350        Self(Inner::Contiguous(bs))
351    }
352}
353
354impl From<String> for Buffer {
355    #[inline]
356    fn from(s: String) -> Self {
357        Self(Inner::Contiguous(Bytes::from(s)))
358    }
359}
360
361impl From<&'static [u8]> for Buffer {
362    #[inline]
363    fn from(s: &'static [u8]) -> Self {
364        Self(Inner::Contiguous(Bytes::from_static(s)))
365    }
366}
367
368impl From<&'static str> for Buffer {
369    #[inline]
370    fn from(s: &'static str) -> Self {
371        Self(Inner::Contiguous(Bytes::from_static(s.as_bytes())))
372    }
373}
374
375impl FromIterator<u8> for Buffer {
376    #[inline]
377    fn from_iter<T: IntoIterator<Item = u8>>(iter: T) -> Self {
378        Self(Inner::Contiguous(Bytes::from_iter(iter)))
379    }
380}
381
382impl From<VecDeque<Bytes>> for Buffer {
383    #[inline]
384    fn from(bs: VecDeque<Bytes>) -> Self {
385        let size = bs.iter().map(Bytes::len).sum();
386        Self(Inner::NonContiguous {
387            parts: Vec::from(bs).into(),
388            size,
389            idx: 0,
390            offset: 0,
391        })
392    }
393}
394
395impl From<Vec<Bytes>> for Buffer {
396    #[inline]
397    fn from(bs: Vec<Bytes>) -> Self {
398        let size = bs.iter().map(Bytes::len).sum();
399        Self(Inner::NonContiguous {
400            parts: bs.into(),
401            size,
402            idx: 0,
403            offset: 0,
404        })
405    }
406}
407
408impl From<Arc<[Bytes]>> for Buffer {
409    #[inline]
410    fn from(bs: Arc<[Bytes]>) -> Self {
411        let size = bs.iter().map(Bytes::len).sum();
412        Self(Inner::NonContiguous {
413            parts: bs,
414            size,
415            idx: 0,
416            offset: 0,
417        })
418    }
419}
420
421impl FromIterator<Bytes> for Buffer {
422    #[inline]
423    fn from_iter<T: IntoIterator<Item = Bytes>>(iter: T) -> Self {
424        let mut size = 0;
425        let bs = iter.into_iter().inspect(|v| size += v.len());
426        // This operation only needs one allocation from iterator to `Arc<[Bytes]>` instead
427        // of iterator -> `Vec<Bytes>` -> `Arc<[Bytes]>`.
428        let parts = Arc::from_iter(bs);
429        Self(Inner::NonContiguous {
430            parts,
431            size,
432            idx: 0,
433            offset: 0,
434        })
435    }
436}
437
438impl Buf for Buffer {
439    #[inline]
440    fn remaining(&self) -> usize {
441        self.len()
442    }
443
444    #[inline]
445    fn chunk(&self) -> &[u8] {
446        match &self.0 {
447            Inner::Contiguous(b) => b.chunk(),
448            Inner::NonContiguous {
449                parts,
450                size,
451                idx,
452                offset,
453            } => {
454                if *size == 0 {
455                    return &[];
456                }
457
458                let chunk = &parts[*idx];
459                let n = (chunk.len() - *offset).min(*size);
460                &parts[*idx][*offset..*offset + n]
461            }
462        }
463    }
464
465    #[inline]
466    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
467        match &self.0 {
468            Inner::Contiguous(b) => {
469                if dst.is_empty() {
470                    return 0;
471                }
472
473                dst[0] = IoSlice::new(b.chunk());
474                1
475            }
476            Inner::NonContiguous {
477                parts, idx, offset, ..
478            } => {
479                if dst.is_empty() {
480                    return 0;
481                }
482
483                let mut new_offset = *offset;
484                parts
485                    .iter()
486                    .skip(*idx)
487                    .zip(dst.iter_mut())
488                    .map(|(part, dst)| {
489                        *dst = IoSlice::new(&part[new_offset..]);
490                        new_offset = 0;
491                    })
492                    .count()
493            }
494        }
495    }
496
497    #[inline]
498    fn advance(&mut self, cnt: usize) {
499        match &mut self.0 {
500            Inner::Contiguous(b) => b.advance(cnt),
501            Inner::NonContiguous {
502                parts,
503                size,
504                idx,
505                offset,
506            } => {
507                assert!(
508                    cnt <= *size,
509                    "cannot advance past {cnt} bytes, only {size} bytes left"
510                );
511
512                let mut new_idx = *idx;
513                let mut new_offset = *offset;
514                let mut remaining_cnt = cnt;
515                while remaining_cnt > 0 {
516                    let part_len = parts[new_idx].len();
517                    let remaining_in_part = part_len - new_offset;
518
519                    if remaining_cnt < remaining_in_part {
520                        new_offset += remaining_cnt;
521                        break;
522                    }
523
524                    remaining_cnt -= remaining_in_part;
525                    new_idx += 1;
526                    new_offset = 0;
527                }
528
529                *idx = new_idx;
530                *offset = new_offset;
531                *size -= cnt;
532            }
533        }
534    }
535}
536
537impl Iterator for Buffer {
538    type Item = Bytes;
539
540    fn next(&mut self) -> Option<Self::Item> {
541        match &mut self.0 {
542            Inner::Contiguous(bs) => {
543                if bs.is_empty() {
544                    None
545                } else {
546                    Some(mem::take(bs))
547                }
548            }
549            Inner::NonContiguous {
550                parts,
551                size,
552                idx,
553                offset,
554            } => {
555                if *size == 0 {
556                    return None;
557                }
558
559                let chunk = &parts[*idx];
560                let n = (chunk.len() - *offset).min(*size);
561                let buf = chunk.slice(*offset..*offset + n);
562                *size -= n;
563                *offset += n;
564
565                if *offset == chunk.len() {
566                    *idx += 1;
567                    *offset = 0;
568                }
569
570                Some(buf)
571            }
572        }
573    }
574
575    fn size_hint(&self) -> (usize, Option<usize>) {
576        match &self.0 {
577            Inner::Contiguous(bs) => {
578                if bs.is_empty() {
579                    (0, Some(0))
580                } else {
581                    (1, Some(1))
582                }
583            }
584            Inner::NonContiguous { parts, idx, .. } => {
585                let remaining = parts.len().saturating_sub(*idx);
586                (remaining, Some(remaining))
587            }
588        }
589    }
590}
591
592impl Stream for Buffer {
593    type Item = Result<Bytes, Infallible>;
594
595    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
596        Poll::Ready(self.get_mut().next().map(Ok))
597    }
598
599    fn size_hint(&self) -> (usize, Option<usize>) {
600        Iterator::size_hint(self)
601    }
602}
603
604impl Read for Buffer {
605    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
606        let chunk = self.chunk();
607        let len = chunk.len().min(buf.len());
608        buf[..len].copy_from_slice(&chunk[..len]);
609        self.advance(len);
610        Ok(len)
611    }
612}
613
614impl Seek for Buffer {
615    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
616        let len = self.len() as u64;
617        let new_pos = match pos {
618            SeekFrom::Start(offset) => offset,
619            SeekFrom::End(offset) => {
620                if offset < 0 {
621                    len.checked_sub(offset.unsigned_abs())
622                        .ok_or(io::Error::new(
623                            io::ErrorKind::InvalidInput,
624                            "invalid seek to a negative position",
625                        ))?
626                } else {
627                    len.checked_add(offset as u64).ok_or(io::Error::new(
628                        io::ErrorKind::InvalidInput,
629                        "seek out of bounds",
630                    ))?
631                }
632            }
633            SeekFrom::Current(offset) => {
634                let current_pos = (len - self.remaining() as u64) as i64;
635                let new_pos = current_pos.checked_add(offset).ok_or(io::Error::new(
636                    io::ErrorKind::InvalidInput,
637                    "seek out of bounds",
638                ))?;
639                if new_pos < 0 {
640                    return Err(io::Error::new(
641                        io::ErrorKind::InvalidInput,
642                        "invalid seek to a negative position",
643                    ));
644                }
645                new_pos as u64
646            }
647        };
648
649        if new_pos > len {
650            return Err(io::Error::new(
651                io::ErrorKind::InvalidInput,
652                "seek out of bounds",
653            ));
654        }
655
656        self.advance((new_pos - (len - self.remaining() as u64)) as usize);
657        Ok(new_pos)
658    }
659}
660
661impl BufRead for Buffer {
662    fn fill_buf(&mut self) -> io::Result<&[u8]> {
663        let chunk = match &self.0 {
664            Inner::Contiguous(b) => b.chunk(),
665            Inner::NonContiguous {
666                parts,
667                size,
668                idx,
669                offset,
670            } => {
671                if *size == 0 {
672                    return Ok(&[]);
673                }
674
675                let chunk = &parts[*idx];
676                let n = (chunk.len() - *offset).min(*size);
677                &parts[*idx][*offset..*offset + n]
678            }
679        };
680        Ok(chunk)
681    }
682
683    fn consume(&mut self, amt: usize) {
684        self.advance(amt);
685    }
686}
687
688#[cfg(test)]
689mod tests {
690    use pretty_assertions::assert_eq;
691    use rand::prelude::*;
692    use std::io::{BufRead, Read, Seek, SeekFrom};
693
694    use super::*;
695
696    const EMPTY_SLICE: &[u8] = &[];
697
698    #[test]
699    fn test_contiguous_buffer() {
700        let mut buf = Buffer::new();
701
702        assert_eq!(buf.remaining(), 0);
703        assert_eq!(buf.chunk(), EMPTY_SLICE);
704        assert_eq!(buf.next(), None);
705    }
706
707    #[test]
708    fn test_empty_non_contiguous_buffer() {
709        let mut buf = Buffer::from(vec![Bytes::new()]);
710
711        assert_eq!(buf.remaining(), 0);
712        assert_eq!(buf.chunk(), EMPTY_SLICE);
713        assert_eq!(buf.next(), None);
714    }
715
716    #[test]
717    fn test_non_contiguous_buffer_with_empty_chunks() {
718        let mut buf = Buffer::from(vec![Bytes::from("a")]);
719
720        assert_eq!(buf.remaining(), 1);
721        assert_eq!(buf.chunk(), b"a");
722
723        buf.advance(1);
724
725        assert_eq!(buf.remaining(), 0);
726        assert_eq!(buf.chunk(), EMPTY_SLICE);
727    }
728
729    #[test]
730    fn test_non_contiguous_buffer_with_next() {
731        let mut buf = Buffer::from(vec![Bytes::from("a")]);
732
733        assert_eq!(buf.remaining(), 1);
734        assert_eq!(buf.chunk(), b"a");
735
736        let bs = buf.next();
737
738        assert_eq!(bs, Some(Bytes::from("a")));
739        assert_eq!(buf.remaining(), 0);
740        assert_eq!(buf.chunk(), EMPTY_SLICE);
741    }
742
743    #[test]
744    fn test_buffer_advance() {
745        let mut buf = Buffer::from(vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]);
746
747        assert_eq!(buf.remaining(), 3);
748        assert_eq!(buf.chunk(), b"a");
749
750        buf.advance(1);
751
752        assert_eq!(buf.remaining(), 2);
753        assert_eq!(buf.chunk(), b"b");
754
755        buf.advance(1);
756
757        assert_eq!(buf.remaining(), 1);
758        assert_eq!(buf.chunk(), b"c");
759
760        buf.advance(1);
761
762        assert_eq!(buf.remaining(), 0);
763        assert_eq!(buf.chunk(), EMPTY_SLICE);
764
765        buf.advance(0);
766
767        assert_eq!(buf.remaining(), 0);
768        assert_eq!(buf.chunk(), EMPTY_SLICE);
769    }
770
771    #[test]
772    fn test_buffer_truncate() {
773        let mut buf = Buffer::from(vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]);
774
775        assert_eq!(buf.remaining(), 3);
776        assert_eq!(buf.chunk(), b"a");
777
778        buf.truncate(100);
779
780        assert_eq!(buf.remaining(), 3);
781        assert_eq!(buf.chunk(), b"a");
782
783        buf.truncate(2);
784
785        assert_eq!(buf.remaining(), 2);
786        assert_eq!(buf.chunk(), b"a");
787
788        buf.truncate(0);
789
790        assert_eq!(buf.remaining(), 0);
791        assert_eq!(buf.chunk(), EMPTY_SLICE);
792    }
793
794    /// This setup will return
795    ///
796    /// - A buffer
797    /// - Total size of this buffer.
798    /// - Total content of this buffer.
799    fn setup_buffer() -> (Buffer, usize, Bytes) {
800        let mut rng = thread_rng();
801
802        let bs = (0..100)
803            .map(|_| {
804                let len = rng.gen_range(1..100);
805                let mut buf = vec![0; len];
806                rng.fill(&mut buf[..]);
807                Bytes::from(buf)
808            })
809            .collect::<Vec<_>>();
810
811        let total_size = bs.iter().map(|b| b.len()).sum::<usize>();
812        let total_content = bs.iter().flatten().copied().collect::<Bytes>();
813        let buf = Buffer::from(bs);
814
815        (buf, total_size, total_content)
816    }
817
818    #[test]
819    fn fuzz_buffer_advance() {
820        let mut rng = thread_rng();
821
822        let (mut buf, total_size, total_content) = setup_buffer();
823        assert_eq!(buf.remaining(), total_size);
824        assert_eq!(buf.to_bytes(), total_content);
825
826        let mut cur = 0;
827        // Loop at most 10000 times.
828        let mut times = 10000;
829        while !buf.is_empty() && times > 0 {
830            times -= 1;
831
832            let cnt = rng.gen_range(0..total_size - cur);
833            cur += cnt;
834            buf.advance(cnt);
835
836            assert_eq!(buf.remaining(), total_size - cur);
837            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
838        }
839    }
840
841    #[test]
842    fn fuzz_buffer_iter() {
843        let mut rng = thread_rng();
844
845        let (mut buf, total_size, total_content) = setup_buffer();
846        assert_eq!(buf.remaining(), total_size);
847        assert_eq!(buf.to_bytes(), total_content);
848
849        let mut cur = 0;
850        while buf.is_empty() {
851            let cnt = rng.gen_range(0..total_size - cur);
852            cur += cnt;
853            buf.advance(cnt);
854
855            // Before next
856            assert_eq!(buf.remaining(), total_size - cur);
857            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
858
859            if let Some(bs) = buf.next() {
860                assert_eq!(bs, total_content.slice(cur..cur + bs.len()));
861                cur += bs.len();
862            }
863
864            // After next
865            assert_eq!(buf.remaining(), total_size - cur);
866            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
867        }
868    }
869
870    #[test]
871    fn fuzz_buffer_truncate() {
872        let mut rng = thread_rng();
873
874        let (mut buf, total_size, total_content) = setup_buffer();
875        assert_eq!(buf.remaining(), total_size);
876        assert_eq!(buf.to_bytes(), total_content);
877
878        let mut cur = 0;
879        while buf.is_empty() {
880            let cnt = rng.gen_range(0..total_size - cur);
881            cur += cnt;
882            buf.advance(cnt);
883
884            // Before truncate
885            assert_eq!(buf.remaining(), total_size - cur);
886            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
887
888            let truncate_size = rng.gen_range(0..total_size - cur);
889            buf.truncate(truncate_size);
890
891            // After truncate
892            assert_eq!(buf.remaining(), truncate_size);
893            assert_eq!(
894                buf.to_bytes(),
895                total_content.slice(cur..cur + truncate_size)
896            );
897
898            // Try next after truncate
899            if let Some(bs) = buf.next() {
900                assert_eq!(bs, total_content.slice(cur..cur + bs.len()));
901                cur += bs.len();
902            }
903
904            // After next
905            assert_eq!(buf.remaining(), total_size - cur);
906            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
907        }
908    }
909
910    #[test]
911    fn test_read_trait() {
912        let mut buffer = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
913        let mut output = vec![0; 5];
914        let size = buffer.read(&mut output).unwrap();
915        assert_eq!(size, 5);
916        assert_eq!(&output, b"Hello");
917    }
918
919    #[test]
920    fn test_seek_trait() {
921        let mut buffer = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
922        buffer.seek(SeekFrom::Start(5)).unwrap();
923        let mut output = vec![0; 5];
924        buffer.read_exact(&mut output).unwrap();
925        assert_eq!(&output, b"World");
926    }
927
928    #[test]
929    fn test_bufread_trait() {
930        let mut buffer = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
931        let mut output = String::new();
932        buffer.read_to_string(&mut output).unwrap();
933        assert_eq!(output, "HelloWorld");
934
935        let mut buffer = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
936        let buf = buffer.fill_buf().unwrap();
937        assert_eq!(buf, b"Hello");
938        buffer.consume(5);
939        let buf = buffer.fill_buf().unwrap();
940        assert_eq!(buf, b"World");
941    }
942
943    #[test]
944    fn test_read_partial() {
945        let mut buffer = Buffer::from(vec![Bytes::from("Partial"), Bytes::from("Read")]);
946        let mut output = vec![0; 4];
947        let size = buffer.read(&mut output).unwrap();
948        assert_eq!(size, 4);
949        assert_eq!(&output, b"Part");
950
951        let size = buffer.read(&mut output).unwrap();
952        assert_eq!(size, 3);
953        assert_eq!(&output[..3], b"ial");
954    }
955
956    #[test]
957    fn test_seek_and_read() {
958        let mut buffer = Buffer::from(vec![Bytes::from("SeekAndRead")]);
959        buffer.seek(SeekFrom::Start(4)).unwrap();
960        let mut output = vec![0; 3];
961        buffer.read_exact(&mut output).unwrap();
962        assert_eq!(&output, b"And");
963    }
964
965    #[test]
966    fn test_bufread_consume() {
967        let mut buffer = Buffer::from(vec![Bytes::from("ConsumeTest")]);
968        let buf = buffer.fill_buf().unwrap();
969        assert_eq!(buf, b"ConsumeTest");
970        buffer.consume(7);
971        let buf = buffer.fill_buf().unwrap();
972        assert_eq!(buf, b"Test");
973    }
974
975    #[test]
976    fn test_empty_buffer() {
977        let mut buffer = Buffer::new();
978        let mut output = vec![0; 5];
979        let size = buffer.read(&mut output).unwrap();
980        assert_eq!(size, 0);
981        assert_eq!(&output, &[0; 5]);
982    }
983
984    #[test]
985    fn test_seek_out_of_bounds() {
986        let mut buffer = Buffer::from(vec![Bytes::from("OutOfBounds")]);
987        let result = buffer.seek(SeekFrom::Start(100));
988        assert!(result.is_err());
989    }
990}