1use std::cmp::Ordering;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use crate::raw::oio::FlatLister;
24use crate::raw::oio::PrefixLister;
25use crate::raw::*;
26use crate::*;
27
28pub struct CompleteLayer;
56
57impl<A: Access> Layer<A> for CompleteLayer {
58 type LayeredAccess = CompleteAccessor<A>;
59
60 fn layer(&self, inner: A) -> Self::LayeredAccess {
61 let info = inner.info();
62 info.update_full_capability(|mut cap| {
63 if cap.list && cap.write_can_empty {
64 cap.create_dir = true;
65 }
66 cap
67 });
68
69 CompleteAccessor {
70 info,
71 inner: Arc::new(inner),
72 }
73 }
74}
75
76pub struct CompleteAccessor<A: Access> {
78 info: Arc<AccessorInfo>,
79 inner: Arc<A>,
80}
81
82impl<A: Access> Debug for CompleteAccessor<A> {
83 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
84 self.inner.fmt(f)
85 }
86}
87
88impl<A: Access> CompleteAccessor<A> {
89 async fn complete_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
90 let capability = self.info.native_capability();
91 if capability.create_dir {
92 return self.inner().create_dir(path, args).await;
93 }
94
95 if capability.write_can_empty && capability.list {
96 let (_, mut w) = self.inner.write(path, OpWrite::default()).await?;
97 oio::Write::close(&mut w).await?;
98 return Ok(RpCreateDir::default());
99 }
100
101 self.inner.create_dir(path, args).await
102 }
103
104 async fn complete_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
105 let capability = self.info.native_capability();
106
107 if path == "/" {
108 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
109 }
110
111 if path.ends_with('/') && capability.create_dir {
113 let meta = self.inner.stat(path, args).await?.into_metadata();
114
115 if meta.is_file() {
116 return Err(Error::new(
117 ErrorKind::NotFound,
118 "stat expected a directory, but found a file",
119 ));
120 }
121
122 return Ok(RpStat::new(meta));
123 }
124
125 if path.ends_with('/') && capability.list_with_recursive {
127 let (_, mut l) = self
128 .inner
129 .list(path, OpList::default().with_recursive(true).with_limit(1))
130 .await?;
131
132 return if oio::List::next(&mut l).await?.is_some() {
133 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
134 } else {
135 Err(Error::new(
136 ErrorKind::NotFound,
137 "the directory is not found",
138 ))
139 };
140 }
141
142 self.inner.stat(path, args).await
144 }
145
146 async fn complete_list(
147 &self,
148 path: &str,
149 args: OpList,
150 ) -> Result<(RpList, CompleteLister<A, A::Lister>)> {
151 let cap = self.info.native_capability();
152
153 let recursive = args.recursive();
154
155 match (recursive, cap.list_with_recursive) {
156 (_, true) => {
158 let (rp, p) = self.inner.list(path, args).await?;
159 Ok((rp, CompleteLister::One(p)))
160 }
161 (true, false) => {
163 if path.ends_with('/') {
165 let p = FlatLister::new(self.inner.clone(), path);
166 Ok((RpList::default(), CompleteLister::Two(p)))
167 } else {
168 let parent = get_parent(path);
169 let p = FlatLister::new(self.inner.clone(), parent);
170 let p = PrefixLister::new(p, path);
171 Ok((RpList::default(), CompleteLister::Four(p)))
172 }
173 }
174 (false, false) => {
177 if path.ends_with('/') {
179 let (rp, p) = self.inner.list(path, args).await?;
180 Ok((rp, CompleteLister::One(p)))
181 } else {
182 let parent = get_parent(path);
183 let (rp, p) = self.inner.list(parent, args).await?;
184 let p = PrefixLister::new(p, path);
185 Ok((rp, CompleteLister::Three(p)))
186 }
187 }
188 }
189 }
190}
191
192impl<A: Access> LayeredAccess for CompleteAccessor<A> {
193 type Inner = A;
194 type Reader = CompleteReader<A::Reader>;
195 type Writer = CompleteWriter<A::Writer>;
196 type Lister = CompleteLister<A, A::Lister>;
197 type Deleter = A::Deleter;
198
199 fn inner(&self) -> &Self::Inner {
200 &self.inner
201 }
202
203 fn info(&self) -> Arc<AccessorInfo> {
204 self.info.clone()
205 }
206
207 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
208 self.complete_create_dir(path, args).await
209 }
210
211 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
212 let size = args.range().size();
213 self.inner
214 .read(path, args)
215 .await
216 .map(|(rp, r)| (rp, CompleteReader::new(r, size)))
217 }
218
219 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
220 let (rp, w) = self.inner.write(path, args.clone()).await?;
221 let w = CompleteWriter::new(w, args.append());
222 Ok((rp, w))
223 }
224
225 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
226 self.complete_stat(path, args).await
227 }
228
229 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
230 self.inner().delete().await
231 }
232
233 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
234 self.complete_list(path, args).await
235 }
236
237 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
238 self.inner.presign(path, args).await
239 }
240}
241
242pub type CompleteLister<A, P> =
243 FourWays<P, FlatLister<Arc<A>, P>, PrefixLister<P>, PrefixLister<FlatLister<Arc<A>, P>>>;
244
245pub struct CompleteReader<R> {
246 inner: R,
247 size: Option<u64>,
248 read: u64,
249}
250
251impl<R> CompleteReader<R> {
252 pub fn new(inner: R, size: Option<u64>) -> Self {
253 Self {
254 inner,
255 size,
256 read: 0,
257 }
258 }
259
260 pub fn check(&self) -> Result<()> {
261 let Some(size) = self.size else {
262 return Ok(());
263 };
264
265 match self.read.cmp(&size) {
266 Ordering::Equal => Ok(()),
267 Ordering::Less => Err(
268 Error::new(ErrorKind::Unexpected, "reader got too little data")
269 .with_context("expect", size)
270 .with_context("actual", self.read),
271 ),
272 Ordering::Greater => Err(
273 Error::new(ErrorKind::Unexpected, "reader got too much data")
274 .with_context("expect", size)
275 .with_context("actual", self.read),
276 ),
277 }
278 }
279}
280
281impl<R: oio::Read> oio::Read for CompleteReader<R> {
282 async fn read(&mut self) -> Result<Buffer> {
283 let buf = self.inner.read().await?;
284
285 if buf.is_empty() {
286 self.check()?;
287 } else {
288 self.read += buf.len() as u64;
289 }
290
291 Ok(buf)
292 }
293}
294
295pub struct CompleteWriter<W> {
296 inner: Option<W>,
297 append: bool,
298 size: u64,
299}
300
301impl<W> CompleteWriter<W> {
302 pub fn new(inner: W, append: bool) -> CompleteWriter<W> {
303 CompleteWriter {
304 inner: Some(inner),
305 append,
306 size: 0,
307 }
308 }
309
310 fn check(&self, content_length: u64) -> Result<()> {
311 if self.append || content_length == 0 {
312 return Ok(());
313 }
314
315 match self.size.cmp(&content_length) {
316 Ordering::Equal => Ok(()),
317 Ordering::Less => Err(
318 Error::new(ErrorKind::Unexpected, "writer got too little data")
319 .with_context("expect", content_length)
320 .with_context("actual", self.size),
321 ),
322 Ordering::Greater => Err(
323 Error::new(ErrorKind::Unexpected, "writer got too much data")
324 .with_context("expect", content_length)
325 .with_context("actual", self.size),
326 ),
327 }
328 }
329}
330
331#[cfg(debug_assertions)]
334impl<W> Drop for CompleteWriter<W> {
335 fn drop(&mut self) {
336 if self.inner.is_some() {
337 log::warn!("writer has not been closed or aborted, must be a bug")
338 }
339 }
340}
341
342impl<W> oio::Write for CompleteWriter<W>
343where
344 W: oio::Write,
345{
346 async fn write(&mut self, bs: Buffer) -> Result<()> {
347 let w = self.inner.as_mut().ok_or_else(|| {
348 Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
349 })?;
350
351 let len = bs.len();
352 w.write(bs).await?;
353 self.size += len as u64;
354
355 Ok(())
356 }
357
358 async fn close(&mut self) -> Result<Metadata> {
359 let w = self.inner.as_mut().ok_or_else(|| {
360 Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
361 })?;
362
363 let mut ret = w.close().await?;
366 self.check(ret.content_length())?;
367 if ret.content_length() == 0 {
368 ret = ret.with_content_length(self.size);
369 }
370 self.inner = None;
371
372 Ok(ret)
373 }
374
375 async fn abort(&mut self) -> Result<()> {
376 let w = self.inner.as_mut().ok_or_else(|| {
377 Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
378 })?;
379
380 w.abort().await?;
381 self.inner = None;
382
383 Ok(())
384 }
385}