1use std::fmt::Debug;
19use std::str::FromStr;
20use std::sync::Arc;
21
22use http::Uri;
23use log::debug;
24use services::ftp::core::Manager;
25use suppaftp::FtpError;
26use suppaftp::Status;
27use suppaftp::list::File;
28use suppaftp::types::Response;
29
30use super::FTP_SCHEME;
31use super::config::FtpConfig;
32use super::core::FtpCore;
33use super::deleter::FtpDeleter;
34use super::err::format_ftp_error;
35use super::lister::FtpLister;
36use super::reader::FtpReader;
37use super::writer::FtpWriter;
38use crate::raw::*;
39use crate::*;
40
41#[doc = include_str!("docs.md")]
43#[derive(Debug, Default)]
44pub struct FtpBuilder {
45 pub(super) config: FtpConfig,
46}
47
48impl FtpBuilder {
49 pub fn endpoint(mut self, endpoint: &str) -> Self {
51 self.config.endpoint = if endpoint.is_empty() {
52 None
53 } else {
54 Some(endpoint.to_string())
55 };
56
57 self
58 }
59
60 pub fn root(mut self, root: &str) -> Self {
62 self.config.root = if root.is_empty() {
63 None
64 } else {
65 Some(root.to_string())
66 };
67
68 self
69 }
70
71 pub fn user(mut self, user: &str) -> Self {
73 self.config.user = if user.is_empty() {
74 None
75 } else {
76 Some(user.to_string())
77 };
78
79 self
80 }
81
82 pub fn password(mut self, password: &str) -> Self {
84 self.config.password = if password.is_empty() {
85 None
86 } else {
87 Some(password.to_string())
88 };
89
90 self
91 }
92}
93
94impl Builder for FtpBuilder {
95 type Config = FtpConfig;
96
97 fn build(self) -> Result<impl Access> {
98 debug!("ftp backend build started: {:?}", &self);
99 let endpoint = match &self.config.endpoint {
100 None => return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")),
101 Some(v) => v,
102 };
103
104 let endpoint_uri = match endpoint.parse::<Uri>() {
105 Err(e) => {
106 return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
107 .with_context("endpoint", endpoint)
108 .set_source(e));
109 }
110 Ok(uri) => uri,
111 };
112
113 let host = endpoint_uri.host().unwrap_or("127.0.0.1");
114 let port = endpoint_uri.port_u16().unwrap_or(21);
115
116 let endpoint = format!("{host}:{port}");
117
118 let enable_secure = match endpoint_uri.scheme_str() {
119 Some("ftp") => false,
120 Some("ftps") | None => true,
123
124 Some(s) => {
125 return Err(Error::new(
126 ErrorKind::ConfigInvalid,
127 "endpoint is unsupported or invalid",
128 )
129 .with_context("endpoint", s));
130 }
131 };
132
133 let root = normalize_root(&self.config.root.unwrap_or_default());
134
135 let user = match &self.config.user {
136 None => "".to_string(),
137 Some(v) => v.clone(),
138 };
139
140 let password = match &self.config.password {
141 None => "".to_string(),
142 Some(v) => v.clone(),
143 };
144
145 let accessor_info = AccessorInfo::default();
146 accessor_info
147 .set_scheme(FTP_SCHEME)
148 .set_root(&root)
149 .set_native_capability(Capability {
150 stat: true,
151
152 read: true,
153
154 write: true,
155 write_can_multi: true,
156 write_can_append: true,
157
158 delete: true,
159 create_dir: true,
160
161 list: true,
162
163 shared: true,
164
165 ..Default::default()
166 });
167
168 let manager = Manager {
169 endpoint: endpoint.clone(),
170 root: root.clone(),
171 user: user.clone(),
172 password: password.clone(),
173 enable_secure,
174 };
175
176 let core = Arc::new(FtpCore::new(accessor_info.into(), manager.clone()));
177 Ok(FtpBackend { core })
178 }
179}
180
181#[derive(Clone)]
183pub struct FtpBackend {
184 core: Arc<FtpCore>,
185}
186
187impl Debug for FtpBackend {
188 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
189 f.debug_struct("FtpBackend").finish()
190 }
191}
192
193impl Access for FtpBackend {
194 type Reader = FtpReader;
195 type Writer = FtpWriter;
196 type Lister = FtpLister;
197 type Deleter = oio::OneShotDeleter<FtpDeleter>;
198
199 fn info(&self) -> Arc<AccessorInfo> {
200 self.core.info()
201 }
202
203 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
204 let mut ftp_stream = self.core.ftp_connect(Operation::CreateDir).await?;
205
206 let paths: Vec<&str> = path.split_inclusive('/').collect();
207
208 let mut curr_path = String::new();
209
210 for path in paths {
211 curr_path.push_str(path);
212 match ftp_stream.mkdir(&curr_path).await {
213 Err(FtpError::UnexpectedResponse(Response {
215 status: Status::FileUnavailable,
216 ..
217 }))
218 | Ok(()) => (),
219 Err(e) => {
220 return Err(format_ftp_error(e));
221 }
222 }
223 }
224
225 Ok(RpCreateDir::default())
226 }
227
228 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
229 let file = self.ftp_stat(path).await?;
230
231 let mode = if file.is_file() {
232 EntryMode::FILE
233 } else if file.is_directory() {
234 EntryMode::DIR
235 } else {
236 EntryMode::Unknown
237 };
238
239 let mut meta = Metadata::new(mode);
240 meta.set_content_length(file.size() as u64);
241 meta.set_last_modified(Timestamp::try_from(file.modified())?);
242
243 Ok(RpStat::new(meta))
244 }
245
246 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
247 let ftp_stream = self.core.ftp_connect(Operation::Read).await?;
248
249 let reader = FtpReader::new(ftp_stream, path.to_string(), args).await?;
250 Ok((RpRead::new(), reader))
251 }
252
253 async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
254 let parent = get_parent(path);
256 let paths: Vec<&str> = parent.split('/').collect();
257
258 let mut ftp_stream = self.core.ftp_connect(Operation::Write).await?;
260 let mut curr_path = String::new();
261
262 for path in paths {
263 if path.is_empty() {
264 continue;
265 }
266 curr_path.push_str(path);
267 curr_path.push('/');
268 match ftp_stream.mkdir(&curr_path).await {
269 Err(FtpError::UnexpectedResponse(Response {
271 status: Status::FileUnavailable,
272 ..
273 }))
274 | Ok(()) => (),
275 Err(e) => {
276 return Err(format_ftp_error(e));
277 }
278 }
279 }
280
281 let tmp_path = (!op.append()).then_some(build_tmp_path_of(path));
282 let w = FtpWriter::new(ftp_stream, path.to_string(), tmp_path);
283
284 Ok((RpWrite::new(), w))
285 }
286
287 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
288 Ok((
289 RpDelete::default(),
290 oio::OneShotDeleter::new(FtpDeleter::new(self.core.clone())),
291 ))
292 }
293
294 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
295 let mut ftp_stream = self.core.ftp_connect(Operation::List).await?;
296
297 let pathname = if path == "/" { None } else { Some(path) };
298 let files = ftp_stream.list(pathname).await.map_err(format_ftp_error)?;
299
300 Ok((
301 RpList::default(),
302 FtpLister::new(if path == "/" { "" } else { path }, files),
303 ))
304 }
305}
306
307impl FtpBackend {
308 pub async fn ftp_stat(&self, path: &str) -> Result<File> {
309 let mut ftp_stream = self.core.ftp_connect(Operation::Stat).await?;
310
311 let (parent, basename) = (get_parent(path), get_basename(path));
312
313 let pathname = if parent == "/" { None } else { Some(parent) };
314
315 let resp = ftp_stream.list(pathname).await.map_err(format_ftp_error)?;
316
317 let mut files = resp
319 .into_iter()
320 .filter_map(|file| File::from_str(file.as_str()).ok())
321 .filter(|f| f.name() == basename.trim_end_matches('/'))
322 .collect::<Vec<File>>();
323
324 if files.is_empty() {
325 Err(Error::new(
326 ErrorKind::NotFound,
327 "file is not found during list",
328 ))
329 } else {
330 Ok(files.remove(0))
331 }
332 }
333}
334
335#[cfg(test)]
336mod build_test {
337 use super::FtpBuilder;
338 use crate::services::FtpConfig;
339 use crate::*;
340
341 #[test]
342 fn test_build() {
343 let b = FtpBuilder::default()
345 .endpoint("ftps://ftp_server.local")
346 .build();
347 assert!(b.is_ok());
348
349 let b = FtpBuilder::default()
351 .endpoint("ftp://ftp_server.local:1234")
352 .build();
353 assert!(b.is_ok());
354
355 let b = FtpBuilder::default()
357 .endpoint("ftp_server.local:8765")
358 .build();
359 assert!(b.is_ok());
360
361 let b = FtpBuilder::default()
363 .endpoint("invalidscheme://ftp_server.local:8765")
364 .build();
365 assert!(b.is_err());
366 let e = b.unwrap_err();
367 assert_eq!(e.kind(), ErrorKind::ConfigInvalid);
368 }
369
370 #[test]
371 fn from_uri_sets_endpoint_and_root() {
372 let uri = OperatorUri::new(
373 "ftp://example.com/public/data",
374 Vec::<(String, String)>::new(),
375 )
376 .unwrap();
377
378 let cfg = FtpConfig::from_uri(&uri).unwrap();
379 assert_eq!(cfg.endpoint.as_deref(), Some("ftp://example.com"));
380 assert_eq!(cfg.root.as_deref(), Some("public/data"));
381 }
382
383 #[test]
384 fn from_uri_applies_credentials_from_query() {
385 let uri = OperatorUri::new(
386 "ftp://example.com/data",
387 vec![
388 ("user".to_string(), "alice".to_string()),
389 ("password".to_string(), "secret".to_string()),
390 ],
391 )
392 .unwrap();
393
394 let cfg = FtpConfig::from_uri(&uri).unwrap();
395 assert_eq!(cfg.endpoint.as_deref(), Some("ftp://example.com"));
396 assert_eq!(cfg.user.as_deref(), Some("alice"));
397 assert_eq!(cfg.password.as_deref(), Some("secret"));
398 }
399}