opendal/services/ftp/
reader.rs1use bb8::PooledConnection;
19use bytes::BytesMut;
20use futures::AsyncRead;
21use futures::AsyncReadExt;
22
23use super::backend::Manager;
24use super::err::parse_error;
25use crate::raw::*;
26use crate::*;
27
28pub struct FtpReader {
29 _ftp_stream: PooledConnection<'static, Manager>,
31
32 data_stream: Box<dyn AsyncRead + Sync + Send + Unpin + 'static>,
33 chunk: usize,
34 buf: BytesMut,
35}
36
37unsafe impl Sync for FtpReader {}
41
42impl FtpReader {
43 pub async fn new(
44 mut ftp_stream: PooledConnection<'static, Manager>,
45 path: String,
46 args: OpRead,
47 ) -> Result<Self> {
48 let (offset, size) = (
49 args.range().offset(),
50 args.range().size().unwrap_or(u64::MAX),
51 );
52 if offset != 0 {
53 ftp_stream
54 .resume_transfer(offset as usize)
55 .await
56 .map_err(parse_error)?;
57 }
58 let ds = ftp_stream
59 .retr_as_stream(path)
60 .await
61 .map_err(parse_error)?
62 .take(size as _);
63
64 Ok(Self {
65 _ftp_stream: ftp_stream,
66
67 data_stream: Box::new(ds),
68 chunk: 1024 * 1024,
69 buf: BytesMut::new(),
70 })
71 }
72}
73
74impl oio::Read for FtpReader {
75 async fn read(&mut self) -> Result<Buffer> {
76 self.buf.resize(self.chunk, 0);
77 let n = self
78 .data_stream
79 .read(&mut self.buf)
80 .await
81 .map_err(new_std_io_error)?;
82 Ok(Buffer::from(self.buf.split_to(n).freeze()))
83 }
84}