opendal_core/raw/oio/read/stream_read.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use futures::Future;
19
20use crate::raw::oio::ReadStream;
21use crate::raw::*;
22use crate::*;
23
24/// StreamRead is used to implement [`oio::Read`] based on native range streams.
25///
26/// Services that implement [`StreamRead`] only need to expose their native
27/// `open(range)` operation. [`StreamReader`] will provide the complete
28/// [`oio::Read`] contract.
29pub trait StreamRead: Send + Sync + Unpin + 'static {
30 /// Open a stream to read the requested range.
31 fn open(
32 &self,
33 range: BytesRange,
34 ) -> impl Future<Output = Result<(RpRead, Box<dyn oio::ReadStreamDyn>)>> + MaybeSend;
35}
36
37/// StreamReader implements [`oio::Read`] based on [`StreamRead`].
38pub struct StreamReader<R: StreamRead> {
39 inner: R,
40}
41
42impl<R: StreamRead> StreamReader<R> {
43 /// Create a new [`StreamReader`].
44 pub fn new(inner: R) -> Self {
45 Self { inner }
46 }
47
48 /// Consume the reader and return the inner [`StreamRead`].
49 pub fn into_inner(self) -> R {
50 self.inner
51 }
52}
53
54impl<R: StreamRead> oio::Read for StreamReader<R> {
55 async fn open(&self, range: BytesRange) -> Result<(RpRead, Box<dyn oio::ReadStreamDyn>)> {
56 self.inner.open(range).await
57 }
58
59 async fn read(&self, range: BytesRange) -> Result<(RpRead, Buffer)> {
60 let expected = range
61 .size()
62 .ok_or_else(|| Error::new(ErrorKind::Unsupported, "read requires a bounded range"))?;
63
64 let (rp, mut stream) = self.inner.open(range).await?;
65 let buffer = stream.read_all().await?;
66 if buffer.len() as u64 != expected {
67 return Err(
68 Error::new(ErrorKind::Unexpected, "reader got unexpected data size")
69 .with_context("expect", expected)
70 .with_context("actual", buffer.len() as u64),
71 );
72 }
73
74 Ok((rp, buffer))
75 }
76}