1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use bb8::PooledConnection;
use bytes::BytesMut;
use openssh_sftp_client::file::File;

use super::backend::Manager;
use super::error::parse_sftp_error;
use crate::raw::*;
use crate::*;

/// Reader that fetches the content of a file opened over SFTP piece by piece.
///
/// Each read requests at most `chunk` bytes from `file`, and the reader stops
/// once `size` bytes (when a size is known) have been handed out.
pub struct SftpReader {
    /// Keep the connection alive while data stream is alive.
    ///
    /// The field is never read; it is held only so the pooled connection is
    /// not released before the reader itself is dropped.
    _conn: PooledConnection<'static, Manager>,

    /// Remote file handle the data is read from.
    file: File,
    /// Maximum number of bytes requested from `file` in a single read.
    chunk: usize,
    /// Total number of bytes this reader should return, if known.
    /// With `None` the reader enforces no limit of its own.
    size: Option<usize>,
    /// Number of bytes returned so far.
    read: usize,
    /// Scratch buffer handed to `file` for each read and taken back afterwards.
    buf: BytesMut,
}

impl SftpReader {
    /// Upper bound, in bytes, on the amount requested from the remote file per read.
    const DEFAULT_CHUNK: usize = 2 * 1024 * 1024;

    /// Creates a reader over `file`.
    ///
    /// * `conn` - pooled connection that backs `file`; it is stored only so it
    ///   stays alive for as long as the reader does.
    /// * `file` - the already opened remote file to read from.
    /// * `size` - total number of bytes to read, if known. `None` leaves the
    ///   reader without a limit of its own.
    ///
    /// The reader starts with nothing read and an empty scratch buffer.
    pub fn new(conn: PooledConnection<'static, Manager>, file: File, size: Option<u64>) -> Self {
        // Clamp before narrowing: a plain `as usize` would silently wrap a
        // large `u64` on targets where `usize` is narrower than 64 bits and
        // make the reader stop early. On 64-bit targets this is a no-op.
        let size = size.map(|v| v.min(usize::MAX as u64) as usize);

        Self {
            _conn: conn,
            file,
            size,
            chunk: Self::DEFAULT_CHUNK,
            read: 0,
            buf: BytesMut::new(),
        }
    }
}

impl oio::Read for SftpReader {
    /// Fetches the next piece of the file.
    ///
    /// Requests at most `chunk` bytes, and never more than what is left of
    /// `size` when a size is known. An empty [`Buffer`] is returned once the
    /// configured size has been reached or the underlying file yields `None`.
    /// Errors from the SFTP client are converted with `parse_sftp_error`.
    async fn read(&mut self) -> Result<Buffer> {
        // Without a known size the limit is effectively "never".
        let limit = self.size.unwrap_or(usize::MAX);
        if self.read >= limit {
            return Ok(Buffer::new());
        }

        // How much to ask for this round: the remainder, capped by the chunk size.
        let want = self
            .size
            .map_or(self.chunk, |total| (total - self.read).min(self.chunk));

        // Make room first, then lend the whole buffer (capacity included) to the file.
        self.buf.reserve(want);
        let scratch = self.buf.split_off(0);

        // `want` never exceeds `self.chunk`, which `new` sets to 2 MiB, so the
        // cast to `u32` does not lose information.
        let fetched = self
            .file
            .read(want as u32, scratch)
            .await
            .map_err(parse_sftp_error)?;

        match fetched {
            // Nothing came back: report an empty buffer to the caller.
            None => Ok(Buffer::new()),
            Some(filled) => {
                self.read += filled.len();
                // Take the buffer back, hand out the filled part and keep any
                // spare capacity for the next call.
                self.buf = filled;
                let data = self.buf.split();
                Ok(Buffer::from(data.freeze()))
            }
        }
    }
}