opendal/raw/oio/buf/
pooled_buf.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::VecDeque;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::fmt::{self};
22use std::sync::Mutex;
23
24use bytes::BytesMut;
25
26/// PooledBuf is a buffer pool that designed for reusing already allocated bufs.
27///
28/// It works as best-effort that tries to reuse the buffer if possible. It
29/// won't block the thread if the pool is locked, just returning a new buffer
30/// or dropping existing buffer.
31pub struct PooledBuf {
32    pool: Mutex<VecDeque<BytesMut>>,
33    size: usize,
34    initial_capacity: usize,
35}
36
37impl Debug for PooledBuf {
38    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
39        f.debug_struct("PooledBuf")
40            .field("size", &self.size)
41            .field("initial_capacity", &self.initial_capacity)
42            .finish_non_exhaustive()
43    }
44}
45
46impl PooledBuf {
47    /// Create a new buffer pool with a given size.
48    pub fn new(size: usize) -> Self {
49        Self {
50            pool: Mutex::new(VecDeque::with_capacity(size)),
51            size,
52            initial_capacity: 0,
53        }
54    }
55
56    /// Set the initial capacity of the buffer.
57    ///
58    /// The default value is 0.
59    pub fn with_initial_capacity(mut self, initial_capacity: usize) -> Self {
60        self.initial_capacity = initial_capacity;
61        self
62    }
63
64    /// Get a [`BytesMut`] from the pool.
65    ///
66    /// It's guaranteed that the buffer is empty.
67    pub fn get(&self) -> BytesMut {
68        // We don't want to block the thread if the pool is locked.
69        //
70        // Just returning a new buffer in this case.
71        let Ok(mut pool) = self.pool.try_lock() else {
72            return BytesMut::with_capacity(self.initial_capacity);
73        };
74
75        if let Some(buf) = pool.pop_front() {
76            buf
77        } else {
78            BytesMut::with_capacity(self.initial_capacity)
79        }
80    }
81
82    /// Put a [`BytesMut`] back to the pool.
83    pub fn put(&self, mut buf: BytesMut) {
84        // We don't want to block the thread if the pool is locked.
85        //
86        // Just dropping the buffer in this case.
87        let Ok(mut pool) = self.pool.try_lock() else {
88            return;
89        };
90
91        if pool.len() < self.size {
92            // Clean the buffer before putting it back to the pool.
93            buf.clear();
94            pool.push_back(buf);
95        }
96    }
97}
98
99#[cfg(test)]
100mod tests {
101    use bytes::BufMut;
102
103    use super::*;
104
105    #[test]
106    fn test_pooled_buf() {
107        let pool = PooledBuf::new(2);
108
109        let mut buf1 = pool.get();
110        buf1.put_slice(b"hello, world!");
111
112        let mut buf2 = pool.get();
113        buf2.reserve(1024);
114
115        pool.put(buf1);
116        pool.put(buf2);
117
118        let buf3 = pool.get();
119        assert_eq!(buf3.len(), 0);
120        assert_eq!(buf3.capacity(), 13);
121
122        let buf4 = pool.get();
123        assert_eq!(buf4.len(), 0);
124        assert_eq!(buf4.capacity(), 1024);
125    }
126}