unftp_sbe_opendal/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `unftp-sbe-opendal` is an [unftp](https://crates.io/crates/unftp) `StorageBackend` implementation using opendal.
19//!
//! This crate lets you access any storage service supported by OpenDAL through the same FTP API.
21//!
22//! # Example
23//!
24//! This example demonstrates how to use `unftp-sbe-opendal` with the `S3` service.
25//!
26//! ```no_run
27//! use anyhow::Result;
28//! use opendal::Operator;
29//! use opendal::Scheme;
30//! use opendal::services;
31//! use unftp_sbe_opendal::OpendalStorage;
32//!
33//! #[tokio::main]
34//! async fn main() -> Result<()> {
35//!     // Create any service desired
36//!     let op = opendal::Operator::from_map::<services::S3>(
37//!         [
38//!             ("bucket".to_string(), "my_bucket".to_string()),
39//!             ("access_key".to_string(), "my_access_key".to_string()),
40//!             ("secret_key".to_string(), "my_secret_key".to_string()),
41//!             ("endpoint".to_string(), "my_endpoint".to_string()),
42//!             ("region".to_string(), "my_region".to_string()),
43//!         ]
44//!             .into_iter()
45//!             .collect(),
46//!     )?.finish();
47//!
48//!     // Wrap the operator with `OpendalStorage`
49//!     let backend = OpendalStorage::new(op);
50//!
51//!     // Build the actual unftp server
52//!     let server = libunftp::ServerBuilder::new(Box::new(move || backend.clone())).build()?;
53//!
54//!     // Start the server
55//!     server.listen("0.0.0.0:0").await?;
56//!
57//!     Ok(())
58//! }
59//! ```
60
61use std::fmt::Debug;
62use std::path::{Path, PathBuf};
63
64use libunftp::auth::UserDetail;
65use libunftp::storage::{self, Error, StorageBackend};
66use opendal::Operator;
67
68use tokio::io::AsyncWriteExt;
69use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
70
71#[derive(Debug, Clone)]
72pub struct OpendalStorage {
73    op: Operator,
74}
75
76impl OpendalStorage {
77    pub fn new(op: Operator) -> Self {
78        Self { op }
79    }
80}
81
82/// A wrapper around [`opendal::Metadata`] to implement [`storage::Metadata`].
83#[derive(Debug, Clone, Eq, PartialEq)]
84pub struct OpendalMetadata(opendal::Metadata);
85
86impl storage::Metadata for OpendalMetadata {
87    fn len(&self) -> u64 {
88        self.0.content_length()
89    }
90
91    fn is_dir(&self) -> bool {
92        self.0.is_dir()
93    }
94
95    fn is_file(&self) -> bool {
96        self.0.is_file()
97    }
98
99    fn is_symlink(&self) -> bool {
100        false
101    }
102
103    fn modified(&self) -> storage::Result<std::time::SystemTime> {
104        match self.0.last_modified() {
105            Some(ts) => Ok(ts.into()),
106            None => Err(Error::new(
107                storage::ErrorKind::LocalError,
108                "no last modified time",
109            )),
110        }
111    }
112
113    fn gid(&self) -> u32 {
114        0
115    }
116
117    fn uid(&self) -> u32 {
118        0
119    }
120}
121
122fn convert_err(err: opendal::Error) -> Error {
123    let kind = match err.kind() {
124        opendal::ErrorKind::NotFound => storage::ErrorKind::PermanentFileNotAvailable,
125        opendal::ErrorKind::AlreadyExists => storage::ErrorKind::PermanentFileNotAvailable,
126        opendal::ErrorKind::PermissionDenied => storage::ErrorKind::PermissionDenied,
127        _ => storage::ErrorKind::LocalError,
128    };
129    Error::new(kind, err)
130}
131
132fn convert_path(path: &Path) -> storage::Result<&str> {
133    path.to_str().ok_or_else(|| {
134        Error::new(
135            storage::ErrorKind::LocalError,
136            "Path is not a valid UTF-8 string",
137        )
138    })
139}
140
141#[async_trait::async_trait]
142impl<User: UserDetail> StorageBackend<User> for OpendalStorage {
143    type Metadata = OpendalMetadata;
144
145    async fn metadata<P: AsRef<Path> + Send + Debug>(
146        &self,
147        _: &User,
148        path: P,
149    ) -> storage::Result<Self::Metadata> {
150        let metadata = self
151            .op
152            .stat(convert_path(path.as_ref())?)
153            .await
154            .map_err(convert_err)?;
155        Ok(OpendalMetadata(metadata))
156    }
157
158    async fn list<P: AsRef<Path> + Send + Debug>(
159        &self,
160        _: &User,
161        path: P,
162    ) -> storage::Result<Vec<storage::Fileinfo<PathBuf, Self::Metadata>>>
163    where
164        Self::Metadata: storage::Metadata,
165    {
166        let ret = self
167            .op
168            .list(convert_path(path.as_ref())?)
169            .await
170            .map_err(convert_err)?
171            .into_iter()
172            .map(|x| {
173                let (path, metadata) = x.into_parts();
174                storage::Fileinfo {
175                    path: path.into(),
176                    metadata: OpendalMetadata(metadata),
177                }
178            })
179            .collect();
180        Ok(ret)
181    }
182
183    async fn get<P: AsRef<Path> + Send + Debug>(
184        &self,
185        _: &User,
186        path: P,
187        start_pos: u64,
188    ) -> storage::Result<Box<dyn tokio::io::AsyncRead + Send + Sync + Unpin>> {
189        let reader = self
190            .op
191            .reader(convert_path(path.as_ref())?)
192            .await
193            .map_err(convert_err)?
194            .into_futures_async_read(start_pos..)
195            .await
196            .map_err(convert_err)?
197            .compat();
198        Ok(Box::new(reader))
199    }
200
201    async fn put<
202        P: AsRef<Path> + Send + Debug,
203        R: tokio::io::AsyncRead + Send + Sync + Unpin + 'static,
204    >(
205        &self,
206        _: &User,
207        mut input: R,
208        path: P,
209        _: u64,
210    ) -> storage::Result<u64> {
211        let mut w = self
212            .op
213            .writer(convert_path(path.as_ref())?)
214            .await
215            .map_err(convert_err)?
216            .into_futures_async_write()
217            .compat_write();
218        let copy_result = tokio::io::copy(&mut input, &mut w).await;
219        let shutdown_result = w.shutdown().await;
220        match (copy_result, shutdown_result) {
221            (Ok(len), Ok(())) => Ok(len),
222            (Err(copy_err), Ok(())) => Err(Error::new(
223                storage::ErrorKind::LocalError,
224                format!("Failed to copy data: {}", copy_err),
225            )),
226            (Ok(_), Err(shutdown_err)) => Err(Error::new(
227                storage::ErrorKind::LocalError,
228                format!("Failed to shutdown writer: {}", shutdown_err),
229            )),
230            (Err(copy_err), Err(shutdown_err)) => Err(Error::new(
231                storage::ErrorKind::LocalError,
232                format!(
233                    "Failed to copy data: {} AND failed to shutdown writer: {}",
234                    copy_err, shutdown_err
235                ),
236            )),
237        }
238    }
239
240    async fn del<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
241        self.op
242            .delete(convert_path(path.as_ref())?)
243            .await
244            .map_err(convert_err)
245    }
246
247    async fn mkd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
248        let mut path_str = convert_path(path.as_ref())?.to_string();
249        if !path_str.ends_with('/') {
250            path_str.push('/');
251        }
252        self.op.create_dir(&path_str).await.map_err(convert_err)
253    }
254
255    async fn rename<P: AsRef<Path> + Send + Debug>(
256        &self,
257        _: &User,
258        from: P,
259        to: P,
260    ) -> storage::Result<()> {
261        let (from, to) = (convert_path(from.as_ref())?, convert_path(to.as_ref())?);
262        self.op.rename(from, to).await.map_err(convert_err)
263    }
264
265    async fn rmd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
266        self.op
267            .remove_all(convert_path(path.as_ref())?)
268            .await
269            .map_err(convert_err)
270    }
271
272    async fn cwd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
273        use opendal::ErrorKind::*;
274
275        match self.op.stat(convert_path(path.as_ref())?).await {
276            Ok(_) => Ok(()),
277            Err(e) if matches!(e.kind(), NotFound | NotADirectory) => Err(Error::new(
278                storage::ErrorKind::PermanentDirectoryNotAvailable,
279                e,
280            )),
281            Err(e) => Err(convert_err(e)),
282        }
283    }
284}