opendal/services/sftp/
backend.rs1use std::io::SeekFrom;
19use std::path::Path;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use log::debug;
24use openssh::KnownHosts;
25use tokio::io::AsyncSeekExt;
26
27use super::SFTP_SCHEME;
28use super::config::SftpConfig;
29use super::core::SftpCore;
30use super::deleter::SftpDeleter;
31use super::error::is_not_found;
32use super::error::is_sftp_protocol_error;
33use super::error::parse_sftp_error;
34use super::lister::SftpLister;
35use super::reader::SftpReader;
36use super::writer::SftpWriter;
37use crate::raw::*;
38use crate::*;
39
40#[doc = include_str!("docs.md")]
50#[derive(Debug, Default)]
51pub struct SftpBuilder {
52 pub(super) config: SftpConfig,
53}
54
55impl SftpBuilder {
56 pub fn endpoint(mut self, endpoint: &str) -> Self {
59 self.config.endpoint = if endpoint.is_empty() {
60 None
61 } else {
62 Some(endpoint.to_string())
63 };
64
65 self
66 }
67
68 pub fn root(mut self, root: &str) -> Self {
71 self.config.root = if root.is_empty() {
72 None
73 } else {
74 Some(root.to_string())
75 };
76
77 self
78 }
79
80 pub fn user(mut self, user: &str) -> Self {
82 self.config.user = if user.is_empty() {
83 None
84 } else {
85 Some(user.to_string())
86 };
87
88 self
89 }
90
91 pub fn key(mut self, key: &str) -> Self {
93 self.config.key = if key.is_empty() {
94 None
95 } else {
96 Some(key.to_string())
97 };
98
99 self
100 }
101
102 pub fn known_hosts_strategy(mut self, strategy: &str) -> Self {
108 self.config.known_hosts_strategy = if strategy.is_empty() {
109 None
110 } else {
111 Some(strategy.to_string())
112 };
113
114 self
115 }
116
117 pub fn enable_copy(mut self, enable_copy: bool) -> Self {
120 self.config.enable_copy = enable_copy;
121
122 self
123 }
124}
125
126impl Builder for SftpBuilder {
127 type Config = SftpConfig;
128
129 fn build(self) -> Result<impl Access> {
130 debug!("sftp backend build started: {:?}", &self);
131 let endpoint = match self.config.endpoint.clone() {
132 Some(v) => v,
133 None => return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")),
134 };
135
136 let user = self.config.user.clone();
137
138 let root = self
139 .config
140 .root
141 .clone()
142 .map(|r| normalize_root(r.as_str()))
143 .unwrap_or_default();
144
145 let known_hosts_strategy = match &self.config.known_hosts_strategy {
146 Some(v) => {
147 let v = v.to_lowercase();
148 if v == "strict" {
149 KnownHosts::Strict
150 } else if v == "accept" {
151 KnownHosts::Accept
152 } else if v == "add" {
153 KnownHosts::Add
154 } else {
155 return Err(Error::new(
156 ErrorKind::ConfigInvalid,
157 format!("unknown known_hosts strategy: {v}").as_str(),
158 ));
159 }
160 }
161 None => KnownHosts::Strict,
162 };
163
164 let info = AccessorInfo::default();
165 info.set_root(root.as_str())
166 .set_scheme(SFTP_SCHEME)
167 .set_native_capability(Capability {
168 stat: true,
169
170 read: true,
171
172 write: true,
173 write_can_multi: true,
174
175 create_dir: true,
176 delete: true,
177
178 list: true,
179 list_with_limit: true,
180
181 copy: self.config.enable_copy,
182 rename: true,
183
184 shared: true,
185
186 ..Default::default()
187 });
188
189 let core = Arc::new(SftpCore::new(
190 Arc::new(info),
191 endpoint,
192 root,
193 user,
194 self.config.key.clone(),
195 known_hosts_strategy,
196 ));
197
198 debug!("sftp backend finished: {:?}", &self);
199 Ok(SftpBackend { core })
200 }
201}
202
203#[derive(Clone, Debug)]
205pub struct SftpBackend {
206 pub core: Arc<SftpCore>,
207}
208
209impl Access for SftpBackend {
210 type Reader = SftpReader;
211 type Writer = SftpWriter;
212 type Lister = Option<SftpLister>;
213 type Deleter = oio::OneShotDeleter<SftpDeleter>;
214
215 fn info(&self) -> Arc<AccessorInfo> {
216 self.core.info.clone()
217 }
218
219 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
220 let client = self.core.connect().await?;
221 let mut fs = client.fs();
222 fs.set_cwd(&self.core.root);
223
224 let paths = Path::new(&path).components();
225 let mut current = PathBuf::from(&self.core.root);
226 for p in paths {
227 current = current.join(p);
228 let res = fs.create_dir(p).await;
229
230 if let Err(e) = res {
231 if !is_sftp_protocol_error(&e) {
233 return Err(parse_sftp_error(e));
234 }
235 }
236 fs.set_cwd(¤t);
237 }
238
239 Ok(RpCreateDir::default())
240 }
241
242 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
243 let client = self.core.connect().await?;
244 let mut fs = client.fs();
245 fs.set_cwd(&self.core.root);
246
247 let meta: Metadata = fs.metadata(path).await.map_err(parse_sftp_error)?.into();
248
249 Ok(RpStat::new(meta))
250 }
251
252 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
253 let client = self.core.connect().await?;
254
255 let mut fs = client.fs();
256 fs.set_cwd(&self.core.root);
257
258 let path = fs.canonicalize(path).await.map_err(parse_sftp_error)?;
259
260 let mut f = client
261 .open(path.as_path())
262 .await
263 .map_err(parse_sftp_error)?;
264
265 if args.range().offset() != 0 {
266 f.seek(SeekFrom::Start(args.range().offset()))
267 .await
268 .map_err(new_std_io_error)?;
269 }
270
271 Ok((
272 RpRead::default(),
273 SftpReader::new(client, f, args.range().size()),
274 ))
275 }
276
277 async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
278 if let Some((dir, _)) = path.rsplit_once('/') {
279 self.create_dir(dir, OpCreateDir::default()).await?;
280 }
281
282 let client = self.core.connect().await?;
283
284 let mut fs = client.fs();
285 fs.set_cwd(&self.core.root);
286 let path = fs.canonicalize(path).await.map_err(parse_sftp_error)?;
287
288 let mut option = client.options();
289 option.create(true);
290 if op.append() {
291 option.append(true);
292 } else {
293 option.write(true).truncate(true);
294 }
295
296 let file = option.open(path).await.map_err(parse_sftp_error)?;
297
298 Ok((RpWrite::new(), SftpWriter::new(file)))
299 }
300
301 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
302 Ok((
303 RpDelete::default(),
304 oio::OneShotDeleter::new(SftpDeleter::new(self.core.clone())),
305 ))
306 }
307
308 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
309 let client = self.core.connect().await?;
310 let mut fs = client.fs();
311 fs.set_cwd(&self.core.root);
312
313 let file_path = format!("./{path}");
314
315 let dir = match fs.open_dir(&file_path).await {
316 Ok(dir) => dir,
317 Err(e) => {
318 return if is_not_found(&e) {
319 Ok((RpList::default(), None))
320 } else {
321 Err(parse_sftp_error(e))
322 };
323 }
324 }
325 .read_dir();
326
327 Ok((
328 RpList::default(),
329 Some(SftpLister::new(dir, path.to_owned())),
330 ))
331 }
332
333 async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
334 let client = self.core.connect().await?;
335
336 let mut fs = client.fs();
337 fs.set_cwd(&self.core.root);
338
339 if let Some((dir, _)) = to.rsplit_once('/') {
340 self.create_dir(dir, OpCreateDir::default()).await?;
341 }
342
343 let src = fs.canonicalize(from).await.map_err(parse_sftp_error)?;
344 let dst = fs.canonicalize(to).await.map_err(parse_sftp_error)?;
345 let mut src_file = client.open(&src).await.map_err(parse_sftp_error)?;
346 let mut dst_file = client.create(dst).await.map_err(parse_sftp_error)?;
347
348 src_file
349 .copy_all_to(&mut dst_file)
350 .await
351 .map_err(parse_sftp_error)?;
352
353 Ok(RpCopy::default())
354 }
355
356 async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
357 let client = self.core.connect().await?;
358
359 let mut fs = client.fs();
360 fs.set_cwd(&self.core.root);
361
362 if let Some((dir, _)) = to.rsplit_once('/') {
363 self.create_dir(dir, OpCreateDir::default()).await?;
364 }
365 fs.rename(from, to).await.map_err(parse_sftp_error)?;
366
367 Ok(RpRename::default())
368 }
369}