opendal/services/sftp/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::path::Path;
21use std::path::PathBuf;
22use std::sync::Arc;
23
24use bb8::PooledConnection;
25use bb8::RunError;
26use log::debug;
27use openssh::KnownHosts;
28use openssh::SessionBuilder;
29use openssh_sftp_client::Sftp;
30use openssh_sftp_client::SftpOptions;
31use tokio::sync::OnceCell;
32
33use super::error::is_sftp_protocol_error;
34use super::error::parse_sftp_error;
35use super::error::parse_ssh_error;
36use crate::raw::*;
37use crate::*;
38
39pub struct SftpCore {
40    pub info: Arc<AccessorInfo>,
41    pub endpoint: String,
42    pub root: String,
43    pub user: Option<String>,
44    pub key: Option<String>,
45    pub known_hosts_strategy: KnownHosts,
46
47    pub client: OnceCell<bb8::Pool<Manager>>,
48}
49
50impl Debug for SftpCore {
51    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52        f.debug_struct("SftpCore")
53            .field("endpoint", &self.endpoint)
54            .field("root", &self.root)
55            .finish()
56    }
57}
58
59impl SftpCore {
60    pub async fn connect(&self) -> Result<PooledConnection<'static, Manager>> {
61        let client = self
62            .client
63            .get_or_try_init(|| async {
64                bb8::Pool::builder()
65                    .max_size(64)
66                    .build(Manager {
67                        endpoint: self.endpoint.clone(),
68                        root: self.root.clone(),
69                        user: self.user.clone(),
70                        key: self.key.clone(),
71                        known_hosts_strategy: self.known_hosts_strategy.clone(),
72                    })
73                    .await
74            })
75            .await?;
76
77        client.get_owned().await.map_err(|err| match err {
78            RunError::User(err) => err,
79            RunError::TimedOut => {
80                Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
81            }
82        })
83    }
84}
85
86pub struct Manager {
87    endpoint: String,
88    root: String,
89    user: Option<String>,
90    key: Option<String>,
91    known_hosts_strategy: KnownHosts,
92}
93
94#[async_trait::async_trait]
95impl bb8::ManageConnection for Manager {
96    type Connection = Sftp;
97    type Error = Error;
98
99    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
100        let mut session = SessionBuilder::default();
101
102        if let Some(user) = &self.user {
103            session.user(user.clone());
104        }
105
106        if let Some(key) = &self.key {
107            session.keyfile(key);
108        }
109
110        session.known_hosts_check(self.known_hosts_strategy.clone());
111
112        let session = session
113            .connect(&self.endpoint)
114            .await
115            .map_err(parse_ssh_error)?;
116
117        let sftp = Sftp::from_session(session, SftpOptions::default())
118            .await
119            .map_err(parse_sftp_error)?;
120
121        if !self.root.is_empty() {
122            let mut fs = sftp.fs();
123
124            let paths = Path::new(&self.root).components();
125            let mut current = PathBuf::new();
126            for p in paths {
127                current.push(p);
128                let res = fs.create_dir(p).await;
129
130                if let Err(e) = res {
131                    // ignore error if dir already exists
132                    if !is_sftp_protocol_error(&e) {
133                        return Err(parse_sftp_error(e));
134                    }
135                }
136                fs.set_cwd(&current);
137            }
138        }
139
140        debug!("sftp connection created at {}", self.root);
141        Ok(sftp)
142    }
143
144    // Check if connect valid by checking the root path.
145    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
146        let _ = conn.fs().metadata("./").await.map_err(parse_sftp_error)?;
147
148        Ok(())
149    }
150
151    /// Always allow reuse conn.
152    fn has_broken(&self, _: &mut Self::Connection) -> bool {
153        false
154    }
155}