opendal/services/ftp/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::str;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use http::Uri;
25use log::debug;
26use services::ftp::core::Manager;
27use suppaftp::list::File;
28use suppaftp::types::Response;
29use suppaftp::FtpError;
30use suppaftp::Status;
31use tokio::sync::OnceCell;
32
33use super::core::FtpCore;
34use super::delete::FtpDeleter;
35use super::err::parse_error;
36use super::lister::FtpLister;
37use super::reader::FtpReader;
38use super::writer::FtpWriter;
39use crate::raw::*;
40use crate::services::FtpConfig;
41use crate::*;
42
43impl Configurator for FtpConfig {
44 type Builder = FtpBuilder;
45 fn into_builder(self) -> Self::Builder {
46 FtpBuilder { config: self }
47 }
48}
49
50#[doc = include_str!("docs.md")]
52#[derive(Default)]
53pub struct FtpBuilder {
54 config: FtpConfig,
55}
56
57impl Debug for FtpBuilder {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("FtpBuilder")
60 .field("config", &self.config)
61 .finish()
62 }
63}
64
65impl FtpBuilder {
66 pub fn endpoint(mut self, endpoint: &str) -> Self {
68 self.config.endpoint = if endpoint.is_empty() {
69 None
70 } else {
71 Some(endpoint.to_string())
72 };
73
74 self
75 }
76
77 pub fn root(mut self, root: &str) -> Self {
79 self.config.root = if root.is_empty() {
80 None
81 } else {
82 Some(root.to_string())
83 };
84
85 self
86 }
87
88 pub fn user(mut self, user: &str) -> Self {
90 self.config.user = if user.is_empty() {
91 None
92 } else {
93 Some(user.to_string())
94 };
95
96 self
97 }
98
99 pub fn password(mut self, password: &str) -> Self {
101 self.config.password = if password.is_empty() {
102 None
103 } else {
104 Some(password.to_string())
105 };
106
107 self
108 }
109}
110
111impl Builder for FtpBuilder {
112 const SCHEME: Scheme = Scheme::Ftp;
113 type Config = FtpConfig;
114
115 fn build(self) -> Result<impl Access> {
116 debug!("ftp backend build started: {:?}", &self);
117 let endpoint = match &self.config.endpoint {
118 None => return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")),
119 Some(v) => v,
120 };
121
122 let endpoint_uri = match endpoint.parse::<Uri>() {
123 Err(e) => {
124 return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
125 .with_context("endpoint", endpoint)
126 .set_source(e));
127 }
128 Ok(uri) => uri,
129 };
130
131 let host = endpoint_uri.host().unwrap_or("127.0.0.1");
132 let port = endpoint_uri.port_u16().unwrap_or(21);
133
134 let endpoint = format!("{host}:{port}");
135
136 let enable_secure = match endpoint_uri.scheme_str() {
137 Some("ftp") => false,
138 Some("ftps") | None => true,
141
142 Some(s) => {
143 return Err(Error::new(
144 ErrorKind::ConfigInvalid,
145 "endpoint is unsupported or invalid",
146 )
147 .with_context("endpoint", s));
148 }
149 };
150
151 let root = normalize_root(&self.config.root.unwrap_or_default());
152
153 let user = match &self.config.user {
154 None => "".to_string(),
155 Some(v) => v.clone(),
156 };
157
158 let password = match &self.config.password {
159 None => "".to_string(),
160 Some(v) => v.clone(),
161 };
162
163 let accessor_info = AccessorInfo::default();
164 accessor_info
165 .set_scheme(Scheme::Ftp)
166 .set_root(&root)
167 .set_native_capability(Capability {
168 stat: true,
169
170 read: true,
171
172 write: true,
173 write_can_multi: true,
174 write_can_append: true,
175
176 delete: true,
177 create_dir: true,
178
179 list: true,
180
181 shared: true,
182
183 ..Default::default()
184 });
185 let manager = Manager {
186 endpoint: endpoint.clone(),
187 root: root.clone(),
188 user: user.clone(),
189 password: password.clone(),
190 enable_secure,
191 };
192 let core = Arc::new(FtpCore {
193 info: accessor_info.into(),
194 manager,
195 pool: OnceCell::new(),
196 });
197
198 Ok(FtpBackend { core })
199 }
200}
201
202#[derive(Clone)]
204pub struct FtpBackend {
205 core: Arc<FtpCore>,
206}
207
208impl Debug for FtpBackend {
209 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
210 f.debug_struct("Backend").finish()
211 }
212}
213
214impl Access for FtpBackend {
215 type Reader = FtpReader;
216 type Writer = FtpWriter;
217 type Lister = FtpLister;
218 type Deleter = oio::OneShotDeleter<FtpDeleter>;
219
220 fn info(&self) -> Arc<AccessorInfo> {
221 self.core.info.clone()
222 }
223
224 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
225 let mut ftp_stream = self.core.ftp_connect(Operation::CreateDir).await?;
226
227 let paths: Vec<&str> = path.split_inclusive('/').collect();
228
229 let mut curr_path = String::new();
230
231 for path in paths {
232 curr_path.push_str(path);
233 match ftp_stream.mkdir(&curr_path).await {
234 Err(FtpError::UnexpectedResponse(Response {
236 status: Status::FileUnavailable,
237 ..
238 }))
239 | Ok(()) => (),
240 Err(e) => {
241 return Err(parse_error(e));
242 }
243 }
244 }
245
246 Ok(RpCreateDir::default())
247 }
248
249 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
250 let file = self.ftp_stat(path).await?;
251
252 let mode = if file.is_file() {
253 EntryMode::FILE
254 } else if file.is_directory() {
255 EntryMode::DIR
256 } else {
257 EntryMode::Unknown
258 };
259
260 let mut meta = Metadata::new(mode);
261 meta.set_content_length(file.size() as u64);
262 meta.set_last_modified(file.modified().into());
263
264 Ok(RpStat::new(meta))
265 }
266
267 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
268 let ftp_stream = self.core.ftp_connect(Operation::Read).await?;
269
270 let reader = FtpReader::new(ftp_stream, path.to_string(), args).await?;
271 Ok((RpRead::new(), reader))
272 }
273
274 async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
275 let parent = get_parent(path);
277 let paths: Vec<&str> = parent.split('/').collect();
278
279 let mut ftp_stream = self.core.ftp_connect(Operation::Write).await?;
281 let mut curr_path = String::new();
282
283 for path in paths {
284 if path.is_empty() {
285 continue;
286 }
287 curr_path.push_str(path);
288 curr_path.push('/');
289 match ftp_stream.mkdir(&curr_path).await {
290 Err(FtpError::UnexpectedResponse(Response {
292 status: Status::FileUnavailable,
293 ..
294 }))
295 | Ok(()) => (),
296 Err(e) => {
297 return Err(parse_error(e));
298 }
299 }
300 }
301
302 let tmp_path = (!op.append()).then_some(build_tmp_path_of(path));
303 let w = FtpWriter::new(ftp_stream, path.to_string(), tmp_path);
304
305 Ok((RpWrite::new(), w))
306 }
307
308 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
309 Ok((
310 RpDelete::default(),
311 oio::OneShotDeleter::new(FtpDeleter::new(self.core.clone())),
312 ))
313 }
314
315 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
316 let mut ftp_stream = self.core.ftp_connect(Operation::List).await?;
317
318 let pathname = if path == "/" { None } else { Some(path) };
319 let files = ftp_stream.list(pathname).await.map_err(parse_error)?;
320
321 Ok((
322 RpList::default(),
323 FtpLister::new(if path == "/" { "" } else { path }, files),
324 ))
325 }
326}
327
328impl FtpBackend {
329 pub async fn ftp_stat(&self, path: &str) -> Result<File> {
330 let mut ftp_stream = self.core.ftp_connect(Operation::Stat).await?;
331
332 let (parent, basename) = (get_parent(path), get_basename(path));
333
334 let pathname = if parent == "/" { None } else { Some(parent) };
335
336 let resp = ftp_stream.list(pathname).await.map_err(parse_error)?;
337
338 let mut files = resp
340 .into_iter()
341 .filter_map(|file| File::from_str(file.as_str()).ok())
342 .filter(|f| f.name() == basename.trim_end_matches('/'))
343 .collect::<Vec<File>>();
344
345 if files.is_empty() {
346 Err(Error::new(
347 ErrorKind::NotFound,
348 "file is not found during list",
349 ))
350 } else {
351 Ok(files.remove(0))
352 }
353 }
354}
355
356#[cfg(test)]
357mod build_test {
358 use super::FtpBuilder;
359 use crate::*;
360
361 #[test]
362 fn test_build() {
363 let b = FtpBuilder::default()
365 .endpoint("ftps://ftp_server.local")
366 .build();
367 assert!(b.is_ok());
368
369 let b = FtpBuilder::default()
371 .endpoint("ftp://ftp_server.local:1234")
372 .build();
373 assert!(b.is_ok());
374
375 let b = FtpBuilder::default()
377 .endpoint("ftp_server.local:8765")
378 .build();
379 assert!(b.is_ok());
380
381 let b = FtpBuilder::default()
383 .endpoint("invalidscheme://ftp_server.local:8765")
384 .build();
385 assert!(b.is_err());
386 let e = b.unwrap_err();
387 assert_eq!(e.kind(), ErrorKind::ConfigInvalid);
388 }
389}