opendal/types/
buffer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::VecDeque;
19use std::convert::Infallible;
20use std::fmt::Debug;
21use std::fmt::Formatter;
22use std::io::IoSlice;
23use std::mem;
24use std::ops::Bound;
25use std::ops::RangeBounds;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::task::Context;
29use std::task::Poll;
30
31use bytes::Buf;
32use bytes::BufMut;
33use bytes::Bytes;
34use bytes::BytesMut;
35use futures::Stream;
36
37use crate::*;
38
/// Buffer is a wrapper of contiguous `Bytes` and non-contiguous `[Bytes]`.
///
/// We designed buffer to allow underlying storage to return non-contiguous bytes. For example,
/// http based storage like s3 could generate non-contiguous bytes by stream.
///
/// ## Features
///
/// - [`Buffer`] can be used as [`Buf`], [`Iterator`], [`Stream`] directly.
/// - [`Buffer`] is cheap to clone like [`Bytes`], only update reference count, no allocation.
/// - [`Buffer`] is vectored-write friendly, you can convert it to [`IoSlice`] for vectored write.
///
/// ## Examples
///
/// ### As `Buf`
///
/// `Buffer` implements `Buf` trait:
///
/// ```rust
/// use bytes::Buf;
/// use opendal::Buffer;
/// use serde_json;
///
/// fn test(mut buf: Buffer) -> Vec<String> {
///     serde_json::from_reader(buf.reader()).unwrap()
/// }
/// ```
///
/// ### As Bytes `Iterator`
///
/// `Buffer` implements `Iterator<Item=Bytes>` trait:
///
/// ```rust
/// use bytes::Bytes;
/// use opendal::Buffer;
///
/// fn test(mut buf: Buffer) -> Vec<Bytes> {
///     buf.into_iter().collect()
/// }
/// ```
///
/// ### As Bytes `Stream`
///
/// `Buffer` implements `Stream<Item=Result<Bytes, Infallible>>` trait:
///
/// ```rust
/// use bytes::Bytes;
/// use futures::TryStreamExt;
/// use opendal::Buffer;
///
/// async fn test(mut buf: Buffer) -> Vec<Bytes> {
///     buf.into_iter().try_collect().await.unwrap()
/// }
/// ```
///
/// ### As one contiguous Bytes
///
/// `Buffer` can be made contiguous by converting it into `Bytes` or `Vec<u8>`.
/// Please keep in mind that `to_vec` always copies, and `to_bytes` allocates and copies
/// whenever the buffer is not already backed by a single `Bytes`; in those cases the result
/// no longer shares the original memory region.
///
/// ```rust
/// use bytes::Bytes;
/// use opendal::Buffer;
///
/// fn test_to_vec(buf: Buffer) -> Vec<u8> {
///     buf.to_vec()
/// }
///
/// fn test_to_bytes(buf: Buffer) -> Bytes {
///     buf.to_bytes()
/// }
/// ```
#[derive(Clone)]
pub struct Buffer(Inner);
113
/// Internal representation of a [`Buffer`].
#[derive(Clone)]
enum Inner {
    /// All data lives in one `Bytes`; the `Bytes` itself tracks the read position.
    Contiguous(Bytes),
    /// Data is spread over several `Bytes`, read through a cursor (`idx`, `offset`).
    NonContiguous {
        /// The shared list of parts; cloning the buffer only clones this `Arc`.
        parts: Arc<[Bytes]>,
        /// Number of bytes still readable, counted from the cursor.
        size: usize,
        /// Index into `parts` of the part the cursor currently points at.
        idx: usize,
        /// Byte offset of the cursor inside `parts[idx]`.
        offset: usize,
    },
}
124
125impl Debug for Buffer {
126    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
127        let mut b = f.debug_struct("Buffer");
128
129        match &self.0 {
130            Inner::Contiguous(bs) => {
131                b.field("type", &"contiguous");
132                b.field("size", &bs.len());
133            }
134            Inner::NonContiguous {
135                parts,
136                size,
137                idx,
138                offset,
139            } => {
140                b.field("type", &"non_contiguous");
141                b.field("parts", &parts);
142                b.field("size", &size);
143                b.field("idx", &idx);
144                b.field("offset", &offset);
145            }
146        }
147        b.finish_non_exhaustive()
148    }
149}
150
151impl Default for Buffer {
152    fn default() -> Self {
153        Self::new()
154    }
155}
156impl Buffer {
157    /// Create a new empty buffer.
158    ///
159    /// This operation is const and no allocation will be performed.
160    #[inline]
161    pub const fn new() -> Self {
162        Self(Inner::Contiguous(Bytes::new()))
163    }
164
165    /// Get the length of the buffer.
166    #[inline]
167    pub fn len(&self) -> usize {
168        match &self.0 {
169            Inner::Contiguous(b) => b.remaining(),
170            Inner::NonContiguous { size, .. } => *size,
171        }
172    }
173
174    /// Check if buffer is empty.
175    #[inline]
176    pub fn is_empty(&self) -> bool {
177        self.len() == 0
178    }
179
180    /// Number of [`Bytes`] in [`Buffer`].
181    ///
182    /// For contiguous buffer, it's always 1. For non-contiguous buffer, it's number of bytes
183    /// available for use.
184    pub fn count(&self) -> usize {
185        match &self.0 {
186            Inner::Contiguous(_) => 1,
187            Inner::NonContiguous {
188                parts,
189                idx,
190                size,
191                offset,
192            } => {
193                parts
194                    .iter()
195                    .skip(*idx)
196                    .fold((0, size + offset), |(count, size), bytes| {
197                        if size == 0 {
198                            (count, 0)
199                        } else {
200                            (count + 1, size.saturating_sub(bytes.len()))
201                        }
202                    })
203                    .0
204            }
205        }
206    }
207
208    /// Get current [`Bytes`].
209    pub fn current(&self) -> Bytes {
210        match &self.0 {
211            Inner::Contiguous(inner) => inner.clone(),
212            Inner::NonContiguous {
213                parts,
214                idx,
215                offset,
216                size,
217            } => {
218                let chunk = &parts[*idx];
219                let n = (chunk.len() - *offset).min(*size);
220                chunk.slice(*offset..*offset + n)
221            }
222        }
223    }
224
225    /// Shortens the buffer, keeping the first `len` bytes and dropping the rest.
226    ///
227    /// If `len` is greater than the buffer’s current length, this has no effect.
228    #[inline]
229    pub fn truncate(&mut self, len: usize) {
230        match &mut self.0 {
231            Inner::Contiguous(bs) => bs.truncate(len),
232            Inner::NonContiguous { size, .. } => {
233                *size = (*size).min(len);
234            }
235        }
236    }
237
238    /// Returns a slice of self for the provided range.
239    ///
240    /// This will increment the reference count for the underlying memory and return a new Buffer handle set to the slice.
241    ///
242    /// This operation is O(1).
243    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
244        let len = self.len();
245
246        let begin = match range.start_bound() {
247            Bound::Included(&n) => n,
248            Bound::Excluded(&n) => n.checked_add(1).expect("out of range"),
249            Bound::Unbounded => 0,
250        };
251
252        let end = match range.end_bound() {
253            Bound::Included(&n) => n.checked_add(1).expect("out of range"),
254            Bound::Excluded(&n) => n,
255            Bound::Unbounded => len,
256        };
257
258        assert!(
259            begin <= end,
260            "range start must not be greater than end: {:?} <= {:?}",
261            begin,
262            end,
263        );
264        assert!(
265            end <= len,
266            "range end out of bounds: {:?} <= {:?}",
267            end,
268            len,
269        );
270
271        if end == begin {
272            return Buffer::new();
273        }
274
275        let mut ret = self.clone();
276        ret.truncate(end);
277        ret.advance(begin);
278        ret
279    }
280
281    /// Combine all bytes together into one single [`Bytes`].
282    ///
283    /// This operation is zero copy if the underlying bytes are contiguous.
284    /// Otherwise, it will copy all bytes into one single [`Bytes`].
285    /// Please use API from [`Buf`], [`Iterator`] or [`Stream`] whenever possible.
286    #[inline]
287    pub fn to_bytes(&self) -> Bytes {
288        match &self.0 {
289            Inner::Contiguous(bytes) => bytes.clone(),
290            Inner::NonContiguous {
291                parts,
292                size,
293                idx: _,
294                offset,
295            } => {
296                if parts.len() == 1 {
297                    parts[0].slice(*offset..(*offset + *size))
298                } else {
299                    let mut ret = BytesMut::with_capacity(self.len());
300                    ret.put(self.clone());
301                    ret.freeze()
302                }
303            }
304        }
305    }
306
307    /// Combine all bytes together into one single [`Vec<u8>`].
308    ///
309    /// This operation is not zero copy, it will copy all bytes into one single [`Vec<u8>`].
310    /// Please use API from [`Buf`], [`Iterator`] or [`Stream`] whenever possible.
311    #[inline]
312    pub fn to_vec(&self) -> Vec<u8> {
313        let mut ret = Vec::with_capacity(self.len());
314        ret.put(self.clone());
315        ret
316    }
317
318    /// Convert buffer into a slice of [`IoSlice`] for vectored write.
319    #[inline]
320    pub fn to_io_slice(&self) -> Vec<IoSlice<'_>> {
321        match &self.0 {
322            Inner::Contiguous(bs) => vec![IoSlice::new(bs.chunk())],
323            Inner::NonContiguous {
324                parts, idx, offset, ..
325            } => {
326                let mut ret = Vec::with_capacity(parts.len() - *idx);
327                let mut new_offset = *offset;
328                for part in parts.iter().skip(*idx) {
329                    ret.push(IoSlice::new(&part[new_offset..]));
330                    new_offset = 0;
331                }
332                ret
333            }
334        }
335    }
336}
337
338impl From<Vec<u8>> for Buffer {
339    #[inline]
340    fn from(bs: Vec<u8>) -> Self {
341        Self(Inner::Contiguous(bs.into()))
342    }
343}
344
345impl From<Bytes> for Buffer {
346    #[inline]
347    fn from(bs: Bytes) -> Self {
348        Self(Inner::Contiguous(bs))
349    }
350}
351
352impl From<String> for Buffer {
353    #[inline]
354    fn from(s: String) -> Self {
355        Self(Inner::Contiguous(Bytes::from(s)))
356    }
357}
358
359impl From<&'static [u8]> for Buffer {
360    #[inline]
361    fn from(s: &'static [u8]) -> Self {
362        Self(Inner::Contiguous(Bytes::from_static(s)))
363    }
364}
365
366impl From<&'static str> for Buffer {
367    #[inline]
368    fn from(s: &'static str) -> Self {
369        Self(Inner::Contiguous(Bytes::from_static(s.as_bytes())))
370    }
371}
372
373impl FromIterator<u8> for Buffer {
374    #[inline]
375    fn from_iter<T: IntoIterator<Item = u8>>(iter: T) -> Self {
376        Self(Inner::Contiguous(Bytes::from_iter(iter)))
377    }
378}
379
380impl From<VecDeque<Bytes>> for Buffer {
381    #[inline]
382    fn from(bs: VecDeque<Bytes>) -> Self {
383        let size = bs.iter().map(Bytes::len).sum();
384        Self(Inner::NonContiguous {
385            parts: Vec::from(bs).into(),
386            size,
387            idx: 0,
388            offset: 0,
389        })
390    }
391}
392
393impl From<Vec<Bytes>> for Buffer {
394    #[inline]
395    fn from(bs: Vec<Bytes>) -> Self {
396        let size = bs.iter().map(Bytes::len).sum();
397        Self(Inner::NonContiguous {
398            parts: bs.into(),
399            size,
400            idx: 0,
401            offset: 0,
402        })
403    }
404}
405
406impl From<Arc<[Bytes]>> for Buffer {
407    #[inline]
408    fn from(bs: Arc<[Bytes]>) -> Self {
409        let size = bs.iter().map(Bytes::len).sum();
410        Self(Inner::NonContiguous {
411            parts: bs,
412            size,
413            idx: 0,
414            offset: 0,
415        })
416    }
417}
418
419impl FromIterator<Bytes> for Buffer {
420    #[inline]
421    fn from_iter<T: IntoIterator<Item = Bytes>>(iter: T) -> Self {
422        let mut size = 0;
423        let bs = iter.into_iter().inspect(|v| size += v.len());
424        // This operation only needs one allocation from iterator to `Arc<[Bytes]>` instead
425        // of iterator -> `Vec<Bytes>` -> `Arc<[Bytes]>`.
426        let parts = Arc::from_iter(bs);
427        Self(Inner::NonContiguous {
428            parts,
429            size,
430            idx: 0,
431            offset: 0,
432        })
433    }
434}
435
436impl Buf for Buffer {
437    #[inline]
438    fn remaining(&self) -> usize {
439        self.len()
440    }
441
442    #[inline]
443    fn chunk(&self) -> &[u8] {
444        match &self.0 {
445            Inner::Contiguous(b) => b.chunk(),
446            Inner::NonContiguous {
447                parts,
448                size,
449                idx,
450                offset,
451            } => {
452                if *size == 0 {
453                    return &[];
454                }
455
456                let chunk = &parts[*idx];
457                let n = (chunk.len() - *offset).min(*size);
458                &parts[*idx][*offset..*offset + n]
459            }
460        }
461    }
462
463    #[inline]
464    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
465        match &self.0 {
466            Inner::Contiguous(b) => {
467                if dst.is_empty() {
468                    return 0;
469                }
470
471                dst[0] = IoSlice::new(b.chunk());
472                1
473            }
474            Inner::NonContiguous {
475                parts, idx, offset, ..
476            } => {
477                if dst.is_empty() {
478                    return 0;
479                }
480
481                let mut new_offset = *offset;
482                parts
483                    .iter()
484                    .skip(*idx)
485                    .zip(dst.iter_mut())
486                    .map(|(part, dst)| {
487                        *dst = IoSlice::new(&part[new_offset..]);
488                        new_offset = 0;
489                    })
490                    .count()
491            }
492        }
493    }
494
495    #[inline]
496    fn advance(&mut self, cnt: usize) {
497        match &mut self.0 {
498            Inner::Contiguous(b) => b.advance(cnt),
499            Inner::NonContiguous {
500                parts,
501                size,
502                idx,
503                offset,
504            } => {
505                assert!(
506                    cnt <= *size,
507                    "cannot advance past {cnt} bytes, only {size} bytes left"
508                );
509
510                let mut new_idx = *idx;
511                let mut new_offset = *offset;
512                let mut remaining_cnt = cnt;
513                while remaining_cnt > 0 {
514                    let part_len = parts[new_idx].len();
515                    let remaining_in_part = part_len - new_offset;
516
517                    if remaining_cnt < remaining_in_part {
518                        new_offset += remaining_cnt;
519                        break;
520                    }
521
522                    remaining_cnt -= remaining_in_part;
523                    new_idx += 1;
524                    new_offset = 0;
525                }
526
527                *idx = new_idx;
528                *offset = new_offset;
529                *size -= cnt;
530            }
531        }
532    }
533}
534
535impl Iterator for Buffer {
536    type Item = Bytes;
537
538    fn next(&mut self) -> Option<Self::Item> {
539        match &mut self.0 {
540            Inner::Contiguous(bs) => {
541                if bs.is_empty() {
542                    None
543                } else {
544                    Some(mem::take(bs))
545                }
546            }
547            Inner::NonContiguous {
548                parts,
549                size,
550                idx,
551                offset,
552            } => {
553                if *size == 0 {
554                    return None;
555                }
556
557                let chunk = &parts[*idx];
558                let n = (chunk.len() - *offset).min(*size);
559                let buf = chunk.slice(*offset..*offset + n);
560                *size -= n;
561                *offset += n;
562
563                if *offset == chunk.len() {
564                    *idx += 1;
565                    *offset = 0;
566                }
567
568                Some(buf)
569            }
570        }
571    }
572
573    fn size_hint(&self) -> (usize, Option<usize>) {
574        match &self.0 {
575            Inner::Contiguous(bs) => {
576                if bs.is_empty() {
577                    (0, Some(0))
578                } else {
579                    (1, Some(1))
580                }
581            }
582            Inner::NonContiguous { parts, idx, .. } => {
583                let remaining = parts.len().saturating_sub(*idx);
584                (remaining, Some(remaining))
585            }
586        }
587    }
588}
589
590impl Stream for Buffer {
591    type Item = Result<Bytes, Infallible>;
592
593    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
594        Poll::Ready(self.get_mut().next().map(Ok))
595    }
596
597    fn size_hint(&self) -> (usize, Option<usize>) {
598        Iterator::size_hint(self)
599    }
600}
601
#[cfg(test)]
mod tests {
    use pretty_assertions::assert_eq;
    use rand::prelude::*;

