opendal/services/sftp/
core.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::path::Path;
21use std::path::PathBuf;
22use std::sync::Arc;
23
24use bb8::PooledConnection;
25use bb8::RunError;
26use log::debug;
27use openssh::KnownHosts;
28use openssh::SessionBuilder;
29use openssh_sftp_client::Sftp;
30use openssh_sftp_client::SftpOptions;
31use tokio::sync::OnceCell;
32
33use super::error::is_sftp_protocol_error;
34use super::error::parse_sftp_error;
35use super::error::parse_ssh_error;
36use crate::raw::*;
37use crate::*;
38
39pub struct SftpCore {
40 pub info: Arc<AccessorInfo>,
41 pub endpoint: String,
42 pub root: String,
43 pub user: Option<String>,
44 pub key: Option<String>,
45 pub known_hosts_strategy: KnownHosts,
46
47 pub client: OnceCell<bb8::Pool<Manager>>,
48}
49
50impl Debug for SftpCore {
51 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52 f.debug_struct("SftpCore")
53 .field("endpoint", &self.endpoint)
54 .field("root", &self.root)
55 .finish()
56 }
57}
58
59impl SftpCore {
60 pub async fn connect(&self) -> Result<PooledConnection<'static, Manager>> {
61 let client = self
62 .client
63 .get_or_try_init(|| async {
64 bb8::Pool::builder()
65 .max_size(64)
66 .build(Manager {
67 endpoint: self.endpoint.clone(),
68 root: self.root.clone(),
69 user: self.user.clone(),
70 key: self.key.clone(),
71 known_hosts_strategy: self.known_hosts_strategy.clone(),
72 })
73 .await
74 })
75 .await?;
76
77 client.get_owned().await.map_err(|err| match err {
78 RunError::User(err) => err,
79 RunError::TimedOut => {
80 Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
81 }
82 })
83 }
84}
85
86pub struct Manager {
87 endpoint: String,
88 root: String,
89 user: Option<String>,
90 key: Option<String>,
91 known_hosts_strategy: KnownHosts,
92}
93
94#[async_trait::async_trait]
95impl bb8::ManageConnection for Manager {
96 type Connection = Sftp;
97 type Error = Error;
98
99 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
100 let mut session = SessionBuilder::default();
101
102 if let Some(user) = &self.user {
103 session.user(user.clone());
104 }
105
106 if let Some(key) = &self.key {
107 session.keyfile(key);
108 }
109
110 session.known_hosts_check(self.known_hosts_strategy.clone());
111
112 let session = session
113 .connect(&self.endpoint)
114 .await
115 .map_err(parse_ssh_error)?;
116
117 let sftp = Sftp::from_session(session, SftpOptions::default())
118 .await
119 .map_err(parse_sftp_error)?;
120
121 if !self.root.is_empty() {
122 let mut fs = sftp.fs();
123
124 let paths = Path::new(&self.root).components();
125 let mut current = PathBuf::new();
126 for p in paths {
127 current.push(p);
128 let res = fs.create_dir(p).await;
129
130 if let Err(e) = res {
131 if !is_sftp_protocol_error(&e) {
133 return Err(parse_sftp_error(e));
134 }
135 }
136 fs.set_cwd(¤t);
137 }
138 }
139
140 debug!("sftp connection created at {}", self.root);
141 Ok(sftp)
142 }
143
144 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
146 let _ = conn.fs().metadata("./").await.map_err(parse_sftp_error)?;
147
148 Ok(())
149 }
150
151 fn has_broken(&self, _: &mut Self::Connection) -> bool {
153 false
154 }
155}