opendal/services/ftp/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bb8::Pool;
21use bb8::PooledConnection;
22use bb8::RunError;
23use futures_rustls::TlsConnector;
24use raw::Operation;
25use suppaftp::rustls::ClientConfig;
26use suppaftp::types::FileType;
27use suppaftp::AsyncRustlsConnector;
28use suppaftp::AsyncRustlsFtpStream;
29use suppaftp::FtpError;
30use suppaftp::ImplAsyncFtpStream;
31use suppaftp::Status;
32use tokio::sync::OnceCell;
33
34use super::err::parse_error;
35use crate::raw::AccessorInfo;
36use crate::*;
37
38pub struct FtpCore {
39    pub info: Arc<AccessorInfo>,
40    pub manager: Manager,
41    pub pool: OnceCell<Pool<Manager>>,
42}
43
44impl FtpCore {
45    pub async fn ftp_connect(&self, _: Operation) -> Result<PooledConnection<'static, Manager>> {
46        let pool = self
47            .pool
48            .get_or_try_init(|| async {
49                bb8::Pool::builder()
50                    .max_size(64)
51                    .build(self.manager.clone())
52                    .await
53            })
54            .await
55            .map_err(parse_error)?;
56
57        pool.get_owned().await.map_err(|err| match err {
58            RunError::User(err) => parse_error(err),
59            RunError::TimedOut => {
60                Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
61            }
62        })
63    }
64}
65
/// Connection settings used to open and prepare FTP sessions for the pool.
#[derive(Clone)]
pub struct Manager {
    /// Address passed to the FTP client when connecting.
    pub endpoint: String,
    /// Directory every new session changes into after login.
    pub root: String,
    /// Login user name; when empty, the login step is skipped.
    pub user: String,
    /// Password sent together with `user`.
    pub password: String,
    /// Whether the session is upgraded to TLS before logging in.
    pub enable_secure: bool,
}
74
75impl bb8::ManageConnection for Manager {
76    type Connection = AsyncRustlsFtpStream;
77    type Error = FtpError;
78
79    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
80        let stream = ImplAsyncFtpStream::connect(&self.endpoint).await?;
81        // switch to secure mode if ssl/tls is on.
82        let mut ftp_stream = if self.enable_secure {
83            let mut root_store = suppaftp::rustls::RootCertStore::empty();
84            for cert in
85                rustls_native_certs::load_native_certs().expect("could not load platform certs")
86            {
87                root_store.add(cert).unwrap();
88            }
89
90            let cfg = ClientConfig::builder()
91                .with_root_certificates(root_store)
92                .with_no_client_auth();
93            stream
94                .into_secure(
95                    AsyncRustlsConnector::from(TlsConnector::from(Arc::new(cfg))),
96                    &self.endpoint,
97                )
98                .await?
99        } else {
100            stream
101        };
102
103        // login if needed
104        if !self.user.is_empty() {
105            ftp_stream.login(&self.user, &self.password).await?;
106        }
107
108        // change to the root path
109        match ftp_stream.cwd(&self.root).await {
110            Err(FtpError::UnexpectedResponse(e)) if e.status == Status::FileUnavailable => {
111                ftp_stream.mkdir(&self.root).await?;
112                // Then change to root path
113                ftp_stream.cwd(&self.root).await?;
114            }
115            // Other errors, return.
116            Err(e) => return Err(e),
117            // Do nothing if success.
118            Ok(_) => (),
119        }
120
121        ftp_stream.transfer_type(FileType::Binary).await?;
122
123        Ok(ftp_stream)
124    }
125
126    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
127        conn.noop().await
128    }
129
130    /// Don't allow reuse conn.
131    ///
132    /// We need to investigate why reuse conn will cause error.
133    fn has_broken(&self, _: &mut Self::Connection) -> bool {
134        true
135    }
136}