    use super::*;

    const EMPTY_SLICE: &[u8] = &[];

    #[test]
    fn test_contiguous_buffer() {
        let mut buf = Buffer::new();

        assert_eq!(buf.remaining(), 0);
        assert_eq!(buf.chunk(), EMPTY_SLICE);
        assert_eq!(buf.next(), None);
    }

    #[test]
    fn test_empty_non_contiguous_buffer() {
        let mut buf = Buffer::from(vec![Bytes::new()]);

        assert_eq!(buf.remaining(), 0);
        assert_eq!(buf.chunk(), EMPTY_SLICE);
        assert_eq!(buf.next(), None);
    }

    #[test]
    fn test_non_contiguous_buffer_with_empty_chunks() {
        let mut buf = Buffer::from(vec![Bytes::from("a")]);

        assert_eq!(buf.remaining(), 1);
        assert_eq!(buf.chunk(), b"a");

        buf.advance(1);

        assert_eq!(buf.remaining(), 0);
        assert_eq!(buf.chunk(), EMPTY_SLICE);
    }

    #[test]
    fn test_non_contiguous_buffer_with_next() {
        let mut buf = Buffer::from(vec![Bytes::from("a")]);

        assert_eq!(buf.remaining(), 1);
        assert_eq!(buf.chunk(), b"a");

        let bs = buf.next();

        assert_eq!(bs, Some(Bytes::from("a")));
        assert_eq!(buf.remaining(), 0);
        assert_eq!(buf.chunk(), EMPTY_SLICE);
    }

