unftp_sbe_opendal/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `unftp-sbe-opendal` is an [unftp](https://crates.io/crates/unftp) `StorageBackend` implementation using opendal.
19//!
//! This crate lets you access any storage service through the same FTP API.
21//!
22//! # Example
23//!
24//! This example demonstrates how to use `unftp-sbe-opendal` with the `S3` service.
25//!
26//! ```no_run
27//! use anyhow::Result;
28//! use opendal::Operator;
29//! use opendal::Scheme;
30//! use opendal::services;
31//! use unftp_sbe_opendal::OpendalStorage;
32//!
33//! #[tokio::main]
34//! async fn main() -> Result<()> {
35//!     // Create any service desired
36//!     let op = opendal::Operator::from_map::<services::S3>(
37//!         [
38//!             ("bucket".to_string(), "my_bucket".to_string()),
39//!             ("access_key".to_string(), "my_access_key".to_string()),
40//!             ("secret_key".to_string(), "my_secret_key".to_string()),
41//!             ("endpoint".to_string(), "my_endpoint".to_string()),
42//!             ("region".to_string(), "my_region".to_string()),
43//!         ]
44//!             .into_iter()
45//!             .collect(),
46//!     )?.finish();
47//!
48//!     // Wrap the operator with `OpendalStorage`
49//!     let backend = OpendalStorage::new(op);
50//!
51//!     // Build the actual unftp server
52//!     let server = libunftp::ServerBuilder::new(Box::new(move || backend.clone())).build()?;
53//!
54//!     // Start the server
55//!     server.listen("0.0.0.0:0").await?;
56//!
57//!     Ok(())
58//! }
59//! ```
60
61use std::fmt::Debug;
62use std::path::{Path, PathBuf};
63
64use libunftp::auth::UserDetail;
65use libunftp::storage::{self, Error, StorageBackend};
66use opendal::Operator;
67
68use tokio::io::AsyncWriteExt;
69use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
70
71#[derive(Debug, Clone)]
72pub struct OpendalStorage {
73    op: Operator,
74}
75
76impl OpendalStorage {
77    pub fn new(op: Operator) -> Self {
78        Self { op }
79    }
80}
81
82/// A wrapper around [`opendal::Metadata`] to implement [`libunftp::storage::Metadata`].
83#[derive(Debug, Clone, Eq, PartialEq)]
84pub struct OpendalMetadata(opendal::Metadata);
85
86impl storage::Metadata for OpendalMetadata {
87    fn len(&self) -> u64 {
88        self.0.content_length()
89    }
90
91    fn is_dir(&self) -> bool {
92        self.0.is_dir()
93    }
94
95    fn is_file(&self) -> bool {
96        self.0.is_file()
97    }
98
99    fn is_symlink(&self) -> bool {
100        false
101    }
102
103    fn modified(&self) -> storage::Result<std::time::SystemTime> {
104        self.0.last_modified().map(Into::into).ok_or_else(|| {
105            storage::Error::new(storage::ErrorKind::LocalError, "no last modified time")
106        })
107    }
108
109    fn gid(&self) -> u32 {
110        0
111    }
112
113    fn uid(&self) -> u32 {
114        0
115    }
116}
117
118fn convert_err(err: opendal::Error) -> storage::Error {
119    let kind = match err.kind() {
120        opendal::ErrorKind::NotFound => storage::ErrorKind::PermanentFileNotAvailable,
121        opendal::ErrorKind::AlreadyExists => storage::ErrorKind::PermanentFileNotAvailable,
122        opendal::ErrorKind::PermissionDenied => storage::ErrorKind::PermissionDenied,
123        _ => storage::ErrorKind::LocalError,
124    };
125    storage::Error::new(kind, err)
126}
127
128fn convert_path(path: &Path) -> storage::Result<&str> {
129    path.to_str().ok_or_else(|| {
130        storage::Error::new(
131            storage::ErrorKind::LocalError,
132            "Path is not a valid UTF-8 string",
133        )
134    })
135}
136
137#[async_trait::async_trait]
138impl<User: UserDetail> StorageBackend<User> for OpendalStorage {
139    type Metadata = OpendalMetadata;
140
141    async fn metadata<P: AsRef<Path> + Send + Debug>(
142        &self,
143        _: &User,
144        path: P,
145    ) -> storage::Result<Self::Metadata> {
146        let metadata = self
147            .op
148            .stat(convert_path(path.as_ref())?)
149            .await
150            .map_err(convert_err)?;
151        Ok(OpendalMetadata(metadata))
152    }
153
154    async fn list<P: AsRef<Path> + Send + Debug>(
155        &self,
156        _: &User,
157        path: P,
158    ) -> storage::Result<Vec<storage::Fileinfo<PathBuf, Self::Metadata>>>
159    where
160        Self::Metadata: storage::Metadata,
161    {
162        let ret = self
163            .op
164            .list(convert_path(path.as_ref())?)
165            .await
166            .map_err(convert_err)?
167            .into_iter()
168            .map(|x| {
169                let (path, metadata) = x.into_parts();
170                storage::Fileinfo {
171                    path: path.into(),
172                    metadata: OpendalMetadata(metadata),
173                }
174            })
175            .collect();
176        Ok(ret)
177    }
178
179    async fn get<P: AsRef<Path> + Send + Debug>(
180        &self,
181        _: &User,
182        path: P,
183        start_pos: u64,
184    ) -> storage::Result<Box<dyn tokio::io::AsyncRead + Send + Sync + Unpin>> {
185        let reader = self
186            .op
187            .reader(convert_path(path.as_ref())?)
188            .await
189            .map_err(convert_err)?
190            .into_futures_async_read(start_pos..)
191            .await
192            .map_err(convert_err)?
193            .compat();
194        Ok(Box::new(reader))
195    }
196
197    async fn put<
198        P: AsRef<Path> + Send + Debug,
199        R: tokio::io::AsyncRead + Send + Sync + Unpin + 'static,
200    >(
201        &self,
202        _: &User,
203        mut input: R,
204        path: P,
205        _: u64,
206    ) -> storage::Result<u64> {
207        let mut w = self
208            .op
209            .writer(convert_path(path.as_ref())?)
210            .await
211            .map_err(convert_err)?
212            .into_futures_async_write()
213            .compat_write();
214        let copy_result = tokio::io::copy(&mut input, &mut w).await;
215        let shutdown_result = w.shutdown().await;
216        match (copy_result, shutdown_result) {
217            (Ok(len), Ok(())) => Ok(len),
218            (Err(copy_err), Ok(())) => Err(Error::new(
219                storage::ErrorKind::LocalError,
220                format!("Failed to copy data: {}", copy_err),
221            )),
222            (Ok(_), Err(shutdown_err)) => Err(Error::new(
223                storage::ErrorKind::LocalError,
224                format!("Failed to shutdown writer: {}", shutdown_err),
225            )),
226            (Err(copy_err), Err(shutdown_err)) => Err(Error::new(
227                storage::ErrorKind::LocalError,
228                format!(
229                    "Failed to copy data: {} AND failed to shutdown writer: {}",
230                    copy_err, shutdown_err
231                ),
232            )),
233        }
234    }
235
236    async fn del<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
237        self.op
238            .delete(convert_path(path.as_ref())?)
239            .await
240            .map_err(convert_err)
241    }
242
243    async fn mkd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
244        let mut path_str = convert_path(path.as_ref())?.to_string();
245        if !path_str.ends_with('/') {
246            path_str.push('/');
247        }
248        self.op.create_dir(&path_str).await.map_err(convert_err)
249    }
250
251    async fn rename<P: AsRef<Path> + Send + Debug>(
252        &self,
253        _: &User,
254        from: P,
255        to: P,
256    ) -> storage::Result<()> {
257        let (from, to) = (convert_path(from.as_ref())?, convert_path(to.as_ref())?);
258        self.op.rename(from, to).await.map_err(convert_err)
259    }
260
261    async fn rmd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
262        self.op
263            .remove_all(convert_path(path.as_ref())?)
264            .await
265            .map_err(convert_err)
266    }
267
268    async fn cwd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
269        use opendal::ErrorKind::*;
270
271        match self.op.stat(convert_path(path.as_ref())?).await {
272            Ok(_) => Ok(()),
273            Err(e) if matches!(e.kind(), NotFound | NotADirectory) => Err(storage::Error::new(
274                storage::ErrorKind::PermanentDirectoryNotAvailable,
275                e,
276            )),
277            Err(e) => Err(convert_err(e)),
278        }
279    }
280}