opendal/services/sftp/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bb8::PooledConnection;
19use bytes::BytesMut;
20use openssh_sftp_client::file::File;
21
22use super::core::Manager;
23use super::error::parse_sftp_error;
24use crate::raw::*;
25use crate::*;
26
27pub struct SftpReader {
28    /// Keep the connection alive while data stream is alive.
29    _conn: PooledConnection<'static, Manager>,
30
31    file: File,
32    chunk: usize,
33    size: Option<usize>,
34    read: usize,
35    buf: BytesMut,
36}
37
38impl SftpReader {
39    pub fn new(conn: PooledConnection<'static, Manager>, file: File, size: Option<u64>) -> Self {
40        Self {
41            _conn: conn,
42            file,
43            size: size.map(|v| v as usize),
44            chunk: 2 * 1024 * 1024,
45            read: 0,
46            buf: BytesMut::new(),
47        }
48    }
49}
50
51impl oio::Read for SftpReader {
52    async fn read(&mut self) -> Result<Buffer> {
53        if self.read >= self.size.unwrap_or(usize::MAX) {
54            return Ok(Buffer::new());
55        }
56
57        let size = if let Some(size) = self.size {
58            (size - self.read).min(self.chunk)
59        } else {
60            self.chunk
61        };
62        self.buf.reserve(size);
63
64        let Some(bytes) = self
65            .file
66            .read(size as u32, self.buf.split_off(0))
67            .await
68            .map_err(parse_sftp_error)?
69        else {
70            return Ok(Buffer::new());
71        };
72
73        self.read += bytes.len();
74        self.buf = bytes;
75        let bs = self.buf.split();
76        Ok(Buffer::from(bs.freeze()))
77    }
78}