1use std::fmt::Debug;
62use std::path::{Path, PathBuf};
63
64use libunftp::auth::UserDetail;
65use libunftp::storage::{self, Error, StorageBackend};
66use opendal::Operator;
67
68use tokio::io::AsyncWriteExt;
69use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
70
71#[derive(Debug, Clone)]
72pub struct OpendalStorage {
73 op: Operator,
74}
75
76impl OpendalStorage {
77 pub fn new(op: Operator) -> Self {
78 Self { op }
79 }
80}
81
82#[derive(Debug, Clone, Eq, PartialEq)]
84pub struct OpendalMetadata(opendal::Metadata);
85
86impl storage::Metadata for OpendalMetadata {
87 fn len(&self) -> u64 {
88 self.0.content_length()
89 }
90
91 fn is_dir(&self) -> bool {
92 self.0.is_dir()
93 }
94
95 fn is_file(&self) -> bool {
96 self.0.is_file()
97 }
98
99 fn is_symlink(&self) -> bool {
100 false
101 }
102
103 fn modified(&self) -> storage::Result<std::time::SystemTime> {
104 match self.0.last_modified() {
105 Some(ts) => Ok(ts.into()),
106 None => Err(Error::new(
107 storage::ErrorKind::LocalError,
108 "no last modified time",
109 )),
110 }
111 }
112
113 fn gid(&self) -> u32 {
114 0
115 }
116
117 fn uid(&self) -> u32 {
118 0
119 }
120}
121
122fn convert_err(err: opendal::Error) -> Error {
123 let kind = match err.kind() {
124 opendal::ErrorKind::NotFound => storage::ErrorKind::PermanentFileNotAvailable,
125 opendal::ErrorKind::AlreadyExists => storage::ErrorKind::PermanentFileNotAvailable,
126 opendal::ErrorKind::PermissionDenied => storage::ErrorKind::PermissionDenied,
127 _ => storage::ErrorKind::LocalError,
128 };
129 Error::new(kind, err)
130}
131
132fn convert_path(path: &Path) -> storage::Result<&str> {
133 path.to_str().ok_or_else(|| {
134 Error::new(
135 storage::ErrorKind::LocalError,
136 "Path is not a valid UTF-8 string",
137 )
138 })
139}
140
141#[async_trait::async_trait]
142impl<User: UserDetail> StorageBackend<User> for OpendalStorage {
143 type Metadata = OpendalMetadata;
144
145 async fn metadata<P: AsRef<Path> + Send + Debug>(
146 &self,
147 _: &User,
148 path: P,
149 ) -> storage::Result<Self::Metadata> {
150 let metadata = self
151 .op
152 .stat(convert_path(path.as_ref())?)
153 .await
154 .map_err(convert_err)?;
155 Ok(OpendalMetadata(metadata))
156 }
157
158 async fn list<P: AsRef<Path> + Send + Debug>(
159 &self,
160 _: &User,
161 path: P,
162 ) -> storage::Result<Vec<storage::Fileinfo<PathBuf, Self::Metadata>>>
163 where
164 Self::Metadata: storage::Metadata,
165 {
166 let ret = self
167 .op
168 .list(convert_path(path.as_ref())?)
169 .await
170 .map_err(convert_err)?
171 .into_iter()
172 .map(|x| {
173 let (path, metadata) = x.into_parts();
174 storage::Fileinfo {
175 path: path.into(),
176 metadata: OpendalMetadata(metadata),
177 }
178 })
179 .collect();
180 Ok(ret)
181 }
182
183 async fn get<P: AsRef<Path> + Send + Debug>(
184 &self,
185 _: &User,
186 path: P,
187 start_pos: u64,
188 ) -> storage::Result<Box<dyn tokio::io::AsyncRead + Send + Sync + Unpin>> {
189 let reader = self
190 .op
191 .reader(convert_path(path.as_ref())?)
192 .await
193 .map_err(convert_err)?
194 .into_futures_async_read(start_pos..)
195 .await
196 .map_err(convert_err)?
197 .compat();
198 Ok(Box::new(reader))
199 }
200
201 async fn put<
202 P: AsRef<Path> + Send + Debug,
203 R: tokio::io::AsyncRead + Send + Sync + Unpin + 'static,
204 >(
205 &self,
206 _: &User,
207 mut input: R,
208 path: P,
209 _: u64,
210 ) -> storage::Result<u64> {
211 let mut w = self
212 .op
213 .writer(convert_path(path.as_ref())?)
214 .await
215 .map_err(convert_err)?
216 .into_futures_async_write()
217 .compat_write();
218 let copy_result = tokio::io::copy(&mut input, &mut w).await;
219 let shutdown_result = w.shutdown().await;
220 match (copy_result, shutdown_result) {
221 (Ok(len), Ok(())) => Ok(len),
222 (Err(copy_err), Ok(())) => Err(Error::new(
223 storage::ErrorKind::LocalError,
224 format!("Failed to copy data: {}", copy_err),
225 )),
226 (Ok(_), Err(shutdown_err)) => Err(Error::new(
227 storage::ErrorKind::LocalError,
228 format!("Failed to shutdown writer: {}", shutdown_err),
229 )),
230 (Err(copy_err), Err(shutdown_err)) => Err(Error::new(
231 storage::ErrorKind::LocalError,
232 format!(
233 "Failed to copy data: {} AND failed to shutdown writer: {}",
234 copy_err, shutdown_err
235 ),
236 )),
237 }
238 }
239
240 async fn del<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
241 self.op
242 .delete(convert_path(path.as_ref())?)
243 .await
244 .map_err(convert_err)
245 }
246
247 async fn mkd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
248 let mut path_str = convert_path(path.as_ref())?.to_string();
249 if !path_str.ends_with('/') {
250 path_str.push('/');
251 }
252 self.op.create_dir(&path_str).await.map_err(convert_err)
253 }
254
255 async fn rename<P: AsRef<Path> + Send + Debug>(
256 &self,
257 _: &User,
258 from: P,
259 to: P,
260 ) -> storage::Result<()> {
261 let (from, to) = (convert_path(from.as_ref())?, convert_path(to.as_ref())?);
262 self.op.rename(from, to).await.map_err(convert_err)
263 }
264
265 async fn rmd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
266 self.op
267 .remove_all(convert_path(path.as_ref())?)
268 .await
269 .map_err(convert_err)
270 }
271
272 async fn cwd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
273 use opendal::ErrorKind::*;
274
275 match self.op.stat(convert_path(path.as_ref())?).await {
276 Ok(_) => Ok(()),
277 Err(e) if matches!(e.kind(), NotFound | NotADirectory) => Err(Error::new(
278 storage::ErrorKind::PermanentDirectoryNotAvailable,
279 e,
280 )),
281 Err(e) => Err(convert_err(e)),
282 }
283 }
284}