opendal/services/compfs/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use compio::buf::IoBuf;
19use compio::dispatcher::Dispatcher;
20use std::future::Future;
21use std::path::PathBuf;
22use std::sync::Arc;
23
24use crate::raw::*;
25use crate::*;
26
27unsafe impl IoBuf for Buffer {
28    fn as_buf_ptr(&self) -> *const u8 {
29        self.current().as_ptr()
30    }
31
32    fn buf_len(&self) -> usize {
33        self.current().len()
34    }
35
36    fn buf_capacity(&self) -> usize {
37        // `Bytes` doesn't expose uninitialized capacity, so treat it as the same as `len`
38        self.current().len()
39    }
40}
41
42#[derive(Debug)]
43pub(super) struct CompfsCore {
44    pub info: Arc<AccessorInfo>,
45
46    pub root: PathBuf,
47    pub dispatcher: Dispatcher,
48    pub buf_pool: oio::PooledBuf,
49}
50
51impl CompfsCore {
52    pub fn prepare_path(&self, path: &str) -> PathBuf {
53        self.root.join(path.trim_end_matches('/'))
54    }
55
56    pub async fn exec<Fn, Fut, R>(&self, f: Fn) -> crate::Result<R>
57    where
58        Fn: FnOnce() -> Fut + Send + 'static,
59        Fut: Future<Output = std::io::Result<R>> + 'static,
60        R: Send + 'static,
61    {
62        self.dispatcher
63            .dispatch(f)
64            .map_err(|_| Error::new(ErrorKind::Unexpected, "compio spawn io task failed"))?
65            .await
66            .map_err(|_| Error::new(ErrorKind::Unexpected, "compio task cancelled"))?
67            .map_err(new_std_io_error)
68    }
69
70    pub async fn exec_blocking<Fn, R>(&self, f: Fn) -> Result<R>
71    where
72        Fn: FnOnce() -> R + Send + 'static,
73        R: Send + 'static,
74    {
75        self.dispatcher
76            .dispatch_blocking(f)
77            .map_err(|_| Error::new(ErrorKind::Unexpected, "compio spawn blocking task failed"))?
78            .await
79            .map_err(|_| Error::new(ErrorKind::Unexpected, "compio task cancelled"))
80    }
81}
82
83// TODO: impl IoVectoredBuf for Buffer
84// impl IoVectoredBuf for Buffer {
85//     fn as_dyn_bufs(&self) -> impl Iterator<Item = &dyn IoBuf> {}
86//
87//     fn owned_iter(self) -> Result<OwnedIter<impl OwnedIterator<Inner = Self>>, Self> {
88//         Ok(OwnedIter::new(BufferIter {
89//             current: self.current(),
90//             buf: self,
91//         }))
92//     }
93// }
94
95// #[derive(Debug, Clone)]
96// struct BufferIter {
97//     buf: Buffer,
98//     current: Bytes,
99// }
100
101// impl IntoInner for BufferIter {
102//     type Inner = Buffer;
103//
104//     fn into_inner(self) -> Self::Inner {
105//         self.buf
106//     }
107// }
108
109// impl OwnedIterator for BufferIter {
110//     fn next(mut self) -> Result<Self, Self::Inner> {
111//         let Some(current) = self.buf.next() else {
112//             return Err(self.buf);
113//         };
114//         self.current = current;
115//         Ok(self)
116//     }
117//
118//     fn current(&self) -> &dyn IoBuf {
119//         &self.current
120//     }
121// }
122
#[cfg(test)]
mod tests {
    use bytes::Buf;
    use bytes::Bytes;
    use rand::thread_rng;
    use rand::Rng;

    use super::*;

    /// Creates a `Buffer` out of 100 `Bytes` chunks, each holding between 1
    /// and 99 random bytes.
    ///
    /// Returns the buffer, the summed length of all chunks, and the chunks'
    /// contents concatenated in order.
    fn setup_buffer() -> (Buffer, usize, Bytes) {
        let mut rng = thread_rng();

        let mut chunks = Vec::with_capacity(100);
        for _ in 0..100 {
            let len = rng.gen_range(1..100);
            let mut raw = vec![0; len];
            rng.fill(&mut raw[..]);
            chunks.push(Bytes::from(raw));
        }

        let total_size: usize = chunks.iter().map(Bytes::len).sum();
        let total_content: Bytes = chunks
            .iter()
            .flat_map(|chunk| chunk.iter().copied())
            .collect();

        (Buffer::from(chunks), total_size, total_content)
    }

    /// The slice seen through `IoBuf` must equal the buffer's current chunk.
    #[test]
    fn test_io_buf() {
        let (buf, _, _) = setup_buffer();

        let expected = buf.current();
        assert_eq!(IoBuf::as_slice(&buf), expected.chunk());
    }
}