opendal/raw/oio/buf/
queue_buf.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::VecDeque;
19use std::mem;
20
21use bytes::Buf;
22
23use crate::*;
24
25/// QueueBuf is a queue of [`Buffer`].
26///
27/// It's designed to allow storing multiple buffers without copying underlying bytes and consume them
28/// in order.
29///
30/// QueueBuf mainly provides the following operations:
31///
32/// - `push`: Push a new buffer in the queue.
33/// - `collect`: Collect all buffer in the queue as a new [`Buffer`]
34/// - `advance`: Advance the queue by `cnt` bytes.
35#[derive(Clone, Default)]
36pub struct QueueBuf(VecDeque<Buffer>);
37
38impl QueueBuf {
39    /// Create a new buffer queue.
40    #[inline]
41    pub fn new() -> Self {
42        Self::default()
43    }
44
45    /// Push new [`Buffer`] into the queue.
46    #[inline]
47    pub fn push(&mut self, buf: Buffer) {
48        if buf.is_empty() {
49            return;
50        }
51
52        self.0.push_back(buf);
53    }
54
55    /// Total bytes size inside the buffer queue.
56    #[inline]
57    pub fn len(&self) -> usize {
58        self.0.iter().map(|b| b.len()).sum()
59    }
60
61    /// Is the buffer queue empty.
62    #[inline]
63    pub fn is_empty(&self) -> bool {
64        self.len() == 0
65    }
66
67    /// Take the entire buffer queue and leave `self` in empty states.
68    #[inline]
69    pub fn take(&mut self) -> QueueBuf {
70        mem::take(self)
71    }
72
73    /// Build a new [`Buffer`] from the queue.
74    ///
75    /// If the queue is empty, it will return an empty buffer. Otherwise, it will iterate over all
76    /// buffers and collect them into a new buffer.
77    ///
78    /// # Notes
79    ///
80    /// There are allocation overheads when collecting multiple buffers into a new buffer. But
81    /// most of them should be acceptable since we can expect the item length of buffers are slower
82    /// than 4k.
83    #[inline]
84    pub fn collect(mut self) -> Buffer {
85        if self.0.is_empty() {
86            Buffer::new()
87        } else if self.0.len() == 1 {
88            self.0.pop_front().unwrap()
89        } else {
90            self.0.into_iter().flatten().collect()
91        }
92    }
93
94    /// Advance the buffer queue by `cnt` bytes.
95    #[inline]
96    pub fn advance(&mut self, cnt: usize) {
97        assert!(cnt <= self.len(), "cannot advance past {cnt} bytes");
98
99        let mut new_cnt = cnt;
100        while new_cnt > 0 {
101            let buf = self.0.front_mut().expect("buffer must be valid");
102            if new_cnt < buf.remaining() {
103                buf.advance(new_cnt);
104                break;
105            } else {
106                new_cnt -= buf.remaining();
107                self.0.pop_front();
108            }
109        }
110    }
111
112    /// Clear the buffer queue.
113    #[inline]
114    pub fn clear(&mut self) {
115        self.0.clear()
116    }
117}