opendal/services/ftp/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bb8::PooledConnection;
19use bytes::Buf;
20use futures::AsyncWrite;
21use futures::AsyncWriteExt;
22
23use super::backend::Manager;
24use super::err::parse_error;
25use crate::raw::*;
26use crate::*;
27
28pub struct FtpWriter {
29    target_path: String,
30    tmp_path: Option<String>,
31    ftp_stream: PooledConnection<'static, Manager>,
32    data_stream: Option<Box<dyn AsyncWrite + Sync + Send + Unpin + 'static>>,
33}
34
35/// # Safety
36///
37/// We only have `&mut self` for FtpWrite.
38unsafe impl Sync for FtpWriter {}
39
40/// # TODO
41///
42/// Writer is not implemented correctly.
43///
44/// After we can use data stream, we should return it directly.
45impl FtpWriter {
46    pub fn new(
47        ftp_stream: PooledConnection<'static, Manager>,
48        target_path: String,
49        tmp_path: Option<String>,
50    ) -> Self {
51        FtpWriter {
52            target_path,
53            tmp_path,
54            ftp_stream,
55            data_stream: None,
56        }
57    }
58}
59
60impl oio::Write for FtpWriter {
61    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
62        let path = if let Some(tmp_path) = &self.tmp_path {
63            tmp_path
64        } else {
65            &self.target_path
66        };
67
68        if self.data_stream.is_none() {
69            self.data_stream = Some(Box::new(
70                self.ftp_stream
71                    .append_with_stream(path)
72                    .await
73                    .map_err(parse_error)?,
74            ));
75        }
76
77        while bs.has_remaining() {
78            let n = self
79                .data_stream
80                .as_mut()
81                .unwrap()
82                .write(bs.chunk())
83                .await
84                .map_err(|err| {
85                    Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err)
86                })?;
87            bs.advance(n);
88        }
89
90        Ok(())
91    }
92
93    async fn close(&mut self) -> Result<Metadata> {
94        let data_stream = self.data_stream.take();
95        if let Some(mut data_stream) = data_stream {
96            data_stream.flush().await.map_err(|err| {
97                Error::new(ErrorKind::Unexpected, "flush data stream failed").set_source(err)
98            })?;
99
100            self.ftp_stream
101                .finalize_put_stream(data_stream)
102                .await
103                .map_err(parse_error)?;
104
105            if let Some(tmp_path) = &self.tmp_path {
106                self.ftp_stream
107                    .rename(tmp_path, &self.target_path)
108                    .await
109                    .map_err(parse_error)?;
110            }
111        }
112
113        Ok(Metadata::default())
114    }
115
116    async fn abort(&mut self) -> Result<()> {
117        Err(Error::new(
118            ErrorKind::Unsupported,
119            "FtpWriter doesn't support abort",
120        ))
121    }
122}