opendal/services/ftp/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::err::parse_error;
19use crate::raw::AccessorInfo;
20use crate::*;
21use bb8::Pool;
22use bb8::PooledConnection;
23use bb8::RunError;
24use futures_rustls::TlsConnector;
25use raw::Operation;
26use std::sync::Arc;
27use suppaftp::rustls::ClientConfig;
28use suppaftp::types::FileType;
29use suppaftp::Status;
30use suppaftp::{AsyncRustlsConnector, AsyncRustlsFtpStream};
31use suppaftp::{FtpError, ImplAsyncFtpStream};
32use tokio::sync::OnceCell;
33
34pub struct FtpCore {
35    pub info: Arc<AccessorInfo>,
36    pub manager: Manager,
37    pub pool: OnceCell<Pool<Manager>>,
38}
39
40impl FtpCore {
41    pub async fn ftp_connect(&self, _: Operation) -> Result<PooledConnection<'static, Manager>> {
42        let pool = self
43            .pool
44            .get_or_try_init(|| async {
45                bb8::Pool::builder()
46                    .max_size(64)
47                    .build(self.manager.clone())
48                    .await
49            })
50            .await
51            .map_err(parse_error)?;
52
53        pool.get_owned().await.map_err(|err| match err {
54            RunError::User(err) => parse_error(err),
55            RunError::TimedOut => {
56                Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
57            }
58        })
59    }
60}
61
62#[derive(Clone)]
63pub struct Manager {
64    pub endpoint: String,
65    pub root: String,
66    pub user: String,
67    pub password: String,
68    pub enable_secure: bool,
69}
70
71#[async_trait::async_trait]
72impl bb8::ManageConnection for Manager {
73    type Connection = AsyncRustlsFtpStream;
74    type Error = FtpError;
75
76    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
77        let stream = ImplAsyncFtpStream::connect(&self.endpoint).await?;
78        // switch to secure mode if ssl/tls is on.
79        let mut ftp_stream = if self.enable_secure {
80            let mut root_store = suppaftp::rustls::RootCertStore::empty();
81            for cert in
82                rustls_native_certs::load_native_certs().expect("could not load platform certs")
83            {
84                root_store.add(cert).unwrap();
85            }
86
87            let cfg = ClientConfig::builder()
88                .with_root_certificates(root_store)
89                .with_no_client_auth();
90            stream
91                .into_secure(
92                    AsyncRustlsConnector::from(TlsConnector::from(Arc::new(cfg))),
93                    &self.endpoint,
94                )
95                .await?
96        } else {
97            stream
98        };
99
100        // login if needed
101        if !self.user.is_empty() {
102            ftp_stream.login(&self.user, &self.password).await?;
103        }
104
105        // change to the root path
106        match ftp_stream.cwd(&self.root).await {
107            Err(FtpError::UnexpectedResponse(e)) if e.status == Status::FileUnavailable => {
108                ftp_stream.mkdir(&self.root).await?;
109                // Then change to root path
110                ftp_stream.cwd(&self.root).await?;
111            }
112            // Other errors, return.
113            Err(e) => return Err(e),
114            // Do nothing if success.
115            Ok(_) => (),
116        }
117
118        ftp_stream.transfer_type(FileType::Binary).await?;
119
120        Ok(ftp_stream)
121    }
122
123    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
124        conn.noop().await
125    }
126
127    /// Don't allow reuse conn.
128    ///
129    /// We need to investigate why reuse conn will cause error.
130    fn has_broken(&self, _: &mut Self::Connection) -> bool {
131        true
132    }
133}