1use std::collections::VecDeque;
19use std::convert::Infallible;
20use std::fmt::Debug;
21use std::fmt::Formatter;
22use std::io::IoSlice;
23use std::mem;
24use std::ops::Bound;
25use std::ops::RangeBounds;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::task::Context;
29use std::task::Poll;
30
31use bytes::Buf;
32use bytes::BufMut;
33use bytes::Bytes;
34use bytes::BytesMut;
35use futures::Stream;
36
37use crate::*;
38
39#[derive(Clone)]
112pub struct Buffer(Inner);
113
114#[derive(Clone)]
115enum Inner {
116 Contiguous(Bytes),
117 NonContiguous {
118 parts: Arc<[Bytes]>,
119 size: usize,
120 idx: usize,
121 offset: usize,
122 },
123}
124
125impl Debug for Buffer {
126 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
127 let mut b = f.debug_struct("Buffer");
128
129 match &self.0 {
130 Inner::Contiguous(bs) => {
131 b.field("type", &"contiguous");
132 b.field("size", &bs.len());
133 }
134 Inner::NonContiguous {
135 parts,
136 size,
137 idx,
138 offset,
139 } => {
140 b.field("type", &"non_contiguous");
141 b.field("parts", &parts);
142 b.field("size", &size);
143 b.field("idx", &idx);
144 b.field("offset", &offset);
145 }
146 }
147 b.finish_non_exhaustive()
148 }
149}
150
151impl Default for Buffer {
152 fn default() -> Self {
153 Self::new()
154 }
155}
156impl Buffer {
157 #[inline]
161 pub const fn new() -> Self {
162 Self(Inner::Contiguous(Bytes::new()))
163 }
164
165 #[inline]
167 pub fn len(&self) -> usize {
168 match &self.0 {
169 Inner::Contiguous(b) => b.remaining(),
170 Inner::NonContiguous { size, .. } => *size,
171 }
172 }
173
174 #[inline]
176 pub fn is_empty(&self) -> bool {
177 self.len() == 0
178 }
179
180 pub fn count(&self) -> usize {
185 match &self.0 {
186 Inner::Contiguous(_) => 1,
187 Inner::NonContiguous {
188 parts,
189 idx,
190 size,
191 offset,
192 } => {
193 parts
194 .iter()
195 .skip(*idx)
196 .fold((0, size + offset), |(count, size), bytes| {
197 if size == 0 {
198 (count, 0)
199 } else {
200 (count + 1, size.saturating_sub(bytes.len()))
201 }
202 })
203 .0
204 }
205 }
206 }
207
208 pub fn current(&self) -> Bytes {
210 match &self.0 {
211 Inner::Contiguous(inner) => inner.clone(),
212 Inner::NonContiguous {
213 parts,
214 idx,
215 offset,
216 size,
217 } => {
218 let chunk = &parts[*idx];
219 let n = (chunk.len() - *offset).min(*size);
220 chunk.slice(*offset..*offset + n)
221 }
222 }
223 }
224
225 #[inline]
229 pub fn truncate(&mut self, len: usize) {
230 match &mut self.0 {
231 Inner::Contiguous(bs) => bs.truncate(len),
232 Inner::NonContiguous { size, .. } => {
233 *size = (*size).min(len);
234 }
235 }
236 }
237
238 pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
244 let len = self.len();
245
246 let begin = match range.start_bound() {
247 Bound::Included(&n) => n,
248 Bound::Excluded(&n) => n.checked_add(1).expect("out of range"),
249 Bound::Unbounded => 0,
250 };
251
252 let end = match range.end_bound() {
253 Bound::Included(&n) => n.checked_add(1).expect("out of range"),
254 Bound::Excluded(&n) => n,
255 Bound::Unbounded => len,
256 };
257
258 assert!(
259 begin <= end,
260 "range start must not be greater than end: {:?} <= {:?}",
261 begin,
262 end,
263 );
264 assert!(
265 end <= len,
266 "range end out of bounds: {:?} <= {:?}",
267 end,
268 len,
269 );
270
271 if end == begin {
272 return Buffer::new();
273 }
274
275 let mut ret = self.clone();
276 ret.truncate(end);
277 ret.advance(begin);
278 ret
279 }
280
281 #[inline]
287 pub fn to_bytes(&self) -> Bytes {
288 match &self.0 {
289 Inner::Contiguous(bytes) => bytes.clone(),
290 Inner::NonContiguous {
291 parts,
292 size,
293 idx: _,
294 offset,
295 } => {
296 if parts.len() == 1 {
297 parts[0].slice(*offset..(*offset + *size))
298 } else {
299 let mut ret = BytesMut::with_capacity(self.len());
300 ret.put(self.clone());
301 ret.freeze()
302 }
303 }
304 }
305 }
306
307 #[inline]
312 pub fn to_vec(&self) -> Vec<u8> {
313 let mut ret = Vec::with_capacity(self.len());
314 ret.put(self.clone());
315 ret
316 }
317
318 #[inline]
320 pub fn to_io_slice(&self) -> Vec<IoSlice<'_>> {
321 match &self.0 {
322 Inner::Contiguous(bs) => vec![IoSlice::new(bs.chunk())],
323 Inner::NonContiguous {
324 parts, idx, offset, ..
325 } => {
326 let mut ret = Vec::with_capacity(parts.len() - *idx);
327 let mut new_offset = *offset;
328 for part in parts.iter().skip(*idx) {
329 ret.push(IoSlice::new(&part[new_offset..]));
330 new_offset = 0;
331 }
332 ret
333 }
334 }
335 }
336}
337
338impl From<Vec<u8>> for Buffer {
339 #[inline]
340 fn from(bs: Vec<u8>) -> Self {
341 Self(Inner::Contiguous(bs.into()))
342 }
343}
344
345impl From<Bytes> for Buffer {
346 #[inline]
347 fn from(bs: Bytes) -> Self {
348 Self(Inner::Contiguous(bs))
349 }
350}
351
352impl From<String> for Buffer {
353 #[inline]
354 fn from(s: String) -> Self {
355 Self(Inner::Contiguous(Bytes::from(s)))
356 }
357}
358
359impl From<&'static [u8]> for Buffer {
360 #[inline]
361 fn from(s: &'static [u8]) -> Self {
362 Self(Inner::Contiguous(Bytes::from_static(s)))
363 }
364}
365
366impl From<&'static str> for Buffer {
367 #[inline]
368 fn from(s: &'static str) -> Self {
369 Self(Inner::Contiguous(Bytes::from_static(s.as_bytes())))
370 }
371}
372
373impl FromIterator<u8> for Buffer {
374 #[inline]
375 fn from_iter<T: IntoIterator<Item = u8>>(iter: T) -> Self {
376 Self(Inner::Contiguous(Bytes::from_iter(iter)))
377 }
378}
379
380impl From<VecDeque<Bytes>> for Buffer {
381 #[inline]
382 fn from(bs: VecDeque<Bytes>) -> Self {
383 let size = bs.iter().map(Bytes::len).sum();
384 Self(Inner::NonContiguous {
385 parts: Vec::from(bs).into(),
386 size,
387 idx: 0,
388 offset: 0,
389 })
390 }
391}
392
393impl From<Vec<Bytes>> for Buffer {
394 #[inline]
395 fn from(bs: Vec<Bytes>) -> Self {
396 let size = bs.iter().map(Bytes::len).sum();
397 Self(Inner::NonContiguous {
398 parts: bs.into(),
399 size,
400 idx: 0,
401 offset: 0,
402 })
403 }
404}
405
406impl From<Arc<[Bytes]>> for Buffer {
407 #[inline]
408 fn from(bs: Arc<[Bytes]>) -> Self {
409 let size = bs.iter().map(Bytes::len).sum();
410 Self(Inner::NonContiguous {
411 parts: bs,
412 size,
413 idx: 0,
414 offset: 0,
415 })
416 }
417}
418
419impl FromIterator<Bytes> for Buffer {
420 #[inline]
421 fn from_iter<T: IntoIterator<Item = Bytes>>(iter: T) -> Self {
422 let mut size = 0;
423 let bs = iter.into_iter().inspect(|v| size += v.len());
424 let parts = Arc::from_iter(bs);
427 Self(Inner::NonContiguous {
428 parts,
429 size,
430 idx: 0,
431 offset: 0,
432 })
433 }
434}
435
436impl Buf for Buffer {
437 #[inline]
438 fn remaining(&self) -> usize {
439 self.len()
440 }
441
442 #[inline]
443 fn chunk(&self) -> &[u8] {
444 match &self.0 {
445 Inner::Contiguous(b) => b.chunk(),
446 Inner::NonContiguous {
447 parts,
448 size,
449 idx,
450 offset,
451 } => {
452 if *size == 0 {
453 return &[];
454 }
455
456 let chunk = &parts[*idx];
457 let n = (chunk.len() - *offset).min(*size);
458 &parts[*idx][*offset..*offset + n]
459 }
460 }
461 }
462
463 #[inline]
464 fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
465 match &self.0 {
466 Inner::Contiguous(b) => {
467 if dst.is_empty() {
468 return 0;
469 }
470
471 dst[0] = IoSlice::new(b.chunk());
472 1
473 }
474 Inner::NonContiguous {
475 parts, idx, offset, ..
476 } => {
477 if dst.is_empty() {
478 return 0;
479 }
480
481 let mut new_offset = *offset;
482 parts
483 .iter()
484 .skip(*idx)
485 .zip(dst.iter_mut())
486 .map(|(part, dst)| {
487 *dst = IoSlice::new(&part[new_offset..]);
488 new_offset = 0;
489 })
490 .count()
491 }
492 }
493 }
494
495 #[inline]
496 fn advance(&mut self, cnt: usize) {
497 match &mut self.0 {
498 Inner::Contiguous(b) => b.advance(cnt),
499 Inner::NonContiguous {
500 parts,
501 size,
502 idx,
503 offset,
504 } => {
505 assert!(
506 cnt <= *size,
507 "cannot advance past {cnt} bytes, only {size} bytes left"
508 );
509
510 let mut new_idx = *idx;
511 let mut new_offset = *offset;
512 let mut remaining_cnt = cnt;
513 while remaining_cnt > 0 {
514 let part_len = parts[new_idx].len();
515 let remaining_in_part = part_len - new_offset;
516
517 if remaining_cnt < remaining_in_part {
518 new_offset += remaining_cnt;
519 break;
520 }
521
522 remaining_cnt -= remaining_in_part;
523 new_idx += 1;
524 new_offset = 0;
525 }
526
527 *idx = new_idx;
528 *offset = new_offset;
529 *size -= cnt;
530 }
531 }
532 }
533}
534
535impl Iterator for Buffer {
536 type Item = Bytes;
537
538 fn next(&mut self) -> Option<Self::Item> {
539 match &mut self.0 {
540 Inner::Contiguous(bs) => {
541 if bs.is_empty() {
542 None
543 } else {
544 Some(mem::take(bs))
545 }
546 }
547 Inner::NonContiguous {
548 parts,
549 size,
550 idx,
551 offset,
552 } => {
553 if *size == 0 {
554 return None;
555 }
556
557 let chunk = &parts[*idx];
558 let n = (chunk.len() - *offset).min(*size);
559 let buf = chunk.slice(*offset..*offset + n);
560 *size -= n;
561 *offset += n;
562
563 if *offset == chunk.len() {
564 *idx += 1;
565 *offset = 0;
566 }
567
568 Some(buf)
569 }
570 }
571 }
572
573 fn size_hint(&self) -> (usize, Option<usize>) {
574 match &self.0 {
575 Inner::Contiguous(bs) => {
576 if bs.is_empty() {
577 (0, Some(0))
578 } else {
579 (1, Some(1))
580 }
581 }
582 Inner::NonContiguous { parts, idx, .. } => {
583 let remaining = parts.len().saturating_sub(*idx);
584 (remaining, Some(remaining))
585 }
586 }
587 }
588}
589
590impl Stream for Buffer {
591 type Item = Result<Bytes, Infallible>;
592
593 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
594 Poll::Ready(self.get_mut().next().map(Ok))
595 }
596
597 fn size_hint(&self) -> (usize, Option<usize>) {
598 Iterator::size_hint(self)
599 }
600}
601
602#[cfg(test)]
603mod tests {
604 use pretty_assertions::assert_eq;
605 use rand::prelude::*;
606
607 use super::*;
608
609 const EMPTY_SLICE: &[u8] = &[];
610
611 #[test]
612 fn test_contiguous_buffer() {
613 let mut buf = Buffer::new();
614
615 assert_eq!(buf.remaining(), 0);
616 assert_eq!(buf.chunk(), EMPTY_SLICE);
617 assert_eq!(buf.next(), None);
618 }
619
620 #[test]
621 fn test_empty_non_contiguous_buffer() {
622 let mut buf = Buffer::from(vec![Bytes::new()]);
623
624 assert_eq!(buf.remaining(), 0);
625 assert_eq!(buf.chunk(), EMPTY_SLICE);
626 assert_eq!(buf.next(), None);
627 }
628
629 #[test]
630 fn test_non_contiguous_buffer_with_empty_chunks() {
631 let mut buf = Buffer::from(vec![Bytes::from("a")]);
632
633 assert_eq!(buf.remaining(), 1);
634 assert_eq!(buf.chunk(), b"a");
635
636 buf.advance(1);
637
638 assert_eq!(buf.remaining(), 0);
639 assert_eq!(buf.chunk(), EMPTY_SLICE);
640 }
641
642 #[test]
643 fn test_non_contiguous_buffer_with_next() {
644 let mut buf = Buffer::from(vec![Bytes::from("a")]);
645
646 assert_eq!(buf.remaining(), 1);
647 assert_eq!(buf.chunk(), b"a");
648
649 let bs = buf.next();
650
651 assert_eq!(bs, Some(Bytes::from("a")));
652 assert_eq!(buf.remaining(), 0);
653 assert_eq!(buf.chunk(), EMPTY_SLICE);
654 }
655
656 #[test]
657 fn test_buffer_advance() {
658 let mut buf = Buffer::from(vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]);
659
660 assert_eq!(buf.remaining(), 3);
661 assert_eq!(buf.chunk(), b"a");
662
663 buf.advance(1);
664
665 assert_eq!(buf.remaining(), 2);
666 assert_eq!(buf.chunk(), b"b");
667
668 buf.advance(1);
669
670 assert_eq!(buf.remaining(), 1);
671 assert_eq!(buf.chunk(), b"c");
672
673 buf.advance(1);
674
675 assert_eq!(buf.remaining(), 0);
676 assert_eq!(buf.chunk(), EMPTY_SLICE);
677
678 buf.advance(0);
679
680 assert_eq!(buf.remaining(), 0);
681 assert_eq!(buf.chunk(), EMPTY_SLICE);
682 }
683
684 #[test]
685 fn test_buffer_truncate() {
686 let mut buf = Buffer::from(vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]);
687
688 assert_eq!(buf.remaining(), 3);
689 assert_eq!(buf.chunk(), b"a");
690
691 buf.truncate(100);
692
693 assert_eq!(buf.remaining(), 3);
694 assert_eq!(buf.chunk(), b"a");
695
696 buf.truncate(2);
697
698 assert_eq!(buf.remaining(), 2);
699 assert_eq!(buf.chunk(), b"a");
700
701 buf.truncate(0);
702
703 assert_eq!(buf.remaining(), 0);
704 assert_eq!(buf.chunk(), EMPTY_SLICE);
705 }
706
707 fn setup_buffer() -> (Buffer, usize, Bytes) {
713 let mut rng = thread_rng();
714
715 let bs = (0..100)
716 .map(|_| {
717 let len = rng.gen_range(1..100);
718 let mut buf = vec![0; len];
719 rng.fill(&mut buf[..]);
720 Bytes::from(buf)
721 })
722 .collect::<Vec<_>>();
723
724 let total_size = bs.iter().map(|b| b.len()).sum::<usize>();
725 let total_content = bs.iter().flatten().copied().collect::<Bytes>();
726 let buf = Buffer::from(bs);
727
728 (buf, total_size, total_content)
729 }
730
731 #[test]
732 fn fuzz_buffer_advance() {
733 let mut rng = thread_rng();
734
735 let (mut buf, total_size, total_content) = setup_buffer();
736 assert_eq!(buf.remaining(), total_size);
737 assert_eq!(buf.to_bytes(), total_content);
738
739 let mut cur = 0;
740 let mut times = 10000;
742 while !buf.is_empty() && times > 0 {
743 times -= 1;
744
745 let cnt = rng.gen_range(0..total_size - cur);
746 cur += cnt;
747 buf.advance(cnt);
748
749 assert_eq!(buf.remaining(), total_size - cur);
750 assert_eq!(buf.to_bytes(), total_content.slice(cur..));
751 }
752 }
753
754 #[test]
755 fn fuzz_buffer_iter() {
756 let mut rng = thread_rng();
757
758 let (mut buf, total_size, total_content) = setup_buffer();
759 assert_eq!(buf.remaining(), total_size);
760 assert_eq!(buf.to_bytes(), total_content);
761
762 let mut cur = 0;
763 while buf.is_empty() {
764 let cnt = rng.gen_range(0..total_size - cur);
765 cur += cnt;
766 buf.advance(cnt);
767
768 assert_eq!(buf.remaining(), total_size - cur);
770 assert_eq!(buf.to_bytes(), total_content.slice(cur..));
771
772 if let Some(bs) = buf.next() {
773 assert_eq!(bs, total_content.slice(cur..cur + bs.len()));
774 cur += bs.len();
775 }
776
777 assert_eq!(buf.remaining(), total_size - cur);
779 assert_eq!(buf.to_bytes(), total_content.slice(cur..));
780 }
781 }
782
783 #[test]
784 fn fuzz_buffer_truncate() {
785 let mut rng = thread_rng();
786
787 let (mut buf, total_size, total_content) = setup_buffer();
788 assert_eq!(buf.remaining(), total_size);
789 assert_eq!(buf.to_bytes(), total_content);
790
791 let mut cur = 0;
792 while buf.is_empty() {
793 let cnt = rng.gen_range(0..total_size - cur);
794 cur += cnt;
795 buf.advance(cnt);
796
797 assert_eq!(buf.remaining(), total_size - cur);
799 assert_eq!(buf.to_bytes(), total_content.slice(cur..));
800
801 let truncate_size = rng.gen_range(0..total_size - cur);
802 buf.truncate(truncate_size);
803
804 assert_eq!(buf.remaining(), truncate_size);
806 assert_eq!(
807 buf.to_bytes(),
808 total_content.slice(cur..cur + truncate_size)
809 );
810
811 if let Some(bs) = buf.next() {
813 assert_eq!(bs, total_content.slice(cur..cur + bs.len()));
814 cur += bs.len();
815 }
816
817 assert_eq!(buf.remaining(), total_size - cur);
819 assert_eq!(buf.to_bytes(), total_content.slice(cur..));
820 }
821 }
822}