opendal/services/ftp/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::str;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use http::Uri;
25use log::debug;
26use services::ftp::core::Manager;
27use suppaftp::list::File;
28use suppaftp::types::Response;
29use suppaftp::FtpError;
30use suppaftp::Status;
31use tokio::sync::OnceCell;
32
33use super::core::FtpCore;
34use super::delete::FtpDeleter;
35use super::err::parse_error;
36use super::lister::FtpLister;
37use super::reader::FtpReader;
38use super::writer::FtpWriter;
39use super::DEFAULT_SCHEME;
40use crate::raw::*;
41use crate::services::FtpConfig;
42use crate::*;
43impl Configurator for FtpConfig {
44    type Builder = FtpBuilder;
45    fn into_builder(self) -> Self::Builder {
46        FtpBuilder { config: self }
47    }
48}
49
50/// FTP and FTPS services support.
51#[doc = include_str!("docs.md")]
52#[derive(Default)]
53pub struct FtpBuilder {
54    config: FtpConfig,
55}
56
57impl Debug for FtpBuilder {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("FtpBuilder")
60            .field("config", &self.config)
61            .finish()
62    }
63}
64
65impl FtpBuilder {
66    /// set endpoint for ftp backend.
67    pub fn endpoint(mut self, endpoint: &str) -> Self {
68        self.config.endpoint = if endpoint.is_empty() {
69            None
70        } else {
71            Some(endpoint.to_string())
72        };
73
74        self
75    }
76
77    /// set root path for ftp backend.
78    pub fn root(mut self, root: &str) -> Self {
79        self.config.root = if root.is_empty() {
80            None
81        } else {
82            Some(root.to_string())
83        };
84
85        self
86    }
87
88    /// set user for ftp backend.
89    pub fn user(mut self, user: &str) -> Self {
90        self.config.user = if user.is_empty() {
91            None
92        } else {
93            Some(user.to_string())
94        };
95
96        self
97    }
98
99    /// set password for ftp backend.
100    pub fn password(mut self, password: &str) -> Self {
101        self.config.password = if password.is_empty() {
102            None
103        } else {
104            Some(password.to_string())
105        };
106
107        self
108    }
109}
110
111impl Builder for FtpBuilder {
112    type Config = FtpConfig;
113
114    fn build(self) -> Result<impl Access> {
115        debug!("ftp backend build started: {:?}", &self);
116        let endpoint = match &self.config.endpoint {
117            None => return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")),
118            Some(v) => v,
119        };
120
121        let endpoint_uri = match endpoint.parse::<Uri>() {
122            Err(e) => {
123                return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
124                    .with_context("endpoint", endpoint)
125                    .set_source(e));
126            }
127            Ok(uri) => uri,
128        };
129
130        let host = endpoint_uri.host().unwrap_or("127.0.0.1");
131        let port = endpoint_uri.port_u16().unwrap_or(21);
132
133        let endpoint = format!("{host}:{port}");
134
135        let enable_secure = match endpoint_uri.scheme_str() {
136            Some("ftp") => false,
137            // if the user forgot to add a scheme prefix
138            // treat it as using secured scheme
139            Some("ftps") | None => true,
140
141            Some(s) => {
142                return Err(Error::new(
143                    ErrorKind::ConfigInvalid,
144                    "endpoint is unsupported or invalid",
145                )
146                .with_context("endpoint", s));
147            }
148        };
149
150        let root = normalize_root(&self.config.root.unwrap_or_default());
151
152        let user = match &self.config.user {
153            None => "".to_string(),
154            Some(v) => v.clone(),
155        };
156
157        let password = match &self.config.password {
158            None => "".to_string(),
159            Some(v) => v.clone(),
160        };
161
162        let accessor_info = AccessorInfo::default();
163        accessor_info
164            .set_scheme(DEFAULT_SCHEME)
165            .set_root(&root)
166            .set_native_capability(Capability {
167                stat: true,
168
169                read: true,
170
171                write: true,
172                write_can_multi: true,
173                write_can_append: true,
174
175                delete: true,
176                create_dir: true,
177
178                list: true,
179
180                shared: true,
181
182                ..Default::default()
183            });
184        let manager = Manager {
185            endpoint: endpoint.clone(),
186            root: root.clone(),
187            user: user.clone(),
188            password: password.clone(),
189            enable_secure,
190        };
191        let core = Arc::new(FtpCore {
192            info: accessor_info.into(),
193            manager,
194            pool: OnceCell::new(),
195        });
196
197        Ok(FtpBackend { core })
198    }
199}
200
201// Backend is used to serve `Accessor` support for ftp.
202#[derive(Clone)]
203pub struct FtpBackend {
204    core: Arc<FtpCore>,
205}
206
207impl Debug for FtpBackend {
208    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
209        f.debug_struct("Backend").finish()
210    }
211}
212
213impl Access for FtpBackend {
214    type Reader = FtpReader;
215    type Writer = FtpWriter;
216    type Lister = FtpLister;
217    type Deleter = oio::OneShotDeleter<FtpDeleter>;
218
219    fn info(&self) -> Arc<AccessorInfo> {
220        self.core.info.clone()
221    }
222
223    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
224        let mut ftp_stream = self.core.ftp_connect(Operation::CreateDir).await?;
225
226        let paths: Vec<&str> = path.split_inclusive('/').collect();
227
228        let mut curr_path = String::new();
229
230        for path in paths {
231            curr_path.push_str(path);
232            match ftp_stream.mkdir(&curr_path).await {
233                // Do nothing if status is FileUnavailable or OK(()) is return.
234                Err(FtpError::UnexpectedResponse(Response {
235                    status: Status::FileUnavailable,
236                    ..
237                }))
238                | Ok(()) => (),
239                Err(e) => {
240                    return Err(parse_error(e));
241                }
242            }
243        }
244
245        Ok(RpCreateDir::default())
246    }
247
248    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
249        let file = self.ftp_stat(path).await?;
250
251        let mode = if file.is_file() {
252            EntryMode::FILE
253        } else if file.is_directory() {
254            EntryMode::DIR
255        } else {
256            EntryMode::Unknown
257        };
258
259        let mut meta = Metadata::new(mode);
260        meta.set_content_length(file.size() as u64);
261        meta.set_last_modified(file.modified().into());
262
263        Ok(RpStat::new(meta))
264    }
265
266    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
267        let ftp_stream = self.core.ftp_connect(Operation::Read).await?;
268
269        let reader = FtpReader::new(ftp_stream, path.to_string(), args).await?;
270        Ok((RpRead::new(), reader))
271    }
272
273    async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
274        // Ensure the parent dir exists.
275        let parent = get_parent(path);
276        let paths: Vec<&str> = parent.split('/').collect();
277
278        // TODO: we can optimize this by checking dir existence first.
279        let mut ftp_stream = self.core.ftp_connect(Operation::Write).await?;
280        let mut curr_path = String::new();
281
282        for path in paths {
283            if path.is_empty() {
284                continue;
285            }
286            curr_path.push_str(path);
287            curr_path.push('/');
288            match ftp_stream.mkdir(&curr_path).await {
289                // Do nothing if status is FileUnavailable or OK(()) is return.
290                Err(FtpError::UnexpectedResponse(Response {
291                    status: Status::FileUnavailable,
292                    ..
293                }))
294                | Ok(()) => (),
295                Err(e) => {
296                    return Err(parse_error(e));
297                }
298            }
299        }
300
301        let tmp_path = (!op.append()).then_some(build_tmp_path_of(path));
302        let w = FtpWriter::new(ftp_stream, path.to_string(), tmp_path);
303
304        Ok((RpWrite::new(), w))
305    }
306
307    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
308        Ok((
309            RpDelete::default(),
310            oio::OneShotDeleter::new(FtpDeleter::new(self.core.clone())),
311        ))
312    }
313
314    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
315        let mut ftp_stream = self.core.ftp_connect(Operation::List).await?;
316
317        let pathname = if path == "/" { None } else { Some(path) };
318        let files = ftp_stream.list(pathname).await.map_err(parse_error)?;
319
320        Ok((
321            RpList::default(),
322            FtpLister::new(if path == "/" { "" } else { path }, files),
323        ))
324    }
325}
326
327impl FtpBackend {
328    pub async fn ftp_stat(&self, path: &str) -> Result<File> {
329        let mut ftp_stream = self.core.ftp_connect(Operation::Stat).await?;
330
331        let (parent, basename) = (get_parent(path), get_basename(path));
332
333        let pathname = if parent == "/" { None } else { Some(parent) };
334
335        let resp = ftp_stream.list(pathname).await.map_err(parse_error)?;
336
337        // Get stat of file.
338        let mut files = resp
339            .into_iter()
340            .filter_map(|file| File::from_str(file.as_str()).ok())
341            .filter(|f| f.name() == basename.trim_end_matches('/'))
342            .collect::<Vec<File>>();
343
344        if files.is_empty() {
345            Err(Error::new(
346                ErrorKind::NotFound,
347                "file is not found during list",
348            ))
349        } else {
350            Ok(files.remove(0))
351        }
352    }
353}
354
355#[cfg(test)]
356mod build_test {
357    use super::FtpBuilder;
358    use crate::*;
359
360    #[test]
361    fn test_build() {
362        // ftps scheme, should suffix with default port 21
363        let b = FtpBuilder::default()
364            .endpoint("ftps://ftp_server.local")
365            .build();
366        assert!(b.is_ok());
367
368        // ftp scheme
369        let b = FtpBuilder::default()
370            .endpoint("ftp://ftp_server.local:1234")
371            .build();
372        assert!(b.is_ok());
373
374        // no scheme
375        let b = FtpBuilder::default()
376            .endpoint("ftp_server.local:8765")
377            .build();
378        assert!(b.is_ok());
379
380        // invalid scheme
381        let b = FtpBuilder::default()
382            .endpoint("invalidscheme://ftp_server.local:8765")
383            .build();
384        assert!(b.is_err());
385        let e = b.unwrap_err();
386        assert_eq!(e.kind(), ErrorKind::ConfigInvalid);
387    }
388}