1use std::fmt::Debug;
61use std::path::{Path, PathBuf};
62
63use libunftp::auth::UserDetail;
64use libunftp::storage::{self, Error, StorageBackend};
65use opendal::Operator;
66
67use tokio::io::AsyncWriteExt;
68use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
69
70#[derive(Debug, Clone)]
71pub struct OpendalStorage {
72 op: Operator,
73}
74
75impl OpendalStorage {
76 pub fn new(op: Operator) -> Self {
77 Self { op }
78 }
79}
80
81#[derive(Debug, Clone, Eq, PartialEq)]
83pub struct OpendalMetadata(opendal::Metadata);
84
85impl storage::Metadata for OpendalMetadata {
86 fn len(&self) -> u64 {
87 self.0.content_length()
88 }
89
90 fn is_dir(&self) -> bool {
91 self.0.is_dir()
92 }
93
94 fn is_file(&self) -> bool {
95 self.0.is_file()
96 }
97
98 fn is_symlink(&self) -> bool {
99 false
100 }
101
102 fn modified(&self) -> storage::Result<std::time::SystemTime> {
103 match self.0.last_modified() {
104 Some(ts) => Ok(ts.into()),
105 None => Err(Error::new(
106 storage::ErrorKind::LocalError,
107 "no last modified time",
108 )),
109 }
110 }
111
112 fn gid(&self) -> u32 {
113 0
114 }
115
116 fn uid(&self) -> u32 {
117 0
118 }
119}
120
121fn convert_err(err: opendal::Error) -> Error {
122 let kind = match err.kind() {
123 opendal::ErrorKind::NotFound => storage::ErrorKind::PermanentFileNotAvailable,
124 opendal::ErrorKind::AlreadyExists => storage::ErrorKind::PermanentFileNotAvailable,
125 opendal::ErrorKind::PermissionDenied => storage::ErrorKind::PermissionDenied,
126 _ => storage::ErrorKind::LocalError,
127 };
128 Error::new(kind, err)
129}
130
131fn convert_path(path: &Path) -> storage::Result<&str> {
132 path.to_str().ok_or_else(|| {
133 Error::new(
134 storage::ErrorKind::LocalError,
135 "Path is not a valid UTF-8 string",
136 )
137 })
138}
139
140#[async_trait::async_trait]
141impl<User: UserDetail> StorageBackend<User> for OpendalStorage {
142 type Metadata = OpendalMetadata;
143
144 async fn metadata<P: AsRef<Path> + Send + Debug>(
145 &self,
146 _: &User,
147 path: P,
148 ) -> storage::Result<Self::Metadata> {
149 let metadata = self
150 .op
151 .stat(convert_path(path.as_ref())?)
152 .await
153 .map_err(convert_err)?;
154 Ok(OpendalMetadata(metadata))
155 }
156
157 async fn list<P: AsRef<Path> + Send + Debug>(
158 &self,
159 _: &User,
160 path: P,
161 ) -> storage::Result<Vec<storage::Fileinfo<PathBuf, Self::Metadata>>>
162 where
163 Self::Metadata: storage::Metadata,
164 {
165 let ret = self
166 .op
167 .list(convert_path(path.as_ref())?)
168 .await
169 .map_err(convert_err)?
170 .into_iter()
171 .map(|x| {
172 let (path, metadata) = x.into_parts();
173 storage::Fileinfo {
174 path: path.into(),
175 metadata: OpendalMetadata(metadata),
176 }
177 })
178 .collect();
179 Ok(ret)
180 }
181
182 async fn get<P: AsRef<Path> + Send + Debug>(
183 &self,
184 _: &User,
185 path: P,
186 start_pos: u64,
187 ) -> storage::Result<Box<dyn tokio::io::AsyncRead + Send + Sync + Unpin>> {
188 let reader = self
189 .op
190 .reader(convert_path(path.as_ref())?)
191 .await
192 .map_err(convert_err)?
193 .into_futures_async_read(start_pos..)
194 .await
195 .map_err(convert_err)?
196 .compat();
197 Ok(Box::new(reader))
198 }
199
200 async fn put<
201 P: AsRef<Path> + Send + Debug,
202 R: tokio::io::AsyncRead + Send + Sync + Unpin + 'static,
203 >(
204 &self,
205 _: &User,
206 mut input: R,
207 path: P,
208 _: u64,
209 ) -> storage::Result<u64> {
210 let mut w = self
211 .op
212 .writer(convert_path(path.as_ref())?)
213 .await
214 .map_err(convert_err)?
215 .into_futures_async_write()
216 .compat_write();
217 let copy_result = tokio::io::copy(&mut input, &mut w).await;
218 let shutdown_result = w.shutdown().await;
219 match (copy_result, shutdown_result) {
220 (Ok(len), Ok(())) => Ok(len),
221 (Err(copy_err), Ok(())) => Err(Error::new(
222 storage::ErrorKind::LocalError,
223 format!("Failed to copy data: {}", copy_err),
224 )),
225 (Ok(_), Err(shutdown_err)) => Err(Error::new(
226 storage::ErrorKind::LocalError,
227 format!("Failed to shutdown writer: {}", shutdown_err),
228 )),
229 (Err(copy_err), Err(shutdown_err)) => Err(Error::new(
230 storage::ErrorKind::LocalError,
231 format!(
232 "Failed to copy data: {} AND failed to shutdown writer: {}",
233 copy_err, shutdown_err
234 ),
235 )),
236 }
237 }
238
239 async fn del<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
240 self.op
241 .delete(convert_path(path.as_ref())?)
242 .await
243 .map_err(convert_err)
244 }
245
246 async fn mkd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
247 let mut path_str = convert_path(path.as_ref())?.to_string();
248 if !path_str.ends_with('/') {
249 path_str.push('/');
250 }
251 self.op.create_dir(&path_str).await.map_err(convert_err)
252 }
253
254 async fn rename<P: AsRef<Path> + Send + Debug>(
255 &self,
256 _: &User,
257 from: P,
258 to: P,
259 ) -> storage::Result<()> {
260 let (from, to) = (convert_path(from.as_ref())?, convert_path(to.as_ref())?);
261 self.op.rename(from, to).await.map_err(convert_err)
262 }
263
264 async fn rmd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
265 self.op
266 .remove_all(convert_path(path.as_ref())?)
267 .await
268 .map_err(convert_err)
269 }
270
271 async fn cwd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
272 use opendal::ErrorKind::*;
273
274 match self.op.stat(convert_path(path.as_ref())?).await {
275 Ok(_) => Ok(()),
276 Err(e) if matches!(e.kind(), NotFound | NotADirectory) => Err(Error::new(
277 storage::ErrorKind::PermanentDirectoryNotAvailable,
278 e,
279 )),
280 Err(e) => Err(convert_err(e)),
281 }
282 }
283}