opendal/services/ftp/
writer.rs1use bb8::PooledConnection;
19use bytes::Buf;
20use futures::AsyncWrite;
21use futures::AsyncWriteExt;
22
23use super::backend::Manager;
24use super::err::parse_error;
25use crate::raw::*;
26use crate::*;
27
28pub struct FtpWriter {
29 target_path: String,
30 tmp_path: Option<String>,
31 ftp_stream: PooledConnection<'static, Manager>,
32 data_stream: Option<Box<dyn AsyncWrite + Sync + Send + Unpin + 'static>>,
33}
34
35unsafe impl Sync for FtpWriter {}
39
40impl FtpWriter {
46 pub fn new(
47 ftp_stream: PooledConnection<'static, Manager>,
48 target_path: String,
49 tmp_path: Option<String>,
50 ) -> Self {
51 FtpWriter {
52 target_path,
53 tmp_path,
54 ftp_stream,
55 data_stream: None,
56 }
57 }
58}
59
60impl oio::Write for FtpWriter {
61 async fn write(&mut self, mut bs: Buffer) -> Result<()> {
62 let path = if let Some(tmp_path) = &self.tmp_path {
63 tmp_path
64 } else {
65 &self.target_path
66 };
67
68 if self.data_stream.is_none() {
69 self.data_stream = Some(Box::new(
70 self.ftp_stream
71 .append_with_stream(path)
72 .await
73 .map_err(parse_error)?,
74 ));
75 }
76
77 while bs.has_remaining() {
78 let n = self
79 .data_stream
80 .as_mut()
81 .unwrap()
82 .write(bs.chunk())
83 .await
84 .map_err(|err| {
85 Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err)
86 })?;
87 bs.advance(n);
88 }
89
90 Ok(())
91 }
92
93 async fn close(&mut self) -> Result<Metadata> {
94 let data_stream = self.data_stream.take();
95 if let Some(mut data_stream) = data_stream {
96 data_stream.flush().await.map_err(|err| {
97 Error::new(ErrorKind::Unexpected, "flush data stream failed").set_source(err)
98 })?;
99
100 self.ftp_stream
101 .finalize_put_stream(data_stream)
102 .await
103 .map_err(parse_error)?;
104
105 if let Some(tmp_path) = &self.tmp_path {
106 self.ftp_stream
107 .rename(tmp_path, &self.target_path)
108 .await
109 .map_err(parse_error)?;
110 }
111 }
112
113 Ok(Metadata::default())
114 }
115
116 async fn abort(&mut self) -> Result<()> {
117 Err(Error::new(
118 ErrorKind::Unsupported,
119 "FtpWriter doesn't support abort",
120 ))
121 }
122}