object_store_opendal/service/
reader.rs1use std::sync::Arc;
19
20use bytes::Bytes;
21use futures::stream::BoxStream;
22use futures::TryStreamExt;
23use object_store::path::Path as ObjectStorePath;
24use object_store::ObjectStore;
25
26use opendal::raw::*;
27use opendal::*;
28
29use super::core::parse_op_read;
30use super::error::parse_error;
31
32pub struct ObjectStoreReader {
34 bytes_stream: BoxStream<'static, object_store::Result<Bytes>>,
35 meta: object_store::ObjectMeta,
36 args: OpRead,
37}
38
39impl ObjectStoreReader {
40 pub(crate) async fn new(
41 store: Arc<dyn ObjectStore + 'static>,
42 path: &str,
43 args: OpRead,
44 ) -> Result<Self> {
45 let path = ObjectStorePath::from(path);
46 let opts = parse_op_read(&args)?;
47 let result = store.get_opts(&path, opts).await.map_err(parse_error)?;
48 let meta = result.meta.clone();
49 let bytes_stream = result.into_stream();
50 Ok(Self {
51 bytes_stream,
52 meta,
53 args,
54 })
55 }
56
57 pub(crate) fn rp(&self) -> RpRead {
58 let mut rp = RpRead::new().with_size(Some(self.meta.size));
59 if !self.args.range().is_full() {
60 let range = self.args.range();
61 let size = match range.size() {
62 Some(size) => size,
63 None => self.meta.size,
64 };
65 rp = rp.with_range(Some(
66 BytesContentRange::default().with_range(range.offset(), range.offset() + size - 1),
67 ));
68 }
69 rp
70 }
71}
72
73unsafe impl Sync for ObjectStoreReader {}
76
77impl oio::Read for ObjectStoreReader {
78 async fn read(&mut self) -> Result<Buffer> {
79 let bs = self.bytes_stream.try_next().await.map_err(parse_error)?;
80 Ok(bs.map(Buffer::from).unwrap_or_default())
81 }
82}