unftp_sbe_opendal/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `unftp-sbe-opendal` is an [unftp](https://crates.io/crates/unftp) `StorageBackend` implementation using opendal.
19//!
20//! This crate can help you to access ANY storage services with the same ftp API.
21//!
22//! # Example
23//!
24//! This example demonstrates how to use `unftp-sbe-opendal` with the `S3` service.
25//!
26//! ```no_run
27//! use anyhow::Result;
28//! use opendal::Operator;
29//! use opendal::services;
30//! use unftp_sbe_opendal::OpendalStorage;
31//!
32//! #[tokio::main]
33//! async fn main() -> Result<()> {
34//!     // Create any service desired
35//!     let op = opendal::Operator::from_iter::<services::S3>(
36//!         [
37//!             ("bucket".to_string(), "my_bucket".to_string()),
38//!             ("access_key".to_string(), "my_access_key".to_string()),
39//!             ("secret_key".to_string(), "my_secret_key".to_string()),
40//!             ("endpoint".to_string(), "my_endpoint".to_string()),
41//!             ("region".to_string(), "my_region".to_string()),
42//!         ]
43//!             .into_iter()
44//!             .collect(),
45//!     )?.finish();
46//!
47//!     // Wrap the operator with `OpendalStorage`
48//!     let backend = OpendalStorage::new(op);
49//!
50//!     // Build the actual unftp server
51//!     let server = libunftp::ServerBuilder::new(Box::new(move || backend.clone())).build()?;
52//!
53//!     // Start the server
54//!     server.listen("0.0.0.0:0").await?;
55//!
56//!     Ok(())
57//! }
58//! ```
59
60use std::fmt::Debug;
61use std::path::{Path, PathBuf};
62
63use opendal::Operator;
64use unftp_core::auth::UserDetail;
65use unftp_core::storage::{self, Error, StorageBackend};
66
67use tokio::io::AsyncReadExt;
68use tokio::io::AsyncWriteExt;
69use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
70
71#[derive(Debug, Clone)]
72pub struct OpendalStorage {
73    op: Operator,
74}
75
76impl OpendalStorage {
77    pub fn new(op: Operator) -> Self {
78        Self { op }
79    }
80}
81
82/// A wrapper around [`opendal::Metadata`] to implement [`storage::Metadata`].
83#[derive(Debug, Clone, Eq, PartialEq)]
84pub struct OpendalMetadata(opendal::Metadata);
85
86impl storage::Metadata for OpendalMetadata {
87    fn len(&self) -> u64 {
88        self.0.content_length()
89    }
90
91    fn is_dir(&self) -> bool {
92        self.0.is_dir()
93    }
94
95    fn is_file(&self) -> bool {
96        self.0.is_file()
97    }
98
99    fn is_symlink(&self) -> bool {
100        false
101    }
102
103    fn modified(&self) -> storage::Result<std::time::SystemTime> {
104        match self.0.last_modified() {
105            Some(ts) => Ok(ts.into()),
106            None => Err(Error::new(
107                storage::ErrorKind::LocalError,
108                "no last modified time",
109            )),
110        }
111    }
112
113    fn gid(&self) -> u32 {
114        0
115    }
116
117    fn uid(&self) -> u32 {
118        0
119    }
120}
121
122fn convert_err(err: opendal::Error) -> Error {
123    let kind = match err.kind() {
124        opendal::ErrorKind::NotFound => storage::ErrorKind::PermanentFileNotAvailable,
125        opendal::ErrorKind::AlreadyExists => storage::ErrorKind::PermanentFileNotAvailable,
126        opendal::ErrorKind::PermissionDenied => storage::ErrorKind::PermissionDenied,
127        _ => storage::ErrorKind::LocalError,
128    };
129    Error::new(kind, err)
130}
131
132fn convert_path(path: &Path) -> storage::Result<&str> {
133    path.to_str().ok_or_else(|| {
134        Error::new(
135            storage::ErrorKind::LocalError,
136            "Path is not a valid UTF-8 string",
137        )
138    })
139}
140
141async fn copy_read_write_loop<R, W>(input: &mut R, output: &mut W) -> std::io::Result<u64>
142where
143    R: tokio::io::AsyncRead + Unpin + ?Sized,
144    W: tokio::io::AsyncWrite + Unpin + ?Sized,
145{
146    let mut copied = 0u64;
147    let mut buf = [0u8; 8 * 1024];
148
149    loop {
150        let n = input.read(&mut buf).await?;
151        if n == 0 {
152            return Ok(copied);
153        }
154
155        output.write_all(&buf[..n]).await?;
156        copied += n as u64;
157    }
158}
159
160#[async_trait::async_trait]
161impl<User: UserDetail> StorageBackend<User> for OpendalStorage {
162    type Metadata = OpendalMetadata;
163
164    async fn metadata<P: AsRef<Path> + Send + Debug>(
165        &self,
166        _: &User,
167        path: P,
168    ) -> storage::Result<Self::Metadata> {
169        let metadata = self
170            .op
171            .stat(convert_path(path.as_ref())?)
172            .await
173            .map_err(convert_err)?;
174        Ok(OpendalMetadata(metadata))
175    }
176
177    async fn list<P: AsRef<Path> + Send + Debug>(
178        &self,
179        _: &User,
180        path: P,
181    ) -> storage::Result<Vec<storage::Fileinfo<PathBuf, Self::Metadata>>>
182    where
183        Self::Metadata: storage::Metadata,
184    {
185        let ret = self
186            .op
187            .list(convert_path(path.as_ref())?)
188            .await
189            .map_err(convert_err)?
190            .into_iter()
191            .map(|x| {
192                let (path, metadata) = x.into_parts();
193                storage::Fileinfo {
194                    path: path.into(),
195                    metadata: OpendalMetadata(metadata),
196                }
197            })
198            .collect();
199        Ok(ret)
200    }
201
202    async fn get<P: AsRef<Path> + Send + Debug>(
203        &self,
204        _: &User,
205        path: P,
206        start_pos: u64,
207    ) -> storage::Result<Box<dyn tokio::io::AsyncRead + Send + Sync + Unpin>> {
208        let reader = self
209            .op
210            .reader(convert_path(path.as_ref())?)
211            .await
212            .map_err(convert_err)?
213            .into_futures_async_read(start_pos..)
214            .await
215            .map_err(convert_err)?
216            .compat();
217        Ok(Box::new(reader))
218    }
219
220    async fn put<
221        P: AsRef<Path> + Send + Debug,
222        R: tokio::io::AsyncRead + Send + Sync + Unpin + 'static,
223    >(
224        &self,
225        _: &User,
226        mut input: R,
227        path: P,
228        _: u64,
229    ) -> storage::Result<u64> {
230        let mut w = self
231            .op
232            .writer(convert_path(path.as_ref())?)
233            .await
234            .map_err(convert_err)?
235            .into_futures_async_write()
236            .compat_write();
237        // Avoid `tokio::io::copy`'s pending-read flush path and keep buffering policy explicit.
238        let copy_result = copy_read_write_loop(&mut input, &mut w).await;
239        let shutdown_result = w.shutdown().await;
240        match (copy_result, shutdown_result) {
241            (Ok(len), Ok(())) => Ok(len),
242            (Err(copy_err), Ok(())) => Err(Error::new(
243                storage::ErrorKind::LocalError,
244                format!("Failed to copy data: {}", copy_err),
245            )),
246            (Ok(_), Err(shutdown_err)) => Err(Error::new(
247                storage::ErrorKind::LocalError,
248                format!("Failed to shutdown writer: {}", shutdown_err),
249            )),
250            (Err(copy_err), Err(shutdown_err)) => Err(Error::new(
251                storage::ErrorKind::LocalError,
252                format!(
253                    "Failed to copy data: {} AND failed to shutdown writer: {}",
254                    copy_err, shutdown_err
255                ),
256            )),
257        }
258    }
259
260    async fn del<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
261        self.op
262            .delete(convert_path(path.as_ref())?)
263            .await
264            .map_err(convert_err)
265    }
266
267    async fn mkd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
268        let mut path_str = convert_path(path.as_ref())?.to_string();
269        if !path_str.ends_with('/') {
270            path_str.push('/');
271        }
272        self.op.create_dir(&path_str).await.map_err(convert_err)
273    }
274
275    async fn rename<P: AsRef<Path> + Send + Debug>(
276        &self,
277        _: &User,
278        from: P,
279        to: P,
280    ) -> storage::Result<()> {
281        let (from, to) = (convert_path(from.as_ref())?, convert_path(to.as_ref())?);
282        self.op.rename(from, to).await.map_err(convert_err)
283    }
284
285    async fn rmd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
286        self.op
287            .delete_with(convert_path(path.as_ref())?)
288            .recursive(true)
289            .await
290            .map_err(convert_err)
291    }
292
293    async fn cwd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
294        use opendal::ErrorKind::*;
295
296        match self.op.stat(convert_path(path.as_ref())?).await {
297            Ok(_) => Ok(()),
298            Err(e) if matches!(e.kind(), NotFound | NotADirectory) => Err(Error::new(
299                storage::ErrorKind::PermanentDirectoryNotAvailable,
300                e,
301            )),
302            Err(e) => Err(convert_err(e)),
303        }
304    }
305}