Skip to main content

opendal_core/raw/oio/read/
api.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::mem;
19use std::ops::Deref;
20use std::ops::DerefMut;
21
22use bytes::Bytes;
23use futures::Future;
24
25use crate::raw::*;
26use crate::*;
27
/// Reader is a type erased [`Read`].
///
/// It is a boxed [`ReadDyn`] trait object; because `Box<T>` implements
/// [`Read`] for every `T: ReadDyn`, a `Reader` can be used wherever a
/// [`Read`] is expected.
pub type Reader = Box<dyn ReadDyn>;
30
31/// Read is the internal trait used by OpenDAL to read ranges from storage.
32///
33/// Users should not use or import this trait unless they are implementing an `Accessor`.
34pub trait Read: Unpin + Send + Sync {
35    /// Open a range stream for the given range.
36    fn open(
37        &self,
38        range: BytesRange,
39    ) -> impl Future<Output = Result<(RpRead, Box<dyn ReadStreamDyn>)>> + MaybeSend;
40
41    /// Read an exact bounded range into [`Buffer`].
42    fn read(&self, range: BytesRange)
43    -> impl Future<Output = Result<(RpRead, Buffer)>> + MaybeSend;
44}
45
46impl Read for () {
47    async fn open(&self, _: BytesRange) -> Result<(RpRead, Box<dyn ReadStreamDyn>)> {
48        Err(Error::new(
49            ErrorKind::Unsupported,
50            "output reader doesn't support open",
51        ))
52    }
53
54    async fn read(&self, _: BytesRange) -> Result<(RpRead, Buffer)> {
55        Err(Error::new(
56            ErrorKind::Unsupported,
57            "output reader doesn't support read",
58        ))
59    }
60}
61
62/// ReadDyn is the dyn version of [`Read`] make it possible to use as
63/// `Box<dyn ReadDyn>`.
64pub trait ReadDyn: Unpin + Send + Sync {
65    /// The dyn version of [`Read::open`].
66    fn open_dyn(
67        &self,
68        range: BytesRange,
69    ) -> BoxedFuture<'_, Result<(RpRead, Box<dyn ReadStreamDyn>)>>;
70
71    /// The dyn version of [`Read::read`].
72    fn read_dyn(&self, range: BytesRange) -> BoxedFuture<'_, Result<(RpRead, Buffer)>>;
73}
74
75impl<T: Read + ?Sized> ReadDyn for T {
76    fn open_dyn(
77        &self,
78        range: BytesRange,
79    ) -> BoxedFuture<'_, Result<(RpRead, Box<dyn ReadStreamDyn>)>> {
80        Box::pin(self.open(range))
81    }
82
83    fn read_dyn(&self, range: BytesRange) -> BoxedFuture<'_, Result<(RpRead, Buffer)>> {
84        Box::pin(self.read(range))
85    }
86}
87
88impl<T: ReadDyn + ?Sized> Read for Box<T> {
89    async fn open(&self, range: BytesRange) -> Result<(RpRead, Box<dyn ReadStreamDyn>)> {
90        self.deref().open_dyn(range).await
91    }
92
93    async fn read(&self, range: BytesRange) -> Result<(RpRead, Buffer)> {
94        self.deref().read_dyn(range).await
95    }
96}
97
98/// ReadStream is the internal trait used by OpenDAL to stream data from storage.
99///
100/// Users should not use or import this trait unless they are implementing an `Accessor`.
101///
102/// # Notes
103///
104/// ## Object Safety
105///
106/// `ReadStream` uses `async in trait`, making it not object safe, preventing the use of
107/// `Box<dyn ReadStream>`.
108/// To address this, we've introduced [`ReadStreamDyn`] and its compatible type
109/// `Box<dyn ReadStreamDyn>`.
110///
111/// `ReadStreamDyn` uses `Box::pin()` to transform the returned future into a [`BoxedFuture`],
112/// introducing an additional layer of indirection and an extra allocation. Ideally,
113/// `ReadStreamDyn` should occur only once, at the outermost level of our API.
114pub trait ReadStream: Unpin + Send + Sync {
115    /// Read the next data chunk from the stream.
116    fn read(&mut self) -> impl Future<Output = Result<Buffer>> + MaybeSend;
117
118    /// Read all data from the reader.
119    fn read_all(&mut self) -> impl Future<Output = Result<Buffer>> + MaybeSend {
120        async {
121            let mut bufs = vec![];
122            loop {
123                match self.read().await {
124                    Ok(buf) if buf.is_empty() => break,
125                    Ok(buf) => bufs.push(buf),
126                    Err(err) => return Err(err),
127                }
128            }
129            Ok(bufs.into_iter().flatten().collect())
130        }
131    }
132}
133
134impl ReadStream for () {
135    async fn read(&mut self) -> Result<Buffer> {
136        Err(Error::new(
137            ErrorKind::Unsupported,
138            "output reader doesn't support read",
139        ))
140    }
141}
142
143impl ReadStream for Bytes {
144    async fn read(&mut self) -> Result<Buffer> {
145        Ok(Buffer::from(self.split_off(0)))
146    }
147}
148
149impl ReadStream for Buffer {
150    async fn read(&mut self) -> Result<Buffer> {
151        Ok(mem::take(self))
152    }
153}
154
155/// ReadStreamDyn is the dyn version of [`ReadStream`] make it possible to use as
156/// `Box<dyn ReadStreamDyn>`.
157pub trait ReadStreamDyn: Unpin + Send + Sync {
158    /// The dyn version of [`ReadStream::read`].
159    ///
160    /// This function returns a boxed future to make it object safe.
161    fn read_dyn(&mut self) -> BoxedFuture<'_, Result<Buffer>>;
162
163    /// The dyn version of [`ReadStream::read_all`].
164    fn read_all_dyn(&mut self) -> BoxedFuture<'_, Result<Buffer>>;
165}
166
167impl<T: ReadStream + ?Sized> ReadStreamDyn for T {
168    fn read_dyn(&mut self) -> BoxedFuture<'_, Result<Buffer>> {
169        Box::pin(self.read())
170    }
171
172    fn read_all_dyn(&mut self) -> BoxedFuture<'_, Result<Buffer>> {
173        Box::pin(self.read_all())
174    }
175}
176
177/// # NOTE
178///
179/// Take care about the `deref_mut()` here. This makes sure that we are calling functions
180/// upon `&mut T` instead of `&mut Box<T>`. The later could result in infinite recursion.
181impl<T: ReadStreamDyn + ?Sized> ReadStream for Box<T> {
182    async fn read(&mut self) -> Result<Buffer> {
183        self.deref_mut().read_dyn().await
184    }
185
186    async fn read_all(&mut self) -> Result<Buffer> {
187        self.deref_mut().read_all_dyn().await
188    }
189}