opendal/services/ftp/
core.rs1use super::err::parse_error;
19use crate::raw::AccessorInfo;
20use crate::*;
21use bb8::Pool;
22use bb8::PooledConnection;
23use bb8::RunError;
24use futures_rustls::TlsConnector;
25use raw::Operation;
26use std::sync::Arc;
27use suppaftp::rustls::ClientConfig;
28use suppaftp::types::FileType;
29use suppaftp::Status;
30use suppaftp::{AsyncRustlsConnector, AsyncRustlsFtpStream};
31use suppaftp::{FtpError, ImplAsyncFtpStream};
32use tokio::sync::OnceCell;
33
34pub struct FtpCore {
35 pub info: Arc<AccessorInfo>,
36 pub manager: Manager,
37 pub pool: OnceCell<Pool<Manager>>,
38}
39
40impl FtpCore {
41 pub async fn ftp_connect(&self, _: Operation) -> Result<PooledConnection<'static, Manager>> {
42 let pool = self
43 .pool
44 .get_or_try_init(|| async {
45 bb8::Pool::builder()
46 .max_size(64)
47 .build(self.manager.clone())
48 .await
49 })
50 .await
51 .map_err(parse_error)?;
52
53 pool.get_owned().await.map_err(|err| match err {
54 RunError::User(err) => parse_error(err),
55 RunError::TimedOut => {
56 Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
57 }
58 })
59 }
60}
61
/// Settings needed to open and prepare one FTP connection.
///
/// Cloneable so that a copy can be handed to the connection pool.
#[derive(Clone)]
pub struct Manager {
    /// Address the FTP stream connects to; also passed as the domain when the
    /// connection is upgraded to TLS.
    pub endpoint: String,
    /// Working directory entered right after login.
    pub root: String,
    /// Login user name; when empty, no login is attempted.
    pub user: String,
    /// Password sent together with `user`.
    pub password: String,
    /// Whether the control connection is upgraded to TLS before login.
    pub enable_secure: bool,
}
70
71#[async_trait::async_trait]
72impl bb8::ManageConnection for Manager {
73 type Connection = AsyncRustlsFtpStream;
74 type Error = FtpError;
75
76 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
77 let stream = ImplAsyncFtpStream::connect(&self.endpoint).await?;
78 let mut ftp_stream = if self.enable_secure {
80 let mut root_store = suppaftp::rustls::RootCertStore::empty();
81 for cert in
82 rustls_native_certs::load_native_certs().expect("could not load platform certs")
83 {
84 root_store.add(cert).unwrap();
85 }
86
87 let cfg = ClientConfig::builder()
88 .with_root_certificates(root_store)
89 .with_no_client_auth();
90 stream
91 .into_secure(
92 AsyncRustlsConnector::from(TlsConnector::from(Arc::new(cfg))),
93 &self.endpoint,
94 )
95 .await?
96 } else {
97 stream
98 };
99
100 if !self.user.is_empty() {
102 ftp_stream.login(&self.user, &self.password).await?;
103 }
104
105 match ftp_stream.cwd(&self.root).await {
107 Err(FtpError::UnexpectedResponse(e)) if e.status == Status::FileUnavailable => {
108 ftp_stream.mkdir(&self.root).await?;
109 ftp_stream.cwd(&self.root).await?;
111 }
112 Err(e) => return Err(e),
114 Ok(_) => (),
116 }
117
118 ftp_stream.transfer_type(FileType::Binary).await?;
119
120 Ok(ftp_stream)
121 }
122
123 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
124 conn.noop().await
125 }
126
127 fn has_broken(&self, _: &mut Self::Connection) -> bool {
131 true
132 }
133}