1use std::fmt::Debug;
61use std::path::{Path, PathBuf};
62
63use opendal::Operator;
64use unftp_core::auth::UserDetail;
65use unftp_core::storage::{self, Error, StorageBackend};
66
67use tokio::io::AsyncReadExt;
68use tokio::io::AsyncWriteExt;
69use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
70
71#[derive(Debug, Clone)]
72pub struct OpendalStorage {
73 op: Operator,
74}
75
76impl OpendalStorage {
77 pub fn new(op: Operator) -> Self {
78 Self { op }
79 }
80}
81
82#[derive(Debug, Clone, Eq, PartialEq)]
84pub struct OpendalMetadata(opendal::Metadata);
85
86impl storage::Metadata for OpendalMetadata {
87 fn len(&self) -> u64 {
88 self.0.content_length()
89 }
90
91 fn is_dir(&self) -> bool {
92 self.0.is_dir()
93 }
94
95 fn is_file(&self) -> bool {
96 self.0.is_file()
97 }
98
99 fn is_symlink(&self) -> bool {
100 false
101 }
102
103 fn modified(&self) -> storage::Result<std::time::SystemTime> {
104 match self.0.last_modified() {
105 Some(ts) => Ok(ts.into()),
106 None => Err(Error::new(
107 storage::ErrorKind::LocalError,
108 "no last modified time",
109 )),
110 }
111 }
112
113 fn gid(&self) -> u32 {
114 0
115 }
116
117 fn uid(&self) -> u32 {
118 0
119 }
120}
121
122fn convert_err(err: opendal::Error) -> Error {
123 let kind = match err.kind() {
124 opendal::ErrorKind::NotFound => storage::ErrorKind::PermanentFileNotAvailable,
125 opendal::ErrorKind::AlreadyExists => storage::ErrorKind::PermanentFileNotAvailable,
126 opendal::ErrorKind::PermissionDenied => storage::ErrorKind::PermissionDenied,
127 _ => storage::ErrorKind::LocalError,
128 };
129 Error::new(kind, err)
130}
131
132fn convert_path(path: &Path) -> storage::Result<&str> {
133 path.to_str().ok_or_else(|| {
134 Error::new(
135 storage::ErrorKind::LocalError,
136 "Path is not a valid UTF-8 string",
137 )
138 })
139}
140
141async fn copy_read_write_loop<R, W>(input: &mut R, output: &mut W) -> std::io::Result<u64>
142where
143 R: tokio::io::AsyncRead + Unpin + ?Sized,
144 W: tokio::io::AsyncWrite + Unpin + ?Sized,
145{
146 let mut copied = 0u64;
147 let mut buf = [0u8; 8 * 1024];
148
149 loop {
150 let n = input.read(&mut buf).await?;
151 if n == 0 {
152 return Ok(copied);
153 }
154
155 output.write_all(&buf[..n]).await?;
156 copied += n as u64;
157 }
158}
159
160#[async_trait::async_trait]
161impl<User: UserDetail> StorageBackend<User> for OpendalStorage {
162 type Metadata = OpendalMetadata;
163
164 async fn metadata<P: AsRef<Path> + Send + Debug>(
165 &self,
166 _: &User,
167 path: P,
168 ) -> storage::Result<Self::Metadata> {
169 let metadata = self
170 .op
171 .stat(convert_path(path.as_ref())?)
172 .await
173 .map_err(convert_err)?;
174 Ok(OpendalMetadata(metadata))
175 }
176
177 async fn list<P: AsRef<Path> + Send + Debug>(
178 &self,
179 _: &User,
180 path: P,
181 ) -> storage::Result<Vec<storage::Fileinfo<PathBuf, Self::Metadata>>>
182 where
183 Self::Metadata: storage::Metadata,
184 {
185 let ret = self
186 .op
187 .list(convert_path(path.as_ref())?)
188 .await
189 .map_err(convert_err)?
190 .into_iter()
191 .map(|x| {
192 let (path, metadata) = x.into_parts();
193 storage::Fileinfo {
194 path: path.into(),
195 metadata: OpendalMetadata(metadata),
196 }
197 })
198 .collect();
199 Ok(ret)
200 }
201
202 async fn get<P: AsRef<Path> + Send + Debug>(
203 &self,
204 _: &User,
205 path: P,
206 start_pos: u64,
207 ) -> storage::Result<Box<dyn tokio::io::AsyncRead + Send + Sync + Unpin>> {
208 let reader = self
209 .op
210 .reader(convert_path(path.as_ref())?)
211 .await
212 .map_err(convert_err)?
213 .into_futures_async_read(start_pos..)
214 .await
215 .map_err(convert_err)?
216 .compat();
217 Ok(Box::new(reader))
218 }
219
220 async fn put<
221 P: AsRef<Path> + Send + Debug,
222 R: tokio::io::AsyncRead + Send + Sync + Unpin + 'static,
223 >(
224 &self,
225 _: &User,
226 mut input: R,
227 path: P,
228 _: u64,
229 ) -> storage::Result<u64> {
230 let mut w = self
231 .op
232 .writer(convert_path(path.as_ref())?)
233 .await
234 .map_err(convert_err)?
235 .into_futures_async_write()
236 .compat_write();
237 let copy_result = copy_read_write_loop(&mut input, &mut w).await;
239 let shutdown_result = w.shutdown().await;
240 match (copy_result, shutdown_result) {
241 (Ok(len), Ok(())) => Ok(len),
242 (Err(copy_err), Ok(())) => Err(Error::new(
243 storage::ErrorKind::LocalError,
244 format!("Failed to copy data: {}", copy_err),
245 )),
246 (Ok(_), Err(shutdown_err)) => Err(Error::new(
247 storage::ErrorKind::LocalError,
248 format!("Failed to shutdown writer: {}", shutdown_err),
249 )),
250 (Err(copy_err), Err(shutdown_err)) => Err(Error::new(
251 storage::ErrorKind::LocalError,
252 format!(
253 "Failed to copy data: {} AND failed to shutdown writer: {}",
254 copy_err, shutdown_err
255 ),
256 )),
257 }
258 }
259
260 async fn del<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
261 self.op
262 .delete(convert_path(path.as_ref())?)
263 .await
264 .map_err(convert_err)
265 }
266
267 async fn mkd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
268 let mut path_str = convert_path(path.as_ref())?.to_string();
269 if !path_str.ends_with('/') {
270 path_str.push('/');
271 }
272 self.op.create_dir(&path_str).await.map_err(convert_err)
273 }
274
275 async fn rename<P: AsRef<Path> + Send + Debug>(
276 &self,
277 _: &User,
278 from: P,
279 to: P,
280 ) -> storage::Result<()> {
281 let (from, to) = (convert_path(from.as_ref())?, convert_path(to.as_ref())?);
282 self.op.rename(from, to).await.map_err(convert_err)
283 }
284
285 async fn rmd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
286 self.op
287 .delete_with(convert_path(path.as_ref())?)
288 .recursive(true)
289 .await
290 .map_err(convert_err)
291 }
292
293 async fn cwd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
294 use opendal::ErrorKind::*;
295
296 match self.op.stat(convert_path(path.as_ref())?).await {
297 Ok(_) => Ok(()),
298 Err(e) if matches!(e.kind(), NotFound | NotADirectory) => Err(Error::new(
299 storage::ErrorKind::PermanentDirectoryNotAvailable,
300 e,
301 )),
302 Err(e) => Err(convert_err(e)),
303 }
304 }
305}