unftp_sbe_opendal/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `unftp-sbe-opendal` is an [unftp](https://crates.io/crates/unftp) `StorageBackend` implementation using opendal.
19//!
20//! This crate can help you to access ANY storage services with the same ftp API.
21//!
22//! # Example
23//!
24//! This example demonstrates how to use `unftp-sbe-opendal` with the `S3` service.
25//!
26//! ```no_run
27//! use anyhow::Result;
28//! use opendal::Operator;
29//! use opendal::Scheme;
30//! use opendal::services;
31//! use unftp_sbe_opendal::OpendalStorage;
32//!
33//! #[tokio::main]
34//! async fn main() -> Result<()> {
35//!     // Create any service desired
36//!     let op = opendal::Operator::from_map::<services::S3>(
37//!         [
38//!             ("bucket".to_string(), "my_bucket".to_string()),
39//!             ("access_key".to_string(), "my_access_key".to_string()),
40//!             ("secret_key".to_string(), "my_secret_key".to_string()),
41//!             ("endpoint".to_string(), "my_endpoint".to_string()),
42//!             ("region".to_string(), "my_region".to_string()),
43//!         ]
44//!             .into_iter()
45//!             .collect(),
46//!     )?.finish();
47//!
48//!     // Wrap the operator with `OpendalStorage`
49//!     let backend = OpendalStorage::new(op);
50//!
51//!     // Build the actual unftp server
52//!     let server = libunftp::ServerBuilder::new(Box::new(move || backend.clone())).build()?;
53//!
54//!     // Start the server
55//!     server.listen("0.0.0.0:0").await?;
56//!
57//!     Ok(())
58//! }
59//! ```
60
61use std::fmt::Debug;
62use std::path::{Path, PathBuf};
63
64use libunftp::auth::UserDetail;
65use libunftp::storage::{self, StorageBackend};
66use opendal::Operator;
67
68use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
69
70#[derive(Debug, Clone)]
71pub struct OpendalStorage {
72    op: Operator,
73}
74
75impl OpendalStorage {
76    pub fn new(op: Operator) -> Self {
77        Self { op }
78    }
79}
80
81/// A wrapper around [`opendal::Metadata`] to implement [`libunftp::storage::Metadata`].
82#[derive(Debug, Clone, Eq, PartialEq)]
83pub struct OpendalMetadata(opendal::Metadata);
84
85impl storage::Metadata for OpendalMetadata {
86    fn len(&self) -> u64 {
87        self.0.content_length()
88    }
89
90    fn is_dir(&self) -> bool {
91        self.0.is_dir()
92    }
93
94    fn is_file(&self) -> bool {
95        self.0.is_file()
96    }
97
98    fn is_symlink(&self) -> bool {
99        false
100    }
101
102    fn modified(&self) -> storage::Result<std::time::SystemTime> {
103        self.0.last_modified().map(Into::into).ok_or_else(|| {
104            storage::Error::new(storage::ErrorKind::LocalError, "no last modified time")
105        })
106    }
107
108    fn gid(&self) -> u32 {
109        0
110    }
111
112    fn uid(&self) -> u32 {
113        0
114    }
115}
116
117fn convert_err(err: opendal::Error) -> storage::Error {
118    let kind = match err.kind() {
119        opendal::ErrorKind::NotFound => storage::ErrorKind::PermanentFileNotAvailable,
120        opendal::ErrorKind::AlreadyExists => storage::ErrorKind::PermanentFileNotAvailable,
121        opendal::ErrorKind::PermissionDenied => storage::ErrorKind::PermissionDenied,
122        _ => storage::ErrorKind::LocalError,
123    };
124    storage::Error::new(kind, err)
125}
126
127fn convert_path(path: &Path) -> storage::Result<&str> {
128    path.to_str().ok_or_else(|| {
129        storage::Error::new(
130            storage::ErrorKind::LocalError,
131            "Path is not a valid UTF-8 string",
132        )
133    })
134}
135
136#[async_trait::async_trait]
137impl<User: UserDetail> StorageBackend<User> for OpendalStorage {
138    type Metadata = OpendalMetadata;
139
140    async fn metadata<P: AsRef<Path> + Send + Debug>(
141        &self,
142        _: &User,
143        path: P,
144    ) -> storage::Result<Self::Metadata> {
145        let metadata = self
146            .op
147            .stat(convert_path(path.as_ref())?)
148            .await
149            .map_err(convert_err)?;
150        Ok(OpendalMetadata(metadata))
151    }
152
153    async fn list<P: AsRef<Path> + Send + Debug>(
154        &self,
155        _: &User,
156        path: P,
157    ) -> storage::Result<Vec<storage::Fileinfo<PathBuf, Self::Metadata>>>
158    where
159        Self::Metadata: storage::Metadata,
160    {
161        let ret = self
162            .op
163            .list(convert_path(path.as_ref())?)
164            .await
165            .map_err(convert_err)?
166            .into_iter()
167            .map(|x| {
168                let (path, metadata) = x.into_parts();
169                storage::Fileinfo {
170                    path: path.into(),
171                    metadata: OpendalMetadata(metadata),
172                }
173            })
174            .collect();
175        Ok(ret)
176    }
177
178    async fn get<P: AsRef<Path> + Send + Debug>(
179        &self,
180        _: &User,
181        path: P,
182        start_pos: u64,
183    ) -> storage::Result<Box<dyn tokio::io::AsyncRead + Send + Sync + Unpin>> {
184        let reader = self
185            .op
186            .reader(convert_path(path.as_ref())?)
187            .await
188            .map_err(convert_err)?
189            .into_futures_async_read(start_pos..)
190            .await
191            .map_err(convert_err)?
192            .compat();
193        Ok(Box::new(reader))
194    }
195
196    async fn put<
197        P: AsRef<Path> + Send + Debug,
198        R: tokio::io::AsyncRead + Send + Sync + Unpin + 'static,
199    >(
200        &self,
201        _: &User,
202        mut input: R,
203        path: P,
204        _: u64,
205    ) -> storage::Result<u64> {
206        let mut w = self
207            .op
208            .writer(convert_path(path.as_ref())?)
209            .await
210            .map_err(convert_err)?
211            .into_futures_async_write()
212            .compat_write();
213        let len = tokio::io::copy(&mut input, &mut w).await?;
214        Ok(len)
215    }
216
217    async fn del<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
218        self.op
219            .delete(convert_path(path.as_ref())?)
220            .await
221            .map_err(convert_err)
222    }
223
224    async fn mkd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
225        let mut path_str = convert_path(path.as_ref())?.to_string();
226        if !path_str.ends_with('/') {
227            path_str.push('/');
228        }
229        self.op.create_dir(&path_str).await.map_err(convert_err)
230    }
231
232    async fn rename<P: AsRef<Path> + Send + Debug>(
233        &self,
234        _: &User,
235        from: P,
236        to: P,
237    ) -> storage::Result<()> {
238        let (from, to) = (convert_path(from.as_ref())?, convert_path(to.as_ref())?);
239        self.op.rename(from, to).await.map_err(convert_err)
240    }
241
242    async fn rmd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
243        self.op
244            .remove_all(convert_path(path.as_ref())?)
245            .await
246            .map_err(convert_err)
247    }
248
249    async fn cwd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
250        use opendal::ErrorKind::*;
251
252        match self.op.stat(convert_path(path.as_ref())?).await {
253            Ok(_) => Ok(()),
254            Err(e) if matches!(e.kind(), NotFound | NotADirectory) => Err(storage::Error::new(
255                storage::ErrorKind::PermanentDirectoryNotAvailable,
256                e,
257            )),
258            Err(e) => Err(convert_err(e)),
259        }
260    }
261}