opendal_core/services/sftp/
core.rs1use super::error::is_sftp_protocol_error;
19use super::error::parse_sftp_error;
20use super::error::parse_ssh_error;
21use crate::raw::*;
22use crate::*;
23use fastpool::{ManageObject, ObjectStatus, bounded};
24use log::debug;
25use openssh::KnownHosts;
26use openssh::SessionBuilder;
27use openssh_sftp_client::Sftp;
28use openssh_sftp_client::SftpOptions;
29use std::fmt::Debug;
30use std::path::Path;
31use std::path::PathBuf;
32use std::sync::Arc;
33
34pub struct SftpCore {
35 pub info: Arc<AccessorInfo>,
36 pub endpoint: String,
37 pub root: String,
38 client: Arc<bounded::Pool<Manager>>,
39}
40
41impl Debug for SftpCore {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 f.debug_struct("SftpCore")
44 .field("endpoint", &self.endpoint)
45 .field("root", &self.root)
46 .finish_non_exhaustive()
47 }
48}
49
50impl SftpCore {
51 pub fn new(
52 info: Arc<AccessorInfo>,
53 endpoint: String,
54 root: String,
55 user: Option<String>,
56 key: Option<String>,
57 known_hosts_strategy: KnownHosts,
58 ) -> Self {
59 let client = bounded::Pool::new(
60 bounded::PoolConfig::new(64),
61 Manager {
62 endpoint: endpoint.clone(),
63 root: root.clone(),
64 user,
65 key,
66 known_hosts_strategy,
67 },
68 );
69
70 SftpCore {
71 info,
72 endpoint,
73 root,
74 client,
75 }
76 }
77
78 pub async fn connect(&self) -> Result<bounded::Object<Manager>> {
79 let fut = self.client.get();
80
81 tokio::select! {
82 _ = tokio::time::sleep(Duration::from_secs(10)) => {
83 Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
84 }
85 result = fut => match result {
86 Ok(conn) => Ok(conn),
87 Err(err) => Err(err),
88 }
89 }
90 }
91}
92
93pub struct Manager {
94 endpoint: String,
95 root: String,
96 user: Option<String>,
97 key: Option<String>,
98 known_hosts_strategy: KnownHosts,
99}
100
101impl ManageObject for Manager {
102 type Object = Sftp;
103 type Error = Error;
104
105 async fn create(&self) -> Result<Self::Object, Self::Error> {
106 let mut session = SessionBuilder::default();
107
108 if let Some(user) = &self.user {
109 session.user(user.clone());
110 }
111
112 if let Some(key) = &self.key {
113 session.keyfile(key);
114 }
115
116 session.known_hosts_check(self.known_hosts_strategy.clone());
117
118 let session = session
119 .connect(&self.endpoint)
120 .await
121 .map_err(parse_ssh_error)?;
122
123 let sftp = Sftp::from_session(session, SftpOptions::default())
124 .await
125 .map_err(parse_sftp_error)?;
126
127 if !self.root.is_empty() {
128 let mut fs = sftp.fs();
129
130 let paths = Path::new(&self.root).components();
131 let mut current = PathBuf::new();
132 for p in paths {
133 current.push(p);
134 let res = fs.create_dir(p).await;
135
136 if let Err(e) = res {
137 if !is_sftp_protocol_error(&e) {
139 return Err(parse_sftp_error(e));
140 }
141 }
142 fs.set_cwd(¤t);
143 }
144 }
145
146 debug!("sftp connection created at {}", self.root);
147 Ok(sftp)
148 }
149
150 async fn is_recyclable(
152 &self,
153 o: &mut Self::Object,
154 _: &ObjectStatus,
155 ) -> Result<(), Self::Error> {
156 match o.fs().metadata("./").await {
157 Ok(_) => Ok(()),
158 Err(e) => Err(parse_sftp_error(e)),
159 }
160 }
161}