opendal_core/raw/oio/read/
api.rs1use std::mem;
19use std::ops::Deref;
20use std::ops::DerefMut;
21
22use bytes::Bytes;
23use futures::Future;
24
25use crate::raw::*;
26use crate::*;
27
28pub type Reader = Box<dyn ReadDyn>;
30
31pub trait Read: Unpin + Send + Sync {
35 fn open(
37 &self,
38 range: BytesRange,
39 ) -> impl Future<Output = Result<(RpRead, Box<dyn ReadStreamDyn>)>> + MaybeSend;
40
41 fn read(&self, range: BytesRange)
43 -> impl Future<Output = Result<(RpRead, Buffer)>> + MaybeSend;
44}
45
46impl Read for () {
47 async fn open(&self, _: BytesRange) -> Result<(RpRead, Box<dyn ReadStreamDyn>)> {
48 Err(Error::new(
49 ErrorKind::Unsupported,
50 "output reader doesn't support open",
51 ))
52 }
53
54 async fn read(&self, _: BytesRange) -> Result<(RpRead, Buffer)> {
55 Err(Error::new(
56 ErrorKind::Unsupported,
57 "output reader doesn't support read",
58 ))
59 }
60}
61
62pub trait ReadDyn: Unpin + Send + Sync {
65 fn open_dyn(
67 &self,
68 range: BytesRange,
69 ) -> BoxedFuture<'_, Result<(RpRead, Box<dyn ReadStreamDyn>)>>;
70
71 fn read_dyn(&self, range: BytesRange) -> BoxedFuture<'_, Result<(RpRead, Buffer)>>;
73}
74
75impl<T: Read + ?Sized> ReadDyn for T {
76 fn open_dyn(
77 &self,
78 range: BytesRange,
79 ) -> BoxedFuture<'_, Result<(RpRead, Box<dyn ReadStreamDyn>)>> {
80 Box::pin(self.open(range))
81 }
82
83 fn read_dyn(&self, range: BytesRange) -> BoxedFuture<'_, Result<(RpRead, Buffer)>> {
84 Box::pin(self.read(range))
85 }
86}
87
88impl<T: ReadDyn + ?Sized> Read for Box<T> {
89 async fn open(&self, range: BytesRange) -> Result<(RpRead, Box<dyn ReadStreamDyn>)> {
90 self.deref().open_dyn(range).await
91 }
92
93 async fn read(&self, range: BytesRange) -> Result<(RpRead, Buffer)> {
94 self.deref().read_dyn(range).await
95 }
96}
97
98pub trait ReadStream: Unpin + Send + Sync {
115 fn read(&mut self) -> impl Future<Output = Result<Buffer>> + MaybeSend;
117
118 fn read_all(&mut self) -> impl Future<Output = Result<Buffer>> + MaybeSend {
120 async {
121 let mut bufs = vec![];
122 loop {
123 match self.read().await {
124 Ok(buf) if buf.is_empty() => break,
125 Ok(buf) => bufs.push(buf),
126 Err(err) => return Err(err),
127 }
128 }
129 Ok(bufs.into_iter().flatten().collect())
130 }
131 }
132}
133
134impl ReadStream for () {
135 async fn read(&mut self) -> Result<Buffer> {
136 Err(Error::new(
137 ErrorKind::Unsupported,
138 "output reader doesn't support read",
139 ))
140 }
141}
142
143impl ReadStream for Bytes {
144 async fn read(&mut self) -> Result<Buffer> {
145 Ok(Buffer::from(self.split_off(0)))
146 }
147}
148
149impl ReadStream for Buffer {
150 async fn read(&mut self) -> Result<Buffer> {
151 Ok(mem::take(self))
152 }
153}
154
155pub trait ReadStreamDyn: Unpin + Send + Sync {
158 fn read_dyn(&mut self) -> BoxedFuture<'_, Result<Buffer>>;
162
163 fn read_all_dyn(&mut self) -> BoxedFuture<'_, Result<Buffer>>;
165}
166
167impl<T: ReadStream + ?Sized> ReadStreamDyn for T {
168 fn read_dyn(&mut self) -> BoxedFuture<'_, Result<Buffer>> {
169 Box::pin(self.read())
170 }
171
172 fn read_all_dyn(&mut self) -> BoxedFuture<'_, Result<Buffer>> {
173 Box::pin(self.read_all())
174 }
175}
176
177impl<T: ReadStreamDyn + ?Sized> ReadStream for Box<T> {
182 async fn read(&mut self) -> Result<Buffer> {
183 self.deref_mut().read_dyn().await
184 }
185
186 async fn read_all(&mut self) -> Result<Buffer> {
187 self.deref_mut().read_all_dyn().await
188 }
189}