unftp_sbe_opendal/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `unftp-sbe-opendal` is an [unftp](https://crates.io/crates/unftp) `StorageBackend` implementation using opendal.
19//!
//! This crate lets you access any storage service supported by opendal through the same FTP API.
21//!
22//! # Example
23//!
24//! This example demonstrates how to use `unftp-sbe-opendal` with the `S3` service.
25//!
26//! ```no_run
27//! use anyhow::Result;
28//! use opendal::Operator;
29//! use opendal::services;
30//! use unftp_sbe_opendal::OpendalStorage;
31//!
32//! #[tokio::main]
33//! async fn main() -> Result<()> {
34//!     // Create any service desired
35//!     let op = opendal::Operator::from_iter::<services::S3>(
36//!         [
37//!             ("bucket".to_string(), "my_bucket".to_string()),
38//!             ("access_key".to_string(), "my_access_key".to_string()),
39//!             ("secret_key".to_string(), "my_secret_key".to_string()),
40//!             ("endpoint".to_string(), "my_endpoint".to_string()),
41//!             ("region".to_string(), "my_region".to_string()),
42//!         ]
43//!             .into_iter()
44//!             .collect(),
45//!     )?.finish();
46//!
47//!     // Wrap the operator with `OpendalStorage`
48//!     let backend = OpendalStorage::new(op);
49//!
50//!     // Build the actual unftp server
51//!     let server = libunftp::ServerBuilder::new(Box::new(move || backend.clone())).build()?;
52//!
53//!     // Start the server
54//!     server.listen("0.0.0.0:0").await?;
55//!
56//!     Ok(())
57//! }
58//! ```
59
60use std::fmt::Debug;
61use std::path::{Path, PathBuf};
62
63use libunftp::auth::UserDetail;
64use libunftp::storage::{self, Error, StorageBackend};
65use opendal::Operator;
66
67use tokio::io::AsyncWriteExt;
68use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
69
70#[derive(Debug, Clone)]
71pub struct OpendalStorage {
72    op: Operator,
73}
74
75impl OpendalStorage {
76    pub fn new(op: Operator) -> Self {
77        Self { op }
78    }
79}
80
81/// A wrapper around [`opendal::Metadata`] to implement [`storage::Metadata`].
82#[derive(Debug, Clone, Eq, PartialEq)]
83pub struct OpendalMetadata(opendal::Metadata);
84
85impl storage::Metadata for OpendalMetadata {
86    fn len(&self) -> u64 {
87        self.0.content_length()
88    }
89
90    fn is_dir(&self) -> bool {
91        self.0.is_dir()
92    }
93
94    fn is_file(&self) -> bool {
95        self.0.is_file()
96    }
97
98    fn is_symlink(&self) -> bool {
99        false
100    }
101
102    fn modified(&self) -> storage::Result<std::time::SystemTime> {
103        match self.0.last_modified() {
104            Some(ts) => Ok(ts.into()),
105            None => Err(Error::new(
106                storage::ErrorKind::LocalError,
107                "no last modified time",
108            )),
109        }
110    }
111
112    fn gid(&self) -> u32 {
113        0
114    }
115
116    fn uid(&self) -> u32 {
117        0
118    }
119}
120
121fn convert_err(err: opendal::Error) -> Error {
122    let kind = match err.kind() {
123        opendal::ErrorKind::NotFound => storage::ErrorKind::PermanentFileNotAvailable,
124        opendal::ErrorKind::AlreadyExists => storage::ErrorKind::PermanentFileNotAvailable,
125        opendal::ErrorKind::PermissionDenied => storage::ErrorKind::PermissionDenied,
126        _ => storage::ErrorKind::LocalError,
127    };
128    Error::new(kind, err)
129}
130
131fn convert_path(path: &Path) -> storage::Result<&str> {
132    path.to_str().ok_or_else(|| {
133        Error::new(
134            storage::ErrorKind::LocalError,
135            "Path is not a valid UTF-8 string",
136        )
137    })
138}
139
140#[async_trait::async_trait]
141impl<User: UserDetail> StorageBackend<User> for OpendalStorage {
142    type Metadata = OpendalMetadata;
143
144    async fn metadata<P: AsRef<Path> + Send + Debug>(
145        &self,
146        _: &User,
147        path: P,
148    ) -> storage::Result<Self::Metadata> {
149        let metadata = self
150            .op
151            .stat(convert_path(path.as_ref())?)
152            .await
153            .map_err(convert_err)?;
154        Ok(OpendalMetadata(metadata))
155    }
156
157    async fn list<P: AsRef<Path> + Send + Debug>(
158        &self,
159        _: &User,
160        path: P,
161    ) -> storage::Result<Vec<storage::Fileinfo<PathBuf, Self::Metadata>>>
162    where
163        Self::Metadata: storage::Metadata,
164    {
165        let ret = self
166            .op
167            .list(convert_path(path.as_ref())?)
168            .await
169            .map_err(convert_err)?
170            .into_iter()
171            .map(|x| {
172                let (path, metadata) = x.into_parts();
173                storage::Fileinfo {
174                    path: path.into(),
175                    metadata: OpendalMetadata(metadata),
176                }
177            })
178            .collect();
179        Ok(ret)
180    }
181
182    async fn get<P: AsRef<Path> + Send + Debug>(
183        &self,
184        _: &User,
185        path: P,
186        start_pos: u64,
187    ) -> storage::Result<Box<dyn tokio::io::AsyncRead + Send + Sync + Unpin>> {
188        let reader = self
189            .op
190            .reader(convert_path(path.as_ref())?)
191            .await
192            .map_err(convert_err)?
193            .into_futures_async_read(start_pos..)
194            .await
195            .map_err(convert_err)?
196            .compat();
197        Ok(Box::new(reader))
198    }
199
200    async fn put<
201        P: AsRef<Path> + Send + Debug,
202        R: tokio::io::AsyncRead + Send + Sync + Unpin + 'static,
203    >(
204        &self,
205        _: &User,
206        mut input: R,
207        path: P,
208        _: u64,
209    ) -> storage::Result<u64> {
210        let mut w = self
211            .op
212            .writer(convert_path(path.as_ref())?)
213            .await
214            .map_err(convert_err)?
215            .into_futures_async_write()
216            .compat_write();
217        let copy_result = tokio::io::copy(&mut input, &mut w).await;
218        let shutdown_result = w.shutdown().await;
219        match (copy_result, shutdown_result) {
220            (Ok(len), Ok(())) => Ok(len),
221            (Err(copy_err), Ok(())) => Err(Error::new(
222                storage::ErrorKind::LocalError,
223                format!("Failed to copy data: {}", copy_err),
224            )),
225            (Ok(_), Err(shutdown_err)) => Err(Error::new(
226                storage::ErrorKind::LocalError,
227                format!("Failed to shutdown writer: {}", shutdown_err),
228            )),
229            (Err(copy_err), Err(shutdown_err)) => Err(Error::new(
230                storage::ErrorKind::LocalError,
231                format!(
232                    "Failed to copy data: {} AND failed to shutdown writer: {}",
233                    copy_err, shutdown_err
234                ),
235            )),
236        }
237    }
238
239    async fn del<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
240        self.op
241            .delete(convert_path(path.as_ref())?)
242            .await
243            .map_err(convert_err)
244    }
245
246    async fn mkd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
247        let mut path_str = convert_path(path.as_ref())?.to_string();
248        if !path_str.ends_with('/') {
249            path_str.push('/');
250        }
251        self.op.create_dir(&path_str).await.map_err(convert_err)
252    }
253
254    async fn rename<P: AsRef<Path> + Send + Debug>(
255        &self,
256        _: &User,
257        from: P,
258        to: P,
259    ) -> storage::Result<()> {
260        let (from, to) = (convert_path(from.as_ref())?, convert_path(to.as_ref())?);
261        self.op.rename(from, to).await.map_err(convert_err)
262    }
263
264    async fn rmd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
265        self.op
266            .remove_all(convert_path(path.as_ref())?)
267            .await
268            .map_err(convert_err)
269    }
270
271    async fn cwd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
272        use opendal::ErrorKind::*;
273
274        match self.op.stat(convert_path(path.as_ref())?).await {
275            Ok(_) => Ok(()),
276            Err(e) if matches!(e.kind(), NotFound | NotADirectory) => Err(Error::new(
277                storage::ErrorKind::PermanentDirectoryNotAvailable,
278                e,
279            )),
280            Err(e) => Err(convert_err(e)),
281        }
282    }
283}