opendal/services/ftp/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bb8::PooledConnection;
19use bytes::BytesMut;
20use futures::AsyncRead;
21use futures::AsyncReadExt;
22
23use super::backend::Manager;
24use super::err::parse_error;
25use crate::raw::*;
26use crate::*;
27
28pub struct FtpReader {
29    /// Keep the connection alive while data stream is alive.
30    _ftp_stream: PooledConnection<'static, Manager>,
31
32    data_stream: Box<dyn AsyncRead + Sync + Send + Unpin + 'static>,
33    chunk: usize,
34    buf: BytesMut,
35}
36
37/// # Safety
38///
39/// We only have `&mut self` for FtpReader.
40unsafe impl Sync for FtpReader {}
41
42impl FtpReader {
43    pub async fn new(
44        mut ftp_stream: PooledConnection<'static, Manager>,
45        path: String,
46        args: OpRead,
47    ) -> Result<Self> {
48        let (offset, size) = (
49            args.range().offset(),
50            args.range().size().unwrap_or(u64::MAX),
51        );
52        if offset != 0 {
53            ftp_stream
54                .resume_transfer(offset as usize)
55                .await
56                .map_err(parse_error)?;
57        }
58        let ds = ftp_stream
59            .retr_as_stream(path)
60            .await
61            .map_err(parse_error)?
62            .take(size as _);
63
64        Ok(Self {
65            _ftp_stream: ftp_stream,
66
67            data_stream: Box::new(ds),
68            chunk: 1024 * 1024,
69            buf: BytesMut::new(),
70        })
71    }
72}
73
74impl oio::Read for FtpReader {
75    async fn read(&mut self) -> Result<Buffer> {
76        self.buf.resize(self.chunk, 0);
77        let n = self
78            .data_stream
79            .read(&mut self.buf)
80            .await
81            .map_err(new_std_io_error)?;
82        Ok(Buffer::from(self.buf.split_to(n).freeze()))
83    }
84}