1use std::fmt::Debug;
62use std::path::{Path, PathBuf};
63
64use libunftp::auth::UserDetail;
65use libunftp::storage::{self, StorageBackend};
66use opendal::Operator;
67
68use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
69
70#[derive(Debug, Clone)]
71pub struct OpendalStorage {
72 op: Operator,
73}
74
75impl OpendalStorage {
76 pub fn new(op: Operator) -> Self {
77 Self { op }
78 }
79}
80
81#[derive(Debug, Clone, Eq, PartialEq)]
83pub struct OpendalMetadata(opendal::Metadata);
84
85impl storage::Metadata for OpendalMetadata {
86 fn len(&self) -> u64 {
87 self.0.content_length()
88 }
89
90 fn is_dir(&self) -> bool {
91 self.0.is_dir()
92 }
93
94 fn is_file(&self) -> bool {
95 self.0.is_file()
96 }
97
98 fn is_symlink(&self) -> bool {
99 false
100 }
101
102 fn modified(&self) -> storage::Result<std::time::SystemTime> {
103 self.0.last_modified().map(Into::into).ok_or_else(|| {
104 storage::Error::new(storage::ErrorKind::LocalError, "no last modified time")
105 })
106 }
107
108 fn gid(&self) -> u32 {
109 0
110 }
111
112 fn uid(&self) -> u32 {
113 0
114 }
115}
116
117fn convert_err(err: opendal::Error) -> storage::Error {
118 let kind = match err.kind() {
119 opendal::ErrorKind::NotFound => storage::ErrorKind::PermanentFileNotAvailable,
120 opendal::ErrorKind::AlreadyExists => storage::ErrorKind::PermanentFileNotAvailable,
121 opendal::ErrorKind::PermissionDenied => storage::ErrorKind::PermissionDenied,
122 _ => storage::ErrorKind::LocalError,
123 };
124 storage::Error::new(kind, err)
125}
126
127fn convert_path(path: &Path) -> storage::Result<&str> {
128 path.to_str().ok_or_else(|| {
129 storage::Error::new(
130 storage::ErrorKind::LocalError,
131 "Path is not a valid UTF-8 string",
132 )
133 })
134}
135
136#[async_trait::async_trait]
137impl<User: UserDetail> StorageBackend<User> for OpendalStorage {
138 type Metadata = OpendalMetadata;
139
140 async fn metadata<P: AsRef<Path> + Send + Debug>(
141 &self,
142 _: &User,
143 path: P,
144 ) -> storage::Result<Self::Metadata> {
145 let metadata = self
146 .op
147 .stat(convert_path(path.as_ref())?)
148 .await
149 .map_err(convert_err)?;
150 Ok(OpendalMetadata(metadata))
151 }
152
153 async fn list<P: AsRef<Path> + Send + Debug>(
154 &self,
155 _: &User,
156 path: P,
157 ) -> storage::Result<Vec<storage::Fileinfo<PathBuf, Self::Metadata>>>
158 where
159 Self::Metadata: storage::Metadata,
160 {
161 let ret = self
162 .op
163 .list(convert_path(path.as_ref())?)
164 .await
165 .map_err(convert_err)?
166 .into_iter()
167 .map(|x| {
168 let (path, metadata) = x.into_parts();
169 storage::Fileinfo {
170 path: path.into(),
171 metadata: OpendalMetadata(metadata),
172 }
173 })
174 .collect();
175 Ok(ret)
176 }
177
178 async fn get<P: AsRef<Path> + Send + Debug>(
179 &self,
180 _: &User,
181 path: P,
182 start_pos: u64,
183 ) -> storage::Result<Box<dyn tokio::io::AsyncRead + Send + Sync + Unpin>> {
184 let reader = self
185 .op
186 .reader(convert_path(path.as_ref())?)
187 .await
188 .map_err(convert_err)?
189 .into_futures_async_read(start_pos..)
190 .await
191 .map_err(convert_err)?
192 .compat();
193 Ok(Box::new(reader))
194 }
195
196 async fn put<
197 P: AsRef<Path> + Send + Debug,
198 R: tokio::io::AsyncRead + Send + Sync + Unpin + 'static,
199 >(
200 &self,
201 _: &User,
202 mut input: R,
203 path: P,
204 _: u64,
205 ) -> storage::Result<u64> {
206 let mut w = self
207 .op
208 .writer(convert_path(path.as_ref())?)
209 .await
210 .map_err(convert_err)?
211 .into_futures_async_write()
212 .compat_write();
213 let len = tokio::io::copy(&mut input, &mut w).await?;
214 Ok(len)
215 }
216
217 async fn del<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
218 self.op
219 .delete(convert_path(path.as_ref())?)
220 .await
221 .map_err(convert_err)
222 }
223
224 async fn mkd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
225 let mut path_str = convert_path(path.as_ref())?.to_string();
226 if !path_str.ends_with('/') {
227 path_str.push('/');
228 }
229 self.op.create_dir(&path_str).await.map_err(convert_err)
230 }
231
232 async fn rename<P: AsRef<Path> + Send + Debug>(
233 &self,
234 _: &User,
235 from: P,
236 to: P,
237 ) -> storage::Result<()> {
238 let (from, to) = (convert_path(from.as_ref())?, convert_path(to.as_ref())?);
239 self.op.rename(from, to).await.map_err(convert_err)
240 }
241
242 async fn rmd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
243 self.op
244 .remove_all(convert_path(path.as_ref())?)
245 .await
246 .map_err(convert_err)
247 }
248
249 async fn cwd<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) -> storage::Result<()> {
250 use opendal::ErrorKind::*;
251
252 match self.op.stat(convert_path(path.as_ref())?).await {
253 Ok(_) => Ok(()),
254 Err(e) if matches!(e.kind(), NotFound | NotADirectory) => Err(storage::Error::new(
255 storage::ErrorKind::PermanentDirectoryNotAvailable,
256 e,
257 )),
258 Err(e) => Err(convert_err(e)),
259 }
260 }
261}