1use std::fmt::Debug;
62use std::path::{Path, PathBuf};
63
64use libunftp::auth::UserDetail;
65use libunftp::storage::{self, Error, StorageBackend};
66use opendal::Operator;
67
68use tokio::io::AsyncWriteExt;
69use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
70
71#[derive(Debug, Clone)]
72pub struct OpendalStorage {
73 op: Operator,
74}
75
76impl OpendalStorage {
77 pub fn new(op: Operator) -> Self {
78 Self { op }
79 }
80}
81
82#[derive(Debug, Clone, Eq, PartialEq)]
84pub struct OpendalMetadata(opendal::Metadata);
85
86impl storage::Metadata for OpendalMetadata {
87 fn len(&self) -> u64 {
88 self.0.content_length()
89 }
90
91 fn is_dir(&self) -> bool {
92 self.0.is_dir()
93 }
94
95 fn is_file(&self) -> bool {
96 self.0.is_file()
97 }
98
99 fn is_symlink(&self) -> bool {
100 false
101 }
102
103 fn modified(&self) -> storage::Result<std::time::SystemTime> {
104 self.0.last_modified().map(Into::into).ok_or_else(|| {
105 storage::Error::new(storage::ErrorKind::LocalError, "no last modified time")
106 })
107 }
108
109 fn gid(&self) -> u32 {
110 0
111 }
112
113 fn uid(&self) -> u32 {
114 0
115 }
116}
117
118fn convert_err(err: opendal::Error) -> storage::Error {
119 let kind = match err.kind() {
120 opendal::ErrorKind::NotFound => storage::ErrorKind::PermanentFileNotAvailable,
121 opendal::ErrorKind::AlreadyExists => storage::ErrorKind::PermanentFileNotAvailable,
122 opendal::ErrorKind::PermissionDenied => storage::ErrorKind::PermissionDenied,
123 _ => storage::ErrorKind::LocalError,
124 };
125 storage::Error::new(kind, err)
126}
127
128fn convert_path(path: &Path) -> storage::Result<&str> {
129 path.to_str().ok_or_else(|| {
130 storage::Error::new(
131 storage::ErrorKind::LocalError,
132 "Path is not a valid UTF-8 string",
133 )
134 })
135}
136
137#[async_trait::async_trait]
138impl<User: UserDetail> StorageBackend<User> for OpendalStorage {
139 type Metadata = OpendalMetadata;
140
141 async fn metadata<P: AsRef<Path> + Send + Debug>(
142 &self,
143 _: &User,
144 path: P,
145 ) -> storage::Result<Self::Metadata> {
146 let metadata = self
147 .op
148 .stat(convert_path(path.as_ref())?)
149 .await
150 .map_err(convert_err)?;
151 Ok(OpendalMetadata(metadata))
152 }
153
154 async fn list<P: AsRef<Path> + Send + Debug>(
155 &self,
156 _: &User,
157 path: P,
158 ) -> storage::Result<Vec<storage::Fileinfo<PathBuf, Self::Metadata>>>
159 where
160 Self::Metadata: storage::Metadata,
161 {
162 let ret = self
163 .op
164 .list(convert_path(path.as_ref())?)
165 .await
166 .map_err(convert_err)?
167 .into_iter()
168 .map(|x| {
169 let (path, metadata) = x.into_parts();
170 storage::Fileinfo {
171 path: path.into(),
172 metadata: OpendalMetadata(metadata),
173 }
174 })
175 .collect();
176 Ok(ret)
177 }
178
179 async fn get<P: AsRef<Path> + Send + Debug>(
180 &self,
181 _: &User,
182 path: P,
183 start_pos: u64,
184 ) -> storage::Result<Box<dyn tokio::io::AsyncRead + Send + Sync + Unpin>> {
185 let reader = self
186 .op
187 .reader(convert_path(path.as_ref())?)
188 .await
189 .map_err(convert_err)?
190 .into_futures_async_read(start_pos..)
191 .await
192 .map_err(convert_err)?
193 .compat();
194 Ok(Box::new(reader))
195 }
196
197 async fn put<
198 P: AsRef<Path> + Send + Debug,
199 R: tokio::io::AsyncRead + Send + Sync + Unpin + 'static,
200 >(
201 &self,
202 _: &User,
203 mut input: R,
204 path: P,
205 _: u64,
206 ) -> storage::Result<u64> {
207 let mut w = self
208 .op
209 .writer(convert_path(path.as_ref())?)
210 .await
211 .map_err(convert_err)?
212 .into_futures_async_write()
213 .compat_write();
214 let copy_result = tokio::io::copy(&mut input, &mut w).await;
215 let shutdown_result = w.shutdown().await;
216 match (copy_result, shutdown_result) {
217 (Ok(len), Ok(())) => Ok(len),
218 (Err(copy_err), Ok(())) => Err(Error::new(
219 storage::ErrorKind::LocalError,
220 format!("Failed to copy data: {}", copy_err),
221 )),
222 (Ok(_), Err(shutdown_err)) => Err(Error::new(
223 storage::ErrorKind::LocalError,
224 format!("Failed to shutdown writer: {}", shutdown_err),
225 )),
226 (Err(copy_err), Err(shutdown_err)) => Err(Error::new(
227 storage::ErrorKind::LocalError,
228 format!(
229 "Failed to copy data: {} AND failed to shutdown writer: {}",
230 copy_err, shutdown_err
231 ),
232 )),
233 }
234 }
235
236 async fn del<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
237 self.op
238 .delete(convert_path(path.as_ref())?)
239 .await
240 .map_err(convert_err)
241 }
242
243 async fn mkd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
244 let mut path_str = convert_path(path.as_ref())?.to_string();
245 if !path_str.ends_with('/') {
246 path_str.push('/');
247 }
248 self.op.create_dir(&path_str).await.map_err(convert_err)
249 }
250
251 async fn rename<P: AsRef<Path> + Send + Debug>(
252 &self,
253 _: &User,
254 from: P,
255 to: P,
256 ) -> storage::Result<()> {
257 let (from, to) = (convert_path(from.as_ref())?, convert_path(to.as_ref())?);
258 self.op.rename(from, to).await.map_err(convert_err)
259 }
260
261 async fn rmd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
262 self.op
263 .remove_all(convert_path(path.as_ref())?)
264 .await
265 .map_err(convert_err)
266 }
267
268 async fn cwd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
269 use opendal::ErrorKind::*;
270
271 match self.op.stat(convert_path(path.as_ref())?).await {
272 Ok(_) => Ok(()),
273 Err(e) if matches!(e.kind(), NotFound | NotADirectory) => Err(storage::Error::new(
274 storage::ErrorKind::PermanentDirectoryNotAvailable,
275 e,
276 )),
277 Err(e) => Err(convert_err(e)),
278 }
279 }
280}