opendal/services/sftp/
reader.rs1use bb8::PooledConnection;
19use bytes::BytesMut;
20use openssh_sftp_client::file::File;
21
22use super::core::Manager;
23use super::error::parse_sftp_error;
24use crate::raw::*;
25use crate::*;
26
27pub struct SftpReader {
28 _conn: PooledConnection<'static, Manager>,
30
31 file: File,
32 chunk: usize,
33 size: Option<usize>,
34 read: usize,
35 buf: BytesMut,
36}
37
38impl SftpReader {
39 pub fn new(conn: PooledConnection<'static, Manager>, file: File, size: Option<u64>) -> Self {
40 Self {
41 _conn: conn,
42 file,
43 size: size.map(|v| v as usize),
44 chunk: 2 * 1024 * 1024,
45 read: 0,
46 buf: BytesMut::new(),
47 }
48 }
49}
50
51impl oio::Read for SftpReader {
52 async fn read(&mut self) -> Result<Buffer> {
53 if self.read >= self.size.unwrap_or(usize::MAX) {
54 return Ok(Buffer::new());
55 }
56
57 let size = if let Some(size) = self.size {
58 (size - self.read).min(self.chunk)
59 } else {
60 self.chunk
61 };
62 self.buf.reserve(size);
63
64 let Some(bytes) = self
65 .file
66 .read(size as u32, self.buf.split_off(0))
67 .await
68 .map_err(parse_sftp_error)?
69 else {
70 return Ok(Buffer::new());
71 };
72
73 self.read += bytes.len();
74 self.buf = bytes;
75 let bs = self.buf.split();
76 Ok(Buffer::from(bs.freeze()))
77 }
78}