opendal_core/services/ftp/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use fastpool::{ManageObject, ObjectStatus, bounded};
19use futures_rustls::TlsConnector;
20use std::sync::Arc;
21use suppaftp::AsyncRustlsConnector;
22use suppaftp::AsyncRustlsFtpStream;
23use suppaftp::FtpError;
24use suppaftp::ImplAsyncFtpStream;
25use suppaftp::Status;
26use suppaftp::rustls::ClientConfig;
27use suppaftp::types::FileType;
28
29use super::err::format_ftp_error;
30use crate::raw::*;
31use crate::*;
32
33pub struct FtpCore {
34    info: Arc<AccessorInfo>,
35    pool: Arc<bounded::Pool<Manager>>,
36}
37
38impl FtpCore {
39    pub fn new(info: Arc<AccessorInfo>, manager: Manager) -> Self {
40        let pool = bounded::Pool::new(bounded::PoolConfig::new(64), manager);
41        Self { info, pool }
42    }
43
44    pub fn info(&self) -> Arc<AccessorInfo> {
45        self.info.clone()
46    }
47
48    pub async fn ftp_connect(&self, _: Operation) -> Result<bounded::Object<Manager>> {
49        let fut = self.pool.get();
50
51        tokio::select! {
52            _ = tokio::time::sleep(Duration::from_secs(10)) => {
53                Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
54            }
55            result = fut => match result {
56                Ok(conn) => Ok(conn),
57                Err(err) => Err(format_ftp_error(err)),
58            }
59        }
60    }
61}
62
63#[derive(Clone)]
64pub struct Manager {
65    pub endpoint: String,
66    pub root: String,
67    pub user: String,
68    pub password: String,
69    pub enable_secure: bool,
70}
71
72impl ManageObject for Manager {
73    type Object = AsyncRustlsFtpStream;
74    type Error = FtpError;
75
76    async fn create(&self) -> Result<Self::Object, Self::Error> {
77        let stream = ImplAsyncFtpStream::connect(&self.endpoint).await?;
78        // switch to secure mode if ssl/tls is on.
79        let mut ftp_stream = if self.enable_secure {
80            let mut root_store = suppaftp::rustls::RootCertStore::empty();
81            for cert in
82                rustls_native_certs::load_native_certs().expect("could not load platform certs")
83            {
84                root_store.add(cert).unwrap();
85            }
86
87            let cfg = ClientConfig::builder()
88                .with_root_certificates(root_store)
89                .with_no_client_auth();
90            stream
91                .into_secure(
92                    AsyncRustlsConnector::from(TlsConnector::from(Arc::new(cfg))),
93                    &self.endpoint,
94                )
95                .await?
96        } else {
97            stream
98        };
99
100        // login if needed
101        if !self.user.is_empty() {
102            ftp_stream.login(&self.user, &self.password).await?;
103        }
104
105        // change to the root path
106        match ftp_stream.cwd(&self.root).await {
107            Err(FtpError::UnexpectedResponse(e)) if e.status == Status::FileUnavailable => {
108                ftp_stream.mkdir(&self.root).await?;
109                // Then change to root path
110                ftp_stream.cwd(&self.root).await?;
111            }
112            // Other errors, return.
113            Err(e) => return Err(e),
114            // Do nothing if success.
115            Ok(_) => (),
116        }
117
118        ftp_stream.transfer_type(FileType::Binary).await?;
119
120        Ok(ftp_stream)
121    }
122
123    async fn is_recyclable(
124        &self,
125        o: &mut Self::Object,
126        _: &ObjectStatus,
127    ) -> Result<(), Self::Error> {
128        o.noop().await
129    }
130}