opendal/types/
buffer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::VecDeque;
19use std::convert::Infallible;
20use std::fmt::Debug;
21use std::fmt::Formatter;
22use std::io::BufRead;
23use std::io::IoSlice;
24use std::io::Read;
25use std::io::Seek;
26use std::io::SeekFrom;
27use std::io::{self};
28use std::mem;
29use std::ops::Bound;
30use std::ops::RangeBounds;
31use std::pin::Pin;
32use std::sync::Arc;
33use std::task::Context;
34use std::task::Poll;
35
36use bytes::Buf;
37use bytes::BufMut;
38use bytes::Bytes;
39use bytes::BytesMut;
40use futures::Stream;
41
42use crate::*;
43
44/// Buffer is a wrapper of contiguous `Bytes` and non-contiguous `[Bytes]`.
45///
46/// We designed buffer to allow underlying storage to return non-contiguous bytes. For example,
47/// http based storage like s3 could generate non-contiguous bytes by stream.
48///
49/// ## Features
50///
51/// - [`Buffer`] can be used as [`Buf`], [`Iterator`], [`Stream`] directly.
52/// - [`Buffer`] is cheap to clone like [`Bytes`], only update reference count, no allocation.
53/// - [`Buffer`] is vectorized write friendly, you can convert it to [`IoSlice`] for vectored write.
54///
55/// ## Examples
56///
57/// ### As `Buf`
58///
59/// `Buffer` implements `Buf` trait:
60///
61/// ```rust
62/// use bytes::Buf;
63/// use opendal::Buffer;
64/// use serde_json;
65///
66/// fn test(mut buf: Buffer) -> Vec<String> {
67///     serde_json::from_reader(buf.reader()).unwrap()
68/// }
69/// ```
70///
71/// ### As Bytes `Iterator`
72///
73/// `Buffer` implements `Iterator<Item=Bytes>` trait:
74///
75/// ```rust
76/// use bytes::Bytes;
77/// use opendal::Buffer;
78///
79/// fn test(mut buf: Buffer) -> Vec<Bytes> {
80///     buf.into_iter().collect()
81/// }
82/// ```
83///
84/// ### As Bytes `Stream`
85///
86/// `Buffer` implements `Stream<Item=Result<Bytes, Infallible>>` trait:
87///
88/// ```rust
89/// use bytes::Bytes;
90/// use futures::TryStreamExt;
91/// use opendal::Buffer;
92///
93/// async fn test(mut buf: Buffer) -> Vec<Bytes> {
94///     buf.into_iter().try_collect().await.unwrap()
95/// }
96/// ```
97///
98/// ### As one contiguous Bytes
99///
100/// `Buffer` can make contiguous by transform into `Bytes` or `Vec<u8>`.
101/// Please keep in mind that this operation involves new allocation and bytes copy, and we can't
102/// reuse the same memory region anymore.
103///
104/// ```rust
105/// use bytes::Bytes;
106/// use opendal::Buffer;
107///
108/// fn test_to_vec(buf: Buffer) -> Vec<u8> {
109///     buf.to_vec()
110/// }
111///
112/// fn test_to_bytes(buf: Buffer) -> Bytes {
113///     buf.to_bytes()
114/// }
115/// ```
116#[derive(Clone)]
117pub struct Buffer(Inner);
118
119#[derive(Clone)]
120enum Inner {
121    Contiguous(Bytes),
122    NonContiguous {
123        parts: Arc<[Bytes]>,
124        size: usize,
125        idx: usize,
126        offset: usize,
127    },
128}
129
130impl Debug for Buffer {
131    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
132        let mut b = f.debug_struct("Buffer");
133
134        match &self.0 {
135            Inner::Contiguous(bs) => {
136                b.field("type", &"contiguous");
137                b.field("size", &bs.len());
138            }
139            Inner::NonContiguous {
140                parts,
141                size,
142                idx,
143                offset,
144            } => {
145                b.field("type", &"non_contiguous");
146                b.field("parts", &parts);
147                b.field("size", &size);
148                b.field("idx", &idx);
149                b.field("offset", &offset);
150            }
151        }
152        b.finish_non_exhaustive()
153    }
154}
155
156impl Default for Buffer {
157    fn default() -> Self {
158        Self::new()
159    }
160}
161
162impl Buffer {
163    /// Create a new empty buffer.
164    ///
165    /// This operation is const and no allocation will be performed.
166    #[inline]
167    pub const fn new() -> Self {
168        Self(Inner::Contiguous(Bytes::new()))
169    }
170
171    /// Get the length of the buffer.
172    #[inline]
173    pub fn len(&self) -> usize {
174        match &self.0 {
175            Inner::Contiguous(b) => b.remaining(),
176            Inner::NonContiguous { size, .. } => *size,
177        }
178    }
179
180    /// Check if buffer is empty.
181    #[inline]
182    pub fn is_empty(&self) -> bool {
183        self.len() == 0
184    }
185
186    /// Number of [`Bytes`] in [`Buffer`].
187    ///
188    /// For contiguous buffer, it's always 1. For non-contiguous buffer, it's number of bytes
189    /// available for use.
190    pub fn count(&self) -> usize {
191        match &self.0 {
192            Inner::Contiguous(_) => 1,
193            Inner::NonContiguous {
194                parts,
195                idx,
196                size,
197                offset,
198            } => {
199                parts
200                    .iter()
201                    .skip(*idx)
202                    .fold((0, size + offset), |(count, size), bytes| {
203                        if size == 0 {
204                            (count, 0)
205                        } else {
206                            (count + 1, size.saturating_sub(bytes.len()))
207                        }
208                    })
209                    .0
210            }
211        }
212    }
213
214    /// Get current [`Bytes`].
215    pub fn current(&self) -> Bytes {
216        match &self.0 {
217            Inner::Contiguous(inner) => inner.clone(),
218            Inner::NonContiguous {
219                parts,
220                idx,
221                offset,
222                size,
223            } => {
224                let chunk = &parts[*idx];
225                let n = (chunk.len() - *offset).min(*size);
226                chunk.slice(*offset..*offset + n)
227            }
228        }
229    }
230
231    /// Shortens the buffer, keeping the first `len` bytes and dropping the rest.
232    ///
233    /// If `len` is greater than the buffer’s current length, this has no effect.
234    #[inline]
235    pub fn truncate(&mut self, len: usize) {
236        match &mut self.0 {
237            Inner::Contiguous(bs) => bs.truncate(len),
238            Inner::NonContiguous { size, .. } => {
239                *size = (*size).min(len);
240            }
241        }
242    }
243
244    /// Returns a slice of self for the provided range.
245    ///
246    /// This will increment the reference count for the underlying memory and return a new Buffer handle set to the slice.
247    ///
248    /// This operation is O(1).
249    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
250        let len = self.len();
251
252        let begin = match range.start_bound() {
253            Bound::Included(&n) => n,
254            Bound::Excluded(&n) => n.checked_add(1).expect("out of range"),
255            Bound::Unbounded => 0,
256        };
257
258        let end = match range.end_bound() {
259            Bound::Included(&n) => n.checked_add(1).expect("out of range"),
260            Bound::Excluded(&n) => n,
261            Bound::Unbounded => len,
262        };
263
264        assert!(
265            begin <= end,
266            "range start must not be greater than end: {:?} <= {:?}",
267            begin,
268            end,
269        );
270        assert!(
271            end <= len,
272            "range end out of bounds: {:?} <= {:?}",
273            end,
274            len,
275        );
276
277        if end == begin {
278            return Buffer::new();
279        }
280
281        let mut ret = self.clone();
282        ret.truncate(end);
283        ret.advance(begin);
284        ret
285    }
286
287    /// Combine all bytes together into one single [`Bytes`].
288    ///
289    /// This operation is zero copy if the underlying bytes are contiguous.
290    /// Otherwise, it will copy all bytes into one single [`Bytes`].
291    /// Please use API from [`Buf`], [`Iterator`] or [`Stream`] whenever possible.
292    #[inline]
293    pub fn to_bytes(&self) -> Bytes {
294        match &self.0 {
295            Inner::Contiguous(bytes) => bytes.clone(),
296            Inner::NonContiguous {
297                parts,
298                size,
299                idx: _,
300                offset,
301            } => {
302                if parts.len() == 1 {
303                    parts[0].slice(*offset..(*offset + *size))
304                } else {
305                    let mut ret = BytesMut::with_capacity(self.len());
306                    ret.put(self.clone());
307                    ret.freeze()
308                }
309            }
310        }
311    }
312
313    /// Combine all bytes together into one single [`Vec<u8>`].
314    ///
315    /// This operation is not zero copy, it will copy all bytes into one single [`Vec<u8>`].
316    /// Please use API from [`Buf`], [`Iterator`] or [`Stream`] whenever possible.
317    #[inline]
318    pub fn to_vec(&self) -> Vec<u8> {
319        let mut ret = Vec::with_capacity(self.len());
320        ret.put(self.clone());
321        ret
322    }
323
324    /// Convert buffer into a slice of [`IoSlice`] for vectored write.
325    #[inline]
326    pub fn to_io_slice(&self) -> Vec<IoSlice<'_>> {
327        match &self.0 {
328            Inner::Contiguous(bs) => vec![IoSlice::new(bs.chunk())],
329            Inner::NonContiguous {
330                parts, idx, offset, ..
331            } => {
332                let mut ret = Vec::with_capacity(parts.len() - *idx);
333                let mut new_offset = *offset;
334                for part in parts.iter().skip(*idx) {
335                    ret.push(IoSlice::new(&part[new_offset..]));
336                    new_offset = 0;
337                }
338                ret
339            }
340        }
341    }
342}
343
344impl From<Vec<u8>> for Buffer {
345    #[inline]
346    fn from(bs: Vec<u8>) -> Self {
347        Self(Inner::Contiguous(bs.into()))
348    }
349}
350
351impl From<Bytes> for Buffer {
352    #[inline]
353    fn from(bs: Bytes) -> Self {
354        Self(Inner::Contiguous(bs))
355    }
356}
357
358impl From<String> for Buffer {
359    #[inline]
360    fn from(s: String) -> Self {
361        Self(Inner::Contiguous(Bytes::from(s)))
362    }
363}
364
365impl From<&'static [u8]> for Buffer {
366    #[inline]
367    fn from(s: &'static [u8]) -> Self {
368        Self(Inner::Contiguous(Bytes::from_static(s)))
369    }
370}
371
372impl From<&'static str> for Buffer {
373    #[inline]
374    fn from(s: &'static str) -> Self {
375        Self(Inner::Contiguous(Bytes::from_static(s.as_bytes())))
376    }
377}
378
379impl FromIterator<u8> for Buffer {
380    #[inline]
381    fn from_iter<T: IntoIterator<Item = u8>>(iter: T) -> Self {
382        Self(Inner::Contiguous(Bytes::from_iter(iter)))
383    }
384}
385
386impl From<VecDeque<Bytes>> for Buffer {
387    #[inline]
388    fn from(bs: VecDeque<Bytes>) -> Self {
389        let size = bs.iter().map(Bytes::len).sum();
390        Self(Inner::NonContiguous {
391            parts: Vec::from(bs).into(),
392            size,
393            idx: 0,
394            offset: 0,
395        })
396    }
397}
398
399impl From<Vec<Bytes>> for Buffer {
400    #[inline]
401    fn from(bs: Vec<Bytes>) -> Self {
402        let size = bs.iter().map(Bytes::len).sum();
403        Self(Inner::NonContiguous {
404            parts: bs.into(),
405            size,
406            idx: 0,
407            offset: 0,
408        })
409    }
410}
411
412impl From<Arc<[Bytes]>> for Buffer {
413    #[inline]
414    fn from(bs: Arc<[Bytes]>) -> Self {
415        let size = bs.iter().map(Bytes::len).sum();
416        Self(Inner::NonContiguous {
417            parts: bs,
418            size,
419            idx: 0,
420            offset: 0,
421        })
422    }
423}
424
425impl FromIterator<Bytes> for Buffer {
426    #[inline]
427    fn from_iter<T: IntoIterator<Item = Bytes>>(iter: T) -> Self {
428        let mut size = 0;
429        let bs = iter.into_iter().inspect(|v| size += v.len());
430        // This operation only needs one allocation from iterator to `Arc<[Bytes]>` instead
431        // of iterator -> `Vec<Bytes>` -> `Arc<[Bytes]>`.
432        let parts = Arc::from_iter(bs);
433        Self(Inner::NonContiguous {
434            parts,
435            size,
436            idx: 0,
437            offset: 0,
438        })
439    }
440}
441
442impl Buf for Buffer {
443    #[inline]
444    fn remaining(&self) -> usize {
445        self.len()
446    }
447
448    #[inline]
449    fn chunk(&self) -> &[u8] {
450        match &self.0 {
451            Inner::Contiguous(b) => b.chunk(),
452            Inner::NonContiguous {
453                parts,
454                size,
455                idx,
456                offset,
457            } => {
458                if *size == 0 {
459                    return &[];
460                }
461
462                let chunk = &parts[*idx];
463                let n = (chunk.len() - *offset).min(*size);
464                &parts[*idx][*offset..*offset + n]
465            }
466        }
467    }
468
469    #[inline]
470    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
471        match &self.0 {
472            Inner::Contiguous(b) => {
473                if dst.is_empty() {
474                    return 0;
475                }
476
477                dst[0] = IoSlice::new(b.chunk());
478                1
479            }
480            Inner::NonContiguous {
481                parts, idx, offset, ..
482            } => {
483                if dst.is_empty() {
484                    return 0;
485                }
486
487                let mut new_offset = *offset;
488                parts
489                    .iter()
490                    .skip(*idx)
491                    .zip(dst.iter_mut())
492                    .map(|(part, dst)| {
493                        *dst = IoSlice::new(&part[new_offset..]);
494                        new_offset = 0;
495                    })
496                    .count()
497            }
498        }
499    }
500
501    #[inline]
502    fn advance(&mut self, cnt: usize) {
503        match &mut self.0 {
504            Inner::Contiguous(b) => b.advance(cnt),
505            Inner::NonContiguous {
506                parts,
507                size,
508                idx,
509                offset,
510            } => {
511                assert!(
512                    cnt <= *size,
513                    "cannot advance past {cnt} bytes, only {size} bytes left"
514                );
515
516                let mut new_idx = *idx;
517                let mut new_offset = *offset;
518                let mut remaining_cnt = cnt;
519                while remaining_cnt > 0 {
520                    let part_len = parts[new_idx].len();
521                    let remaining_in_part = part_len - new_offset;
522
523                    if remaining_cnt < remaining_in_part {
524                        new_offset += remaining_cnt;
525                        break;
526                    }
527
528                    remaining_cnt -= remaining_in_part;
529                    new_idx += 1;
530                    new_offset = 0;
531                }
532
533                *idx = new_idx;
534                *offset = new_offset;
535                *size -= cnt;
536            }
537        }
538    }
539}
540
541impl Iterator for Buffer {
542    type Item = Bytes;
543
544    fn next(&mut self) -> Option<Self::Item> {
545        match &mut self.0 {
546            Inner::Contiguous(bs) => {
547                if bs.is_empty() {
548                    None
549                } else {
550                    Some(mem::take(bs))
551                }
552            }
553            Inner::NonContiguous {
554                parts,
555                size,
556                idx,
557                offset,
558            } => {
559                if *size == 0 {
560                    return None;
561                }
562
563                let chunk = &parts[*idx];
564                let n = (chunk.len() - *offset).min(*size);
565                let buf = chunk.slice(*offset..*offset + n);
566                *size -= n;
567                *offset += n;
568
569                if *offset == chunk.len() {
570                    *idx += 1;
571                    *offset = 0;
572                }
573
574                Some(buf)
575            }
576        }
577    }
578
579    fn size_hint(&self) -> (usize, Option<usize>) {
580        match &self.0 {
581            Inner::Contiguous(bs) => {
582                if bs.is_empty() {
583                    (0, Some(0))
584                } else {
585                    (1, Some(1))
586                }
587            }
588            Inner::NonContiguous { parts, idx, .. } => {
589                let remaining = parts.len().saturating_sub(*idx);
590                (remaining, Some(remaining))
591            }
592        }
593    }
594}
595
596impl Stream for Buffer {
597    type Item = Result<Bytes, Infallible>;
598
599    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
600        Poll::Ready(self.get_mut().next().map(Ok))
601    }
602
603    fn size_hint(&self) -> (usize, Option<usize>) {
604        Iterator::size_hint(self)
605    }
606}
607
608impl Read for Buffer {
609    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
610        let chunk = self.chunk();
611        let len = chunk.len().min(buf.len());
612        buf[..len].copy_from_slice(&chunk[..len]);
613        self.advance(len);
614        Ok(len)
615    }
616}
617
618impl Seek for Buffer {
619    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
620        let len = self.len() as u64;
621        let new_pos = match pos {
622            SeekFrom::Start(offset) => offset,
623            SeekFrom::End(offset) => {
624                if offset < 0 {
625                    len.checked_sub(offset.unsigned_abs())
626                        .ok_or(io::Error::new(
627                            io::ErrorKind::InvalidInput,
628                            "invalid seek to a negative position",
629                        ))?
630                } else {
631                    len.checked_add(offset as u64).ok_or(io::Error::new(
632                        io::ErrorKind::InvalidInput,
633                        "seek out of bounds",
634                    ))?
635                }
636            }
637            SeekFrom::Current(offset) => {
638                let current_pos = (len - self.remaining() as u64) as i64;
639                let new_pos = current_pos.checked_add(offset).ok_or(io::Error::new(
640                    io::ErrorKind::InvalidInput,
641                    "seek out of bounds",
642                ))?;
643                if new_pos < 0 {
644                    return Err(io::Error::new(
645                        io::ErrorKind::InvalidInput,
646                        "invalid seek to a negative position",
647                    ));
648                }
649                new_pos as u64
650            }
651        };
652
653        if new_pos > len {
654            return Err(io::Error::new(
655                io::ErrorKind::InvalidInput,
656                "seek out of bounds",
657            ));
658        }
659
660        self.advance((new_pos - (len - self.remaining() as u64)) as usize);
661        Ok(new_pos)
662    }
663}
664
665impl BufRead for Buffer {
666    fn fill_buf(&mut self) -> io::Result<&[u8]> {
667        let chunk = match &self.0 {
668            Inner::Contiguous(b) => b.chunk(),
669            Inner::NonContiguous {
670                parts,
671                size,
672                idx,
673                offset,
674            } => {
675                if *size == 0 {
676                    return Ok(&[]);
677                }
678
679                let chunk = &parts[*idx];
680                let n = (chunk.len() - *offset).min(*size);
681                &parts[*idx][*offset..*offset + n]
682            }
683        };
684        Ok(chunk)
685    }
686
687    fn consume(&mut self, amt: usize) {
688        self.advance(amt);
689    }
690}
691
692#[cfg(test)]
693mod tests {
694    use std::io::BufRead;
695    use std::io::Read;
696    use std::io::Seek;
697    use std::io::SeekFrom;
698
699    use pretty_assertions::assert_eq;
700    use rand::prelude::*;
701
702    use super::*;
703
704    const EMPTY_SLICE: &[u8] = &[];
705
706    #[test]
707    fn test_contiguous_buffer() {
708        let mut buf = Buffer::new();
709
710        assert_eq!(buf.remaining(), 0);
711        assert_eq!(buf.chunk(), EMPTY_SLICE);
712        assert_eq!(buf.next(), None);
713    }
714
715    #[test]
716    fn test_empty_non_contiguous_buffer() {
717        let mut buf = Buffer::from(vec![Bytes::new()]);
718
719        assert_eq!(buf.remaining(), 0);
720        assert_eq!(buf.chunk(), EMPTY_SLICE);
721        assert_eq!(buf.next(), None);
722    }
723
724    #[test]
725    fn test_non_contiguous_buffer_with_empty_chunks() {
726        let mut buf = Buffer::from(vec![Bytes::from("a")]);
727
728        assert_eq!(buf.remaining(), 1);
729        assert_eq!(buf.chunk(), b"a");
730
731        buf.advance(1);
732
733        assert_eq!(buf.remaining(), 0);
734        assert_eq!(buf.chunk(), EMPTY_SLICE);
735    }
736
737    #[test]
738    fn test_non_contiguous_buffer_with_next() {
739        let mut buf = Buffer::from(vec![Bytes::from("a")]);
740
741        assert_eq!(buf.remaining(), 1);
742        assert_eq!(buf.chunk(), b"a");
743
744        let bs = buf.next();
745
746        assert_eq!(bs, Some(Bytes::from("a")));
747        assert_eq!(buf.remaining(), 0);
748        assert_eq!(buf.chunk(), EMPTY_SLICE);
749    }
750
751    #[test]
752    fn test_buffer_advance() {
753        let mut buf = Buffer::from(vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]);
754
755        assert_eq!(buf.remaining(), 3);
756        assert_eq!(buf.chunk(), b"a");
757
758        buf.advance(1);
759
760        assert_eq!(buf.remaining(), 2);
761        assert_eq!(buf.chunk(), b"b");
762
763        buf.advance(1);
764
765        assert_eq!(buf.remaining(), 1);
766        assert_eq!(buf.chunk(), b"c");
767
768        buf.advance(1);
769
770        assert_eq!(buf.remaining(), 0);
771        assert_eq!(buf.chunk(), EMPTY_SLICE);
772
773        buf.advance(0);
774
775        assert_eq!(buf.remaining(), 0);
776        assert_eq!(buf.chunk(), EMPTY_SLICE);
777    }
778
779    #[test]
780    fn test_buffer_truncate() {
781        let mut buf = Buffer::from(vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]);
782
783        assert_eq!(buf.remaining(), 3);
784        assert_eq!(buf.chunk(), b"a");
785
786        buf.truncate(100);
787
788        assert_eq!(buf.remaining(), 3);
789        assert_eq!(buf.chunk(), b"a");
790
791        buf.truncate(2);
792
793        assert_eq!(buf.remaining(), 2);
794        assert_eq!(buf.chunk(), b"a");
795
796        buf.truncate(0);
797
798        assert_eq!(buf.remaining(), 0);
799        assert_eq!(buf.chunk(), EMPTY_SLICE);
800    }
801
802    /// This setup will return
803    ///
804    /// - A buffer
805    /// - Total size of this buffer.
806    /// - Total content of this buffer.
807    fn setup_buffer() -> (Buffer, usize, Bytes) {
808        let mut rng = thread_rng();
809
810        let bs = (0..100)
811            .map(|_| {
812                let len = rng.gen_range(1..100);
813                let mut buf = vec![0; len];
814                rng.fill(&mut buf[..]);
815                Bytes::from(buf)
816            })
817            .collect::<Vec<_>>();
818
819        let total_size = bs.iter().map(|b| b.len()).sum::<usize>();
820        let total_content = bs.iter().flatten().copied().collect::<Bytes>();
821        let buf = Buffer::from(bs);
822
823        (buf, total_size, total_content)
824    }
825
826    #[test]
827    fn fuzz_buffer_advance() {
828        let mut rng = thread_rng();
829
830        let (mut buf, total_size, total_content) = setup_buffer();
831        assert_eq!(buf.remaining(), total_size);
832        assert_eq!(buf.to_bytes(), total_content);
833
834        let mut cur = 0;
835        // Loop at most 10000 times.
836        let mut times = 10000;
837        while !buf.is_empty() && times > 0 {
838            times -= 1;
839
840            let cnt = rng.gen_range(0..total_size - cur);
841            cur += cnt;
842            buf.advance(cnt);
843
844            assert_eq!(buf.remaining(), total_size - cur);
845            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
846        }
847    }
848
849    #[test]
850    fn fuzz_buffer_iter() {
851        let mut rng = thread_rng();
852
853        let (mut buf, total_size, total_content) = setup_buffer();
854        assert_eq!(buf.remaining(), total_size);
855        assert_eq!(buf.to_bytes(), total_content);
856
857        let mut cur = 0;
858        while buf.is_empty() {
859            let cnt = rng.gen_range(0..total_size - cur);
860            cur += cnt;
861            buf.advance(cnt);
862
863            // Before next
864            assert_eq!(buf.remaining(), total_size - cur);
865            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
866
867            if let Some(bs) = buf.next() {
868                assert_eq!(bs, total_content.slice(cur..cur + bs.len()));
869                cur += bs.len();
870            }
871
872            // After next
873            assert_eq!(buf.remaining(), total_size - cur);
874            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
875        }
876    }
877
878    #[test]
879    fn fuzz_buffer_truncate() {
880        let mut rng = thread_rng();
881
882        let (mut buf, total_size, total_content) = setup_buffer();
883        assert_eq!(buf.remaining(), total_size);
884        assert_eq!(buf.to_bytes(), total_content);
885
886        let mut cur = 0;
887        while buf.is_empty() {
888            let cnt = rng.gen_range(0..total_size - cur);
889            cur += cnt;
890            buf.advance(cnt);
891
892            // Before truncate
893            assert_eq!(buf.remaining(), total_size - cur);
894            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
895
896            let truncate_size = rng.gen_range(0..total_size - cur);
897            buf.truncate(truncate_size);
898
899            // After truncate
900            assert_eq!(buf.remaining(), truncate_size);
901            assert_eq!(
902                buf.to_bytes(),
903                total_content.slice(cur..cur + truncate_size)
904            );
905
906            // Try next after truncate
907            if let Some(bs) = buf.next() {
908                assert_eq!(bs, total_content.slice(cur..cur + bs.len()));
909                cur += bs.len();
910            }
911
912            // After next
913            assert_eq!(buf.remaining(), total_size - cur);
914            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
915        }
916    }
917
918    #[test]
919    fn test_read_trait() {
920        let mut buffer = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
921        let mut output = vec![0; 5];
922        let size = buffer.read(&mut output).unwrap();
923        assert_eq!(size, 5);
924        assert_eq!(&output, b"Hello");
925    }
926
927    #[test]
928    fn test_seek_trait() {
929        let mut buffer = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
930        buffer.seek(SeekFrom::Start(5)).unwrap();
931        let mut output = vec![0; 5];
932        buffer.read_exact(&mut output).unwrap();
933        assert_eq!(&output, b"World");
934    }
935
936    #[test]
937    fn test_bufread_trait() {
938        let mut buffer = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
939        let mut output = String::new();
940        buffer.read_to_string(&mut output).unwrap();
941        assert_eq!(output, "HelloWorld");
942
943        let mut buffer = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
944        let buf = buffer.fill_buf().unwrap();
945        assert_eq!(buf, b"Hello");
946        buffer.consume(5);
947        let buf = buffer.fill_buf().unwrap();
948        assert_eq!(buf, b"World");
949    }
950
951    #[test]
952    fn test_read_partial() {
953        let mut buffer = Buffer::from(vec![Bytes::from("Partial"), Bytes::from("Read")]);
954        let mut output = vec![0; 4];
955        let size = buffer.read(&mut output).unwrap();
956        assert_eq!(size, 4);
957        assert_eq!(&output, b"Part");
958
959        let size = buffer.read(&mut output).unwrap();
960        assert_eq!(size, 3);
961        assert_eq!(&output[..3], b"ial");
962    }
963
964    #[test]
965    fn test_seek_and_read() {
966        let mut buffer = Buffer::from(vec![Bytes::from("SeekAndRead")]);
967        buffer.seek(SeekFrom::Start(4)).unwrap();
968        let mut output = vec![0; 3];
969        buffer.read_exact(&mut output).unwrap();
970        assert_eq!(&output, b"And");
971    }
972
973    #[test]
974    fn test_bufread_consume() {
975        let mut buffer = Buffer::from(vec![Bytes::from("ConsumeTest")]);
976        let buf = buffer.fill_buf().unwrap();
977        assert_eq!(buf, b"ConsumeTest");
978        buffer.consume(7);
979        let buf = buffer.fill_buf().unwrap();
980        assert_eq!(buf, b"Test");
981    }
982
983    #[test]
984    fn test_empty_buffer() {
985        let mut buffer = Buffer::new();
986        let mut output = vec![0; 5];
987        let size = buffer.read(&mut output).unwrap();
988        assert_eq!(size, 0);
989        assert_eq!(&output, &[0; 5]);
990    }
991
992    #[test]
993    fn test_seek_out_of_bounds() {
994        let mut buffer = Buffer::from(vec![Bytes::from("OutOfBounds")]);
995        let result = buffer.seek(SeekFrom::Start(100));
996        assert!(result.is_err());
997    }
998}