Skip to main content

opendal_core/raw/oio/read/
stream_read.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use futures::Future;
19
20use crate::raw::oio::ReadStream;
21use crate::raw::*;
22use crate::*;
23
24/// StreamRead is used to implement [`oio::Read`] based on native range streams.
25///
26/// Services that implement [`StreamRead`] only need to expose their native
27/// `open(range)` operation. [`StreamReader`] will provide the complete
28/// [`oio::Read`] contract.
29pub trait StreamRead: Send + Sync + Unpin + 'static {
30    /// Open a stream to read the requested range.
31    fn open(
32        &self,
33        range: BytesRange,
34    ) -> impl Future<Output = Result<(RpRead, Box<dyn oio::ReadStreamDyn>)>> + MaybeSend;
35}
36
37/// StreamReader implements [`oio::Read`] based on [`StreamRead`].
38pub struct StreamReader<R: StreamRead> {
39    inner: R,
40}
41
42impl<R: StreamRead> StreamReader<R> {
43    /// Create a new [`StreamReader`].
44    pub fn new(inner: R) -> Self {
45        Self { inner }
46    }
47
48    /// Consume the reader and return the inner [`StreamRead`].
49    pub fn into_inner(self) -> R {
50        self.inner
51    }
52}
53
54impl<R: StreamRead> oio::Read for StreamReader<R> {
55    async fn open(&self, range: BytesRange) -> Result<(RpRead, Box<dyn oio::ReadStreamDyn>)> {
56        self.inner.open(range).await
57    }
58
59    async fn read(&self, range: BytesRange) -> Result<(RpRead, Buffer)> {
60        let expected = range
61            .size()
62            .ok_or_else(|| Error::new(ErrorKind::Unsupported, "read requires a bounded range"))?;
63
64        let (rp, mut stream) = self.inner.open(range).await?;
65        let buffer = stream.read_all().await?;
66        if buffer.len() as u64 != expected {
67            return Err(
68                Error::new(ErrorKind::Unexpected, "reader got unexpected data size")
69                    .with_context("expect", expected)
70                    .with_context("actual", buffer.len() as u64),
71            );
72        }
73
74        Ok((rp, buffer))
75    }
76}