opendal/types/context/
read.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ops::Bound;
19use std::ops::Range;
20use std::ops::RangeBounds;
21use std::sync::Arc;
22
23use crate::raw::*;
24use crate::*;
25
26/// ReadContext holds the immutable context for give read operation.
27pub struct ReadContext {
28    /// The accessor to the storage services.
29    acc: Accessor,
30    /// Path to the file.
31    path: String,
32    /// Arguments for the read operation.
33    args: OpRead,
34    /// Options for the reader.
35    options: OpReader,
36}
37
38impl ReadContext {
39    /// Create a new ReadContext.
40    #[inline]
41    pub fn new(acc: Accessor, path: String, args: OpRead, options: OpReader) -> Self {
42        Self {
43            acc,
44            path,
45            args,
46            options,
47        }
48    }
49
50    /// Get the accessor.
51    #[inline]
52    pub fn accessor(&self) -> &Accessor {
53        &self.acc
54    }
55
56    /// Get the path.
57    #[inline]
58    pub fn path(&self) -> &str {
59        &self.path
60    }
61
62    /// Get the arguments.
63    #[inline]
64    pub fn args(&self) -> &OpRead {
65        &self.args
66    }
67
68    /// Get the options.
69    #[inline]
70    pub fn options(&self) -> &OpReader {
71        &self.options
72    }
73
74    /// Parse the range bounds into a range.
75    pub(crate) async fn parse_into_range(
76        &self,
77        range: impl RangeBounds<u64>,
78    ) -> Result<Range<u64>> {
79        let start = match range.start_bound() {
80            Bound::Included(v) => *v,
81            Bound::Excluded(v) => v + 1,
82            Bound::Unbounded => 0,
83        };
84
85        let end = match range.end_bound() {
86            Bound::Included(v) => v + 1,
87            Bound::Excluded(v) => *v,
88            Bound::Unbounded => {
89                let mut op_stat = OpStat::new();
90
91                if let Some(v) = self.args().version() {
92                    op_stat = op_stat.with_version(v);
93                }
94
95                self.accessor()
96                    .stat(self.path(), op_stat)
97                    .await?
98                    .into_metadata()
99                    .content_length()
100            }
101        };
102
103        Ok(start..end)
104    }
105}
106
107/// ReadGenerator is used to generate new readers.
108///
109/// If chunk is None, ReaderGenerator will only return one reader.
110/// Otherwise, ReaderGenerator will return multiple readers, each with size
111/// of chunk.
112///
113/// It's design that we didn't implement the generator as a stream, because
114/// we don't expose the generator to the user. Instead, we use the async method
115/// directly to keep it simple and easy to understand.
116pub struct ReadGenerator {
117    ctx: Arc<ReadContext>,
118
119    offset: u64,
120    size: Option<u64>,
121}
122
123impl ReadGenerator {
124    /// Create a new ReadGenerator.
125    #[inline]
126    pub fn new(ctx: Arc<ReadContext>, offset: u64, size: Option<u64>) -> Self {
127        Self { ctx, offset, size }
128    }
129
130    /// Generate next range to read.
131    fn next_range(&mut self) -> Option<BytesRange> {
132        if self.size == Some(0) {
133            return None;
134        }
135
136        let next_offset = self.offset;
137        let next_size = match self.size {
138            // Given size is None, read all data.
139            None => {
140                // Update size to Some(0) to indicate that there is no more data to read.
141                self.size = Some(0);
142                None
143            }
144            Some(remaining) => {
145                // If chunk is set, read data in chunks.
146                let read_size = self
147                    .ctx
148                    .options
149                    .chunk()
150                    .map_or(remaining, |chunk| remaining.min(chunk as u64));
151                // Update (offset, size) before building future.
152                self.offset += read_size;
153                self.size = Some(remaining - read_size);
154                Some(read_size)
155            }
156        };
157
158        Some(BytesRange::new(next_offset, next_size))
159    }
160
161    /// Generate next reader.
162    pub async fn next_reader(&mut self) -> Result<Option<oio::Reader>> {
163        let Some(range) = self.next_range() else {
164            return Ok(None);
165        };
166
167        let args = self.ctx.args.clone().with_range(range);
168        let (_, r) = self.ctx.acc.read(&self.ctx.path, args).await?;
169        Ok(Some(r))
170    }
171}
172
#[cfg(test)]
mod tests {

    use bytes::Bytes;

    use super::*;

    /// Writes a ten-byte object ("Hello" + "World") into an in-memory
    /// operator, then drains a `ReadGenerator` (offset 0, chunk 3) created
    /// with the given `size` and returns how many readers it produced.
    async fn count_readers(size: Option<u64>) -> Result<usize> {
        let op = Operator::via_iter(Scheme::Memory, [])?;
        let payload = Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]);
        op.write("test", payload).await?;

        let ctx = ReadContext::new(
            op.into_inner(),
            "test".to_string(),
            OpRead::new(),
            OpReader::new().with_chunk(3),
        );
        let mut generator = ReadGenerator::new(Arc::new(ctx), 0, size);

        // Keep every reader alive until the generator is exhausted.
        let mut readers = Vec::new();
        while let Some(reader) = generator.next_reader().await? {
            readers.push(reader);
        }

        Ok(readers.len())
    }

    #[tokio::test]
    async fn test_next_reader() -> Result<()> {
        // 10 bytes in chunks of 3 => 3 + 3 + 3 + 1.
        let produced = count_readers(Some(10)).await?;
        pretty_assertions::assert_eq!(produced, 4);
        Ok(())
    }

    #[tokio::test]
    async fn test_next_reader_without_size() -> Result<()> {
        // Without a size the whole object is read through a single reader.
        let produced = count_readers(None).await?;
        pretty_assertions::assert_eq!(produced, 1);
        Ok(())
    }
}