    #[test]
    fn test_buffer_advance() {
        let mut buf = Buffer::from(vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]);

        assert_eq!(buf.remaining(), 3);
        assert_eq!(buf.chunk(), b"a");

        buf.advance(1);

        assert_eq!(buf.remaining(), 2);
        assert_eq!(buf.chunk(), b"b");

        buf.advance(1);

        assert_eq!(buf.remaining(), 1);
        assert_eq!(buf.chunk(), b"c");

        buf.advance(1);

        assert_eq!(buf.remaining(), 0);
        assert_eq!(buf.chunk(), EMPTY_SLICE);

        buf.advance(0);

        assert_eq!(buf.remaining(), 0);
        assert_eq!(buf.chunk(), EMPTY_SLICE);
    }

    #[test]
    fn test_buffer_truncate() {
        let mut buf = Buffer::from(vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]);

        assert_eq!(buf.remaining(), 3);
        assert_eq!(buf.chunk(), b"a");

        buf.truncate(100);

        assert_eq!(buf.remaining(), 3);
        assert_eq!(buf.chunk(), b"a");

        buf.truncate(2);

        assert_eq!(buf.remaining(), 2);
        assert_eq!(buf.chunk(), b"a");

        buf.truncate(0);

        assert_eq!(buf.remaining(), 0);
        assert_eq!(buf.chunk(), EMPTY_SLICE);
    }

    /// This setup will return
    ///
    /// - A buffer
    /// - Total size of this buffer.
    /// - Total content of this buffer.
    fn setup_buffer() -> (Buffer, usize, Bytes) {
        let mut rng = thread_rng();

        let bs = (0..100)
            .map(|_| {
                let len = rng.gen_range(1..100);
                let mut buf = vec![0; len];
                rng.fill(&mut buf[..]);
                Bytes::from(buf)
            })
            .collect::<Vec<_>>();

        let total_size = bs.iter().map(|b| b.len()).sum::<usize>();
        let total_content = bs.iter().flatten().copied().collect::<Bytes>();
        let buf = Buffer::from(bs);

        (buf, total_size, total_content)
    }

    #[test]
    fn fuzz_buffer_advance() {
        let mut rng = thread_rng();

        let (mut buf, total_size, total_content) = setup_buffer();
        assert_eq!(buf.remaining(), total_size);
        assert_eq!(buf.to_bytes(), total_content);

        let mut cur = 0;
        // Loop at most 10000 times.
        let mut times = 10000;
        while !buf.is_empty() && times > 0 {
            times -= 1;

            let cnt = rng.gen_range(0..total_size - cur);
            cur += cnt;
            buf.advance(cnt);

            assert_eq!(buf.remaining(), total_size - cur);
            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
        }
    }

    #[test]
    fn fuzz_buffer_iter() {
        let mut rng = thread_rng();

        let (mut buf, total_size, total_content) = setup_buffer();
        assert_eq!(buf.remaining(), total_size);
        assert_eq!(buf.to_bytes(), total_content);

        let mut cur = 0;
        // Run until the buffer is drained; every round consumes at least one byte via `next`.
        while !buf.is_empty() {
            let cnt = rng.gen_range(0..total_size - cur);
            cur += cnt;
            buf.advance(cnt);

            // Before next
            assert_eq!(buf.remaining(), total_size - cur);
            assert_eq!(buf.to_bytes(), total_content.slice(cur..));

            if let Some(bs) = buf.next() {
                assert_eq!(bs, total_content.slice(cur..cur + bs.len()));
                cur += bs.len();
            }

            // After next
            assert_eq!(buf.remaining(), total_size - cur);
            assert_eq!(buf.to_bytes(), total_content.slice(cur..));
        }
    }

    #[test]
    fn fuzz_buffer_truncate() {
        let mut rng = thread_rng();

        let (mut buf, total_size, total_content) = setup_buffer();
        assert_eq!(buf.remaining(), total_size);
        assert_eq!(buf.to_bytes(), total_content);

        // `cur..end` is the window of `total_content` the buffer is expected to expose.
        // Truncation permanently moves `end`, so it has to be tracked alongside `cur`.
        let mut cur = 0;
        let mut end = total_size;
        while !buf.is_empty() {
            let cnt = rng.gen_range(0..end - cur);
            cur += cnt;
            buf.advance(cnt);

            // Before truncate
            assert_eq!(buf.remaining(), end - cur);
            assert_eq!(buf.to_bytes(), total_content.slice(cur..end));

            // Always strictly smaller than what is left, so the loop terminates.
            let truncate_size = rng.gen_range(0..end - cur);
            buf.truncate(truncate_size);
            end = cur + truncate_size;

            // After truncate
            assert_eq!(buf.remaining(), truncate_size);
            assert_eq!(buf.to_bytes(), total_content.slice(cur..end));

            // Try next after truncate
            if let Some(bs) = buf.next() {
                assert_eq!(bs, total_content.slice(cur..cur + bs.len()));
                cur += bs.len();
            }

            // After next
            assert_eq!(buf.remaining(), end - cur);
            assert_eq!(buf.to_bytes(), total_content.slice(cur..end));
        }
    }
}