opendal/services/sftp/
core.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::path::Path;
21use std::path::PathBuf;
22use std::sync::Arc;
23
24use bb8::PooledConnection;
25use bb8::RunError;
26use log::debug;
27use openssh::KnownHosts;
28use openssh::SessionBuilder;
29use openssh_sftp_client::Sftp;
30use openssh_sftp_client::SftpOptions;
31use tokio::sync::OnceCell;
32
33use super::error::is_sftp_protocol_error;
34use super::error::parse_sftp_error;
35use super::error::parse_ssh_error;
36use crate::raw::*;
37use crate::*;
38
39pub struct SftpCore {
40 pub info: Arc<AccessorInfo>,
41 pub endpoint: String,
42 pub root: String,
43 pub user: Option<String>,
44 pub key: Option<String>,
45 pub known_hosts_strategy: KnownHosts,
46
47 pub client: OnceCell<bb8::Pool<Manager>>,
48}
49
50impl Debug for SftpCore {
51 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52 f.debug_struct("SftpCore")
53 .field("endpoint", &self.endpoint)
54 .field("root", &self.root)
55 .finish()
56 }
57}
58
59impl SftpCore {
60 pub async fn connect(&self) -> Result<PooledConnection<'static, Manager>> {
61 let client = self
62 .client
63 .get_or_try_init(|| async {
64 bb8::Pool::builder()
65 .max_size(64)
66 .build(Manager {
67 endpoint: self.endpoint.clone(),
68 root: self.root.clone(),
69 user: self.user.clone(),
70 key: self.key.clone(),
71 known_hosts_strategy: self.known_hosts_strategy.clone(),
72 })
73 .await
74 })
75 .await?;
76
77 client.get_owned().await.map_err(|err| match err {
78 RunError::User(err) => err,
79 RunError::TimedOut => {
80 Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
81 }
82 })
83 }
84}
85
86pub struct Manager {
87 endpoint: String,
88 root: String,
89 user: Option<String>,
90 key: Option<String>,
91 known_hosts_strategy: KnownHosts,
92}
93
94impl bb8::ManageConnection for Manager {
95 type Connection = Sftp;
96 type Error = Error;
97
98 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
99 let mut session = SessionBuilder::default();
100
101 if let Some(user) = &self.user {
102 session.user(user.clone());
103 }
104
105 if let Some(key) = &self.key {
106 session.keyfile(key);
107 }
108
109 session.known_hosts_check(self.known_hosts_strategy.clone());
110
111 let session = session
112 .connect(&self.endpoint)
113 .await
114 .map_err(parse_ssh_error)?;
115
116 let sftp = Sftp::from_session(session, SftpOptions::default())
117 .await
118 .map_err(parse_sftp_error)?;
119
120 if !self.root.is_empty() {
121 let mut fs = sftp.fs();
122
123 let paths = Path::new(&self.root).components();
124 let mut current = PathBuf::new();
125 for p in paths {
126 current.push(p);
127 let res = fs.create_dir(p).await;
128
129 if let Err(e) = res {
130 if !is_sftp_protocol_error(&e) {
132 return Err(parse_sftp_error(e));
133 }
134 }
135 fs.set_cwd(¤t);
136 }
137 }
138
139 debug!("sftp connection created at {}", self.root);
140 Ok(sftp)
141 }
142
143 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
145 let _ = conn.fs().metadata("./").await.map_err(parse_sftp_error)?;
146
147 Ok(())
148 }
149
150 fn has_broken(&self, _: &mut Self::Connection) -> bool {
152 false
153 }
154}