opendal/services/sftp/
backend.rs1use std::io::SeekFrom;
19use std::path::Path;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use log::debug;
24use openssh::KnownHosts;
25use tokio::io::AsyncSeekExt;
26use tokio::sync::OnceCell;
27
28use super::SFTP_SCHEME;
29use super::config::SftpConfig;
30use super::core::SftpCore;
31use super::deleter::SftpDeleter;
32use super::error::is_not_found;
33use super::error::is_sftp_protocol_error;
34use super::error::parse_sftp_error;
35use super::lister::SftpLister;
36use super::reader::SftpReader;
37use super::writer::SftpWriter;
38use crate::raw::*;
39use crate::*;
40
41#[doc = include_str!("docs.md")]
51#[derive(Debug, Default)]
52pub struct SftpBuilder {
53 pub(super) config: SftpConfig,
54}
55
56impl SftpBuilder {
57 pub fn endpoint(mut self, endpoint: &str) -> Self {
60 self.config.endpoint = if endpoint.is_empty() {
61 None
62 } else {
63 Some(endpoint.to_string())
64 };
65
66 self
67 }
68
69 pub fn root(mut self, root: &str) -> Self {
72 self.config.root = if root.is_empty() {
73 None
74 } else {
75 Some(root.to_string())
76 };
77
78 self
79 }
80
81 pub fn user(mut self, user: &str) -> Self {
83 self.config.user = if user.is_empty() {
84 None
85 } else {
86 Some(user.to_string())
87 };
88
89 self
90 }
91
92 pub fn key(mut self, key: &str) -> Self {
94 self.config.key = if key.is_empty() {
95 None
96 } else {
97 Some(key.to_string())
98 };
99
100 self
101 }
102
103 pub fn known_hosts_strategy(mut self, strategy: &str) -> Self {
109 self.config.known_hosts_strategy = if strategy.is_empty() {
110 None
111 } else {
112 Some(strategy.to_string())
113 };
114
115 self
116 }
117
118 pub fn enable_copy(mut self, enable_copy: bool) -> Self {
121 self.config.enable_copy = enable_copy;
122
123 self
124 }
125}
126
127impl Builder for SftpBuilder {
128 type Config = SftpConfig;
129
130 fn build(self) -> Result<impl Access> {
131 debug!("sftp backend build started: {:?}", &self);
132 let endpoint = match self.config.endpoint.clone() {
133 Some(v) => v,
134 None => return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")),
135 };
136
137 let user = self.config.user.clone();
138
139 let root = self
140 .config
141 .root
142 .clone()
143 .map(|r| normalize_root(r.as_str()))
144 .unwrap_or_default();
145
146 let known_hosts_strategy = match &self.config.known_hosts_strategy {
147 Some(v) => {
148 let v = v.to_lowercase();
149 if v == "strict" {
150 KnownHosts::Strict
151 } else if v == "accept" {
152 KnownHosts::Accept
153 } else if v == "add" {
154 KnownHosts::Add
155 } else {
156 return Err(Error::new(
157 ErrorKind::ConfigInvalid,
158 format!("unknown known_hosts strategy: {v}").as_str(),
159 ));
160 }
161 }
162 None => KnownHosts::Strict,
163 };
164
165 let info = AccessorInfo::default();
166 info.set_root(root.as_str())
167 .set_scheme(SFTP_SCHEME)
168 .set_native_capability(Capability {
169 stat: true,
170
171 read: true,
172
173 write: true,
174 write_can_multi: true,
175
176 create_dir: true,
177 delete: true,
178
179 list: true,
180 list_with_limit: true,
181
182 copy: self.config.enable_copy,
183 rename: true,
184
185 shared: true,
186
187 ..Default::default()
188 });
189
190 let accessor_info = Arc::new(info);
191 let core = Arc::new(SftpCore {
192 info: accessor_info,
193 endpoint,
194 root,
195 user,
196 key: self.config.key.clone(),
197 known_hosts_strategy,
198
199 client: OnceCell::new(),
200 });
201
202 debug!("sftp backend finished: {:?}", &self);
203 Ok(SftpBackend { core })
204 }
205}
206
207#[derive(Clone, Debug)]
209pub struct SftpBackend {
210 pub core: Arc<SftpCore>,
211}
212
213impl Access for SftpBackend {
214 type Reader = SftpReader;
215 type Writer = SftpWriter;
216 type Lister = Option<SftpLister>;
217 type Deleter = oio::OneShotDeleter<SftpDeleter>;
218
219 fn info(&self) -> Arc<AccessorInfo> {
220 self.core.info.clone()
221 }
222
223 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
224 let client = self.core.connect().await?;
225 let mut fs = client.fs();
226 fs.set_cwd(&self.core.root);
227
228 let paths = Path::new(&path).components();
229 let mut current = PathBuf::from(&self.core.root);
230 for p in paths {
231 current = current.join(p);
232 let res = fs.create_dir(p).await;
233
234 if let Err(e) = res {
235 if !is_sftp_protocol_error(&e) {
237 return Err(parse_sftp_error(e));
238 }
239 }
240 fs.set_cwd(¤t);
241 }
242
243 Ok(RpCreateDir::default())
244 }
245
246 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
247 let client = self.core.connect().await?;
248 let mut fs = client.fs();
249 fs.set_cwd(&self.core.root);
250
251 let meta: Metadata = fs.metadata(path).await.map_err(parse_sftp_error)?.into();
252
253 Ok(RpStat::new(meta))
254 }
255
256 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
257 let client = self.core.connect().await?;
258
259 let mut fs = client.fs();
260 fs.set_cwd(&self.core.root);
261
262 let path = fs.canonicalize(path).await.map_err(parse_sftp_error)?;
263
264 let mut f = client
265 .open(path.as_path())
266 .await
267 .map_err(parse_sftp_error)?;
268
269 if args.range().offset() != 0 {
270 f.seek(SeekFrom::Start(args.range().offset()))
271 .await
272 .map_err(new_std_io_error)?;
273 }
274
275 Ok((
276 RpRead::default(),
277 SftpReader::new(client, f, args.range().size()),
278 ))
279 }
280
281 async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
282 if let Some((dir, _)) = path.rsplit_once('/') {
283 self.create_dir(dir, OpCreateDir::default()).await?;
284 }
285
286 let client = self.core.connect().await?;
287
288 let mut fs = client.fs();
289 fs.set_cwd(&self.core.root);
290 let path = fs.canonicalize(path).await.map_err(parse_sftp_error)?;
291
292 let mut option = client.options();
293 option.create(true);
294 if op.append() {
295 option.append(true);
296 } else {
297 option.write(true).truncate(true);
298 }
299
300 let file = option.open(path).await.map_err(parse_sftp_error)?;
301
302 Ok((RpWrite::new(), SftpWriter::new(file)))
303 }
304
305 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
306 Ok((
307 RpDelete::default(),
308 oio::OneShotDeleter::new(SftpDeleter::new(self.core.clone())),
309 ))
310 }
311
312 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
313 let client = self.core.connect().await?;
314 let mut fs = client.fs();
315 fs.set_cwd(&self.core.root);
316
317 let file_path = format!("./{path}");
318
319 let dir = match fs.open_dir(&file_path).await {
320 Ok(dir) => dir,
321 Err(e) => {
322 if is_not_found(&e) {
323 return Ok((RpList::default(), None));
324 } else {
325 return Err(parse_sftp_error(e));
326 }
327 }
328 }
329 .read_dir();
330
331 Ok((
332 RpList::default(),
333 Some(SftpLister::new(dir, path.to_owned())),
334 ))
335 }
336
337 async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
338 let client = self.core.connect().await?;
339
340 let mut fs = client.fs();
341 fs.set_cwd(&self.core.root);
342
343 if let Some((dir, _)) = to.rsplit_once('/') {
344 self.create_dir(dir, OpCreateDir::default()).await?;
345 }
346
347 let src = fs.canonicalize(from).await.map_err(parse_sftp_error)?;
348 let dst = fs.canonicalize(to).await.map_err(parse_sftp_error)?;
349 let mut src_file = client.open(&src).await.map_err(parse_sftp_error)?;
350 let mut dst_file = client.create(dst).await.map_err(parse_sftp_error)?;
351
352 src_file
353 .copy_all_to(&mut dst_file)
354 .await
355 .map_err(parse_sftp_error)?;
356
357 Ok(RpCopy::default())
358 }
359
360 async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
361 let client = self.core.connect().await?;
362
363 let mut fs = client.fs();
364 fs.set_cwd(&self.core.root);
365
366 if let Some((dir, _)) = to.rsplit_once('/') {
367 self.create_dir(dir, OpCreateDir::default()).await?;
368 }
369 fs.rename(from, to).await.map_err(parse_sftp_error)?;
370
371 Ok(RpRename::default())
372 }
373}