opendal_core/services/ftp/
core.rs1use fastpool::{ManageObject, ObjectStatus, bounded};
19use futures_rustls::TlsConnector;
20use std::sync::Arc;
21use suppaftp::AsyncRustlsConnector;
22use suppaftp::AsyncRustlsFtpStream;
23use suppaftp::FtpError;
24use suppaftp::ImplAsyncFtpStream;
25use suppaftp::Status;
26use suppaftp::rustls::ClientConfig;
27use suppaftp::types::FileType;
28
29use super::err::format_ftp_error;
30use crate::raw::*;
31use crate::*;
32
33pub struct FtpCore {
34 info: Arc<AccessorInfo>,
35 pool: Arc<bounded::Pool<Manager>>,
36}
37
38impl FtpCore {
39 pub fn new(info: Arc<AccessorInfo>, manager: Manager) -> Self {
40 let pool = bounded::Pool::new(bounded::PoolConfig::new(64), manager);
41 Self { info, pool }
42 }
43
44 pub fn info(&self) -> Arc<AccessorInfo> {
45 self.info.clone()
46 }
47
48 pub async fn ftp_connect(&self, _: Operation) -> Result<bounded::Object<Manager>> {
49 let fut = self.pool.get();
50
51 tokio::select! {
52 _ = tokio::time::sleep(Duration::from_secs(10)) => {
53 Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
54 }
55 result = fut => match result {
56 Ok(conn) => Ok(conn),
57 Err(err) => Err(format_ftp_error(err)),
58 }
59 }
60 }
61}
62
/// Connection settings used to open and prepare each pooled FTP session.
#[derive(Clone)]
pub struct Manager {
    /// Address handed to the FTP client when connecting; it is also passed as
    /// the domain when the session is upgraded to TLS.
    pub endpoint: String,
    /// Directory every new session changes into; it is created first when the
    /// server reports it as unavailable.
    pub root: String,
    /// User name for login. Login is skipped entirely when this is empty.
    pub user: String,
    /// Password sent together with `user` at login.
    pub password: String,
    /// When `true`, the session is upgraded to TLS before logging in.
    pub enable_secure: bool,
}
71
72impl ManageObject for Manager {
73 type Object = AsyncRustlsFtpStream;
74 type Error = FtpError;
75
76 async fn create(&self) -> Result<Self::Object, Self::Error> {
77 let stream = ImplAsyncFtpStream::connect(&self.endpoint).await?;
78 let mut ftp_stream = if self.enable_secure {
80 let mut root_store = suppaftp::rustls::RootCertStore::empty();
81 for cert in
82 rustls_native_certs::load_native_certs().expect("could not load platform certs")
83 {
84 root_store.add(cert).unwrap();
85 }
86
87 let cfg = ClientConfig::builder()
88 .with_root_certificates(root_store)
89 .with_no_client_auth();
90 stream
91 .into_secure(
92 AsyncRustlsConnector::from(TlsConnector::from(Arc::new(cfg))),
93 &self.endpoint,
94 )
95 .await?
96 } else {
97 stream
98 };
99
100 if !self.user.is_empty() {
102 ftp_stream.login(&self.user, &self.password).await?;
103 }
104
105 match ftp_stream.cwd(&self.root).await {
107 Err(FtpError::UnexpectedResponse(e)) if e.status == Status::FileUnavailable => {
108 ftp_stream.mkdir(&self.root).await?;
109 ftp_stream.cwd(&self.root).await?;
111 }
112 Err(e) => return Err(e),
114 Ok(_) => (),
116 }
117
118 ftp_stream.transfer_type(FileType::Binary).await?;
119
120 Ok(ftp_stream)
121 }
122
123 async fn is_recyclable(
124 &self,
125 o: &mut Self::Object,
126 _: &ObjectStatus,
127 ) -> Result<(), Self::Error> {
128 o.noop().await
129 }
130}