opendal/types/context/
read.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ops::{Bound, Range, RangeBounds};
19use std::sync::Arc;
20
21use crate::raw::*;
22use crate::*;
23
24/// ReadContext holds the immutable context for give read operation.
25pub struct ReadContext {
26    /// The accessor to the storage services.
27    acc: Accessor,
28    /// Path to the file.
29    path: String,
30    /// Arguments for the read operation.
31    args: OpRead,
32    /// Options for the reader.
33    options: OpReader,
34}
35
36impl ReadContext {
37    /// Create a new ReadContext.
38    #[inline]
39    pub fn new(acc: Accessor, path: String, args: OpRead, options: OpReader) -> Self {
40        Self {
41            acc,
42            path,
43            args,
44            options,
45        }
46    }
47
48    /// Get the accessor.
49    #[inline]
50    pub fn accessor(&self) -> &Accessor {
51        &self.acc
52    }
53
54    /// Get the path.
55    #[inline]
56    pub fn path(&self) -> &str {
57        &self.path
58    }
59
60    /// Get the arguments.
61    #[inline]
62    pub fn args(&self) -> &OpRead {
63        &self.args
64    }
65
66    /// Get the options.
67    #[inline]
68    pub fn options(&self) -> &OpReader {
69        &self.options
70    }
71
72    /// Parse the range bounds into a range.
73    pub(crate) async fn parse_into_range(
74        &self,
75        range: impl RangeBounds<u64>,
76    ) -> Result<Range<u64>> {
77        let start = match range.start_bound() {
78            Bound::Included(v) => *v,
79            Bound::Excluded(v) => v + 1,
80            Bound::Unbounded => 0,
81        };
82
83        let end = match range.end_bound() {
84            Bound::Included(v) => v + 1,
85            Bound::Excluded(v) => *v,
86            Bound::Unbounded => {
87                let mut op_stat = OpStat::new();
88
89                if let Some(v) = self.args().version() {
90                    op_stat = op_stat.with_version(v);
91                }
92
93                self.accessor()
94                    .stat(self.path(), op_stat)
95                    .await?
96                    .into_metadata()
97                    .content_length()
98            }
99        };
100
101        Ok(start..end)
102    }
103}
104
105/// ReadGenerator is used to generate new readers.
106///
107/// If chunk is None, ReaderGenerator will only return one reader.
108/// Otherwise, ReaderGenerator will return multiple readers, each with size
109/// of chunk.
110///
111/// It's design that we didn't implement the generator as a stream, because
112/// we don't expose the generator to the user. Instead, we use the async method
113/// directly to keep it simple and easy to understand.
114pub struct ReadGenerator {
115    ctx: Arc<ReadContext>,
116
117    offset: u64,
118    size: Option<u64>,
119}
120
121impl ReadGenerator {
122    /// Create a new ReadGenerator.
123    #[inline]
124    pub fn new(ctx: Arc<ReadContext>, offset: u64, size: Option<u64>) -> Self {
125        Self { ctx, offset, size }
126    }
127
128    /// Generate next range to read.
129    fn next_range(&mut self) -> Option<BytesRange> {
130        if self.size == Some(0) {
131            return None;
132        }
133
134        let next_offset = self.offset;
135        let next_size = match self.size {
136            // Given size is None, read all data.
137            None => {
138                // Update size to Some(0) to indicate that there is no more data to read.
139                self.size = Some(0);
140                None
141            }
142            Some(remaining) => {
143                // If chunk is set, read data in chunks.
144                let read_size = self
145                    .ctx
146                    .options
147                    .chunk()
148                    .map_or(remaining, |chunk| remaining.min(chunk as u64));
149                // Update (offset, size) before building future.
150                self.offset += read_size;
151                self.size = Some(remaining - read_size);
152                Some(read_size)
153            }
154        };
155
156        Some(BytesRange::new(next_offset, next_size))
157    }
158
159    /// Generate next reader.
160    pub async fn next_reader(&mut self) -> Result<Option<oio::Reader>> {
161        let Some(range) = self.next_range() else {
162            return Ok(None);
163        };
164
165        let args = self.ctx.args.clone().with_range(range);
166        let (_, r) = self.ctx.acc.read(&self.ctx.path, args).await?;
167        Ok(Some(r))
168    }
169
170    /// Generate next blocking reader.
171    pub fn next_blocking_reader(&mut self) -> Result<Option<oio::BlockingReader>> {
172        let Some(range) = self.next_range() else {
173            return Ok(None);
174        };
175
176        let args = self.ctx.args.clone().with_range(range);
177        let (_, r) = self.ctx.acc.blocking_read(&self.ctx.path, args)?;
178        Ok(Some(r))
179    }
180}
181
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::*;

    /// Ten bytes ("HelloWorld") stored as two separate `Bytes` segments.
    fn hello_world() -> Buffer {
        Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")])
    }

    /// Shared read context for the `test` path that asks for 3-byte chunks.
    fn chunked_ctx(acc: Accessor) -> Arc<ReadContext> {
        Arc::new(ReadContext::new(
            acc,
            "test".to_string(),
            OpRead::new(),
            OpReader::new().with_chunk(3),
        ))
    }

    #[tokio::test]
    async fn test_next_reader() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        op.write("test", hello_world()).await?;

        // 10 bytes in 3-byte chunks: 3 + 3 + 3 + 1 => four readers.
        let mut generator = ReadGenerator::new(chunked_ctx(op.into_inner()), 0, Some(10));
        let mut readers = Vec::new();
        while let Some(reader) = generator.next_reader().await? {
            readers.push(reader);
        }

        pretty_assertions::assert_eq!(readers.len(), 4);
        Ok(())
    }

    #[tokio::test]
    async fn test_next_reader_without_size() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        op.write("test", hello_world()).await?;

        // With an unknown size the chunk setting is ignored: one reader for everything.
        let mut generator = ReadGenerator::new(chunked_ctx(op.into_inner()), 0, None);
        let mut readers = Vec::new();
        while let Some(reader) = generator.next_reader().await? {
            readers.push(reader);
        }

        pretty_assertions::assert_eq!(readers.len(), 1);
        Ok(())
    }

    #[test]
    fn test_next_blocking_reader() -> Result<()> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        op.blocking().write("test", hello_world())?;

        // Same split as the async case: 3 + 3 + 3 + 1 => four readers.
        let mut generator = ReadGenerator::new(chunked_ctx(op.into_inner()), 0, Some(10));
        let mut readers = Vec::new();
        while let Some(reader) = generator.next_blocking_reader()? {
            readers.push(reader);
        }

        pretty_assertions::assert_eq!(readers.len(), 4);
        Ok(())
    }
}