opendal/services/ftp/
core.rs1use std::sync::Arc;
19
20use bb8::Pool;
21use bb8::PooledConnection;
22use bb8::RunError;
23use futures_rustls::TlsConnector;
24use raw::Operation;
25use suppaftp::rustls::ClientConfig;
26use suppaftp::types::FileType;
27use suppaftp::AsyncRustlsConnector;
28use suppaftp::AsyncRustlsFtpStream;
29use suppaftp::FtpError;
30use suppaftp::ImplAsyncFtpStream;
31use suppaftp::Status;
32use tokio::sync::OnceCell;
33
34use super::err::parse_error;
35use crate::raw::AccessorInfo;
36use crate::*;
37
38pub struct FtpCore {
39 pub info: Arc<AccessorInfo>,
40 pub manager: Manager,
41 pub pool: OnceCell<Pool<Manager>>,
42}
43
44impl FtpCore {
45 pub async fn ftp_connect(&self, _: Operation) -> Result<PooledConnection<'static, Manager>> {
46 let pool = self
47 .pool
48 .get_or_try_init(|| async {
49 bb8::Pool::builder()
50 .max_size(64)
51 .build(self.manager.clone())
52 .await
53 })
54 .await
55 .map_err(parse_error)?;
56
57 pool.get_owned().await.map_err(|err| match err {
58 RunError::User(err) => parse_error(err),
59 RunError::TimedOut => {
60 Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
61 }
62 })
63 }
64}
65
/// Connection settings used by the pool to open FTP sessions.
///
/// Cloned into the `bb8` pool when the pool is first built.
#[derive(Clone)]
pub struct Manager {
    /// Address passed to the FTP client when connecting; also supplied as the
    /// domain argument when upgrading the session to TLS.
    pub endpoint: String,
    /// Directory every new connection changes into right after login.
    pub root: String,
    /// Login name; when empty, the login step is skipped entirely.
    pub user: String,
    /// Password sent together with `user` during login.
    pub password: String,
    /// When `true`, the control connection is upgraded to TLS before login.
    pub enable_secure: bool,
}
74
75impl bb8::ManageConnection for Manager {
76 type Connection = AsyncRustlsFtpStream;
77 type Error = FtpError;
78
79 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
80 let stream = ImplAsyncFtpStream::connect(&self.endpoint).await?;
81 let mut ftp_stream = if self.enable_secure {
83 let mut root_store = suppaftp::rustls::RootCertStore::empty();
84 for cert in
85 rustls_native_certs::load_native_certs().expect("could not load platform certs")
86 {
87 root_store.add(cert).unwrap();
88 }
89
90 let cfg = ClientConfig::builder()
91 .with_root_certificates(root_store)
92 .with_no_client_auth();
93 stream
94 .into_secure(
95 AsyncRustlsConnector::from(TlsConnector::from(Arc::new(cfg))),
96 &self.endpoint,
97 )
98 .await?
99 } else {
100 stream
101 };
102
103 if !self.user.is_empty() {
105 ftp_stream.login(&self.user, &self.password).await?;
106 }
107
108 match ftp_stream.cwd(&self.root).await {
110 Err(FtpError::UnexpectedResponse(e)) if e.status == Status::FileUnavailable => {
111 ftp_stream.mkdir(&self.root).await?;
112 ftp_stream.cwd(&self.root).await?;
114 }
115 Err(e) => return Err(e),
117 Ok(_) => (),
119 }
120
121 ftp_stream.transfer_type(FileType::Binary).await?;
122
123 Ok(ftp_stream)
124 }
125
126 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
127 conn.noop().await
128 }
129
130 fn has_broken(&self, _: &mut Self::Connection) -> bool {
134 true
135 }
136}