1use crate::raw::oio::FlatLister;
19use crate::raw::oio::PrefixLister;
20use crate::raw::*;
21use crate::*;
22use std::cmp::Ordering;
23use std::fmt::Debug;
24use std::fmt::Formatter;
25use std::sync::Arc;
26
27pub struct CompleteLayer;
101
102impl<A: Access> Layer<A> for CompleteLayer {
103 type LayeredAccess = CompleteAccessor<A>;
104
105 fn layer(&self, inner: A) -> Self::LayeredAccess {
106 let info = inner.info();
107 info.update_full_capability(|mut cap| {
108 if cap.list && cap.write_can_empty {
109 cap.create_dir = true;
110 }
111 cap
112 });
113
114 CompleteAccessor {
115 info,
116 inner: Arc::new(inner),
117 }
118 }
119}
120
121pub struct CompleteAccessor<A: Access> {
123 info: Arc<AccessorInfo>,
124 inner: Arc<A>,
125}
126
127impl<A: Access> Debug for CompleteAccessor<A> {
128 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
129 self.inner.fmt(f)
130 }
131}
132
133impl<A: Access> CompleteAccessor<A> {
134 async fn complete_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
135 let capability = self.info.native_capability();
136 if capability.create_dir {
137 return self.inner().create_dir(path, args).await;
138 }
139
140 if capability.write_can_empty && capability.list {
141 let (_, mut w) = self.inner.write(path, OpWrite::default()).await?;
142 oio::Write::close(&mut w).await?;
143 return Ok(RpCreateDir::default());
144 }
145
146 self.inner.create_dir(path, args).await
147 }
148
149 fn complete_blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
150 let capability = self.info.native_capability();
151 if capability.create_dir && capability.blocking {
152 return self.inner().blocking_create_dir(path, args);
153 }
154
155 if capability.write_can_empty && capability.list && capability.blocking {
156 let (_, mut w) = self.inner.blocking_write(path, OpWrite::default())?;
157 oio::BlockingWrite::close(&mut w)?;
158 return Ok(RpCreateDir::default());
159 }
160
161 self.inner.blocking_create_dir(path, args)
162 }
163
164 async fn complete_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
165 let capability = self.info.native_capability();
166
167 if path == "/" {
168 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
169 }
170
171 if path.ends_with('/') && capability.create_dir {
173 let meta = self.inner.stat(path, args).await?.into_metadata();
174
175 if meta.is_file() {
176 return Err(Error::new(
177 ErrorKind::NotFound,
178 "stat expected a directory, but found a file",
179 ));
180 }
181
182 return Ok(RpStat::new(meta));
183 }
184
185 if path.ends_with('/') && capability.list_with_recursive {
187 let (_, mut l) = self
188 .inner
189 .list(path, OpList::default().with_recursive(true).with_limit(1))
190 .await?;
191
192 return if oio::List::next(&mut l).await?.is_some() {
193 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
194 } else {
195 Err(Error::new(
196 ErrorKind::NotFound,
197 "the directory is not found",
198 ))
199 };
200 }
201
202 self.inner.stat(path, args).await
204 }
205
206 fn complete_blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
207 let capability = self.info.native_capability();
208
209 if path == "/" {
210 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
211 }
212
213 if path.ends_with('/') && capability.create_dir {
215 let meta = self.inner.blocking_stat(path, args)?.into_metadata();
216
217 if meta.is_file() {
218 return Err(Error::new(
219 ErrorKind::NotFound,
220 "stat expected a directory, but found a file",
221 ));
222 }
223
224 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
225 }
226
227 if path.ends_with('/') && capability.list_with_recursive {
229 let (_, mut l) = self
230 .inner
231 .blocking_list(path, OpList::default().with_recursive(true).with_limit(1))?;
232
233 return if oio::BlockingList::next(&mut l)?.is_some() {
234 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
235 } else {
236 Err(Error::new(
237 ErrorKind::NotFound,
238 "the directory is not found",
239 ))
240 };
241 }
242
243 self.inner.blocking_stat(path, args)
245 }
246
247 async fn complete_list(
248 &self,
249 path: &str,
250 args: OpList,
251 ) -> Result<(RpList, CompleteLister<A, A::Lister>)> {
252 let cap = self.info.native_capability();
253
254 let recursive = args.recursive();
255
256 match (recursive, cap.list_with_recursive) {
257 (_, true) => {
259 let (rp, p) = self.inner.list(path, args).await?;
260 Ok((rp, CompleteLister::One(p)))
261 }
262 (true, false) => {
264 if path.ends_with('/') {
266 let p = FlatLister::new(self.inner.clone(), path);
267 Ok((RpList::default(), CompleteLister::Two(p)))
268 } else {
269 let parent = get_parent(path);
270 let p = FlatLister::new(self.inner.clone(), parent);
271 let p = PrefixLister::new(p, path);
272 Ok((RpList::default(), CompleteLister::Four(p)))
273 }
274 }
275 (false, false) => {
278 if path.ends_with('/') {
280 let (rp, p) = self.inner.list(path, args).await?;
281 Ok((rp, CompleteLister::One(p)))
282 } else {
283 let parent = get_parent(path);
284 let (rp, p) = self.inner.list(parent, args).await?;
285 let p = PrefixLister::new(p, path);
286 Ok((rp, CompleteLister::Three(p)))
287 }
288 }
289 }
290 }
291
292 fn complete_blocking_list(
293 &self,
294 path: &str,
295 args: OpList,
296 ) -> Result<(RpList, CompleteLister<A, A::BlockingLister>)> {
297 let cap = self.info.native_capability();
298
299 let recursive = args.recursive();
300
301 match (recursive, cap.list_with_recursive) {
302 (_, true) => {
304 let (rp, p) = self.inner.blocking_list(path, args)?;
305 Ok((rp, CompleteLister::One(p)))
306 }
307 (true, false) => {
309 if path.ends_with('/') {
311 let p = FlatLister::new(self.inner.clone(), path);
312 Ok((RpList::default(), CompleteLister::Two(p)))
313 } else {
314 let parent = get_parent(path);
315 let p = FlatLister::new(self.inner.clone(), parent);
316 let p = PrefixLister::new(p, path);
317 Ok((RpList::default(), CompleteLister::Four(p)))
318 }
319 }
320 (false, false) => {
323 if path.ends_with('/') {
325 let (rp, p) = self.inner.blocking_list(path, args)?;
326 Ok((rp, CompleteLister::One(p)))
327 } else {
328 let parent = get_parent(path);
329 let (rp, p) = self.inner.blocking_list(parent, args)?;
330 let p = PrefixLister::new(p, path);
331 Ok((rp, CompleteLister::Three(p)))
332 }
333 }
334 }
335 }
336}
337
338impl<A: Access> LayeredAccess for CompleteAccessor<A> {
339 type Inner = A;
340 type Reader = CompleteReader<A::Reader>;
341 type BlockingReader = CompleteReader<A::BlockingReader>;
342 type Writer = CompleteWriter<A::Writer>;
343 type BlockingWriter = CompleteWriter<A::BlockingWriter>;
344 type Lister = CompleteLister<A, A::Lister>;
345 type BlockingLister = CompleteLister<A, A::BlockingLister>;
346 type Deleter = A::Deleter;
347 type BlockingDeleter = A::BlockingDeleter;
348
349 fn inner(&self) -> &Self::Inner {
350 &self.inner
351 }
352
353 fn info(&self) -> Arc<AccessorInfo> {
354 self.info.clone()
355 }
356
357 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
358 self.complete_create_dir(path, args).await
359 }
360
361 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
362 let size = args.range().size();
363 self.inner
364 .read(path, args)
365 .await
366 .map(|(rp, r)| (rp, CompleteReader::new(r, size)))
367 }
368
369 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
370 let (rp, w) = self.inner.write(path, args.clone()).await?;
371 let w = CompleteWriter::new(w, args.append());
372 Ok((rp, w))
373 }
374
375 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
376 self.complete_stat(path, args).await
377 }
378
379 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
380 self.inner().delete().await
381 }
382
383 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
384 self.complete_list(path, args).await
385 }
386
387 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
388 self.inner.presign(path, args).await
389 }
390
391 fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
392 self.complete_blocking_create_dir(path, args)
393 }
394
395 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
396 let size = args.range().size();
397 self.inner
398 .blocking_read(path, args)
399 .map(|(rp, r)| (rp, CompleteReader::new(r, size)))
400 }
401
402 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
403 let append = args.append();
404 self.inner
405 .blocking_write(path, args)
406 .map(|(rp, w)| (rp, CompleteWriter::new(w, append)))
407 }
408
409 fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
410 self.complete_blocking_stat(path, args)
411 }
412
413 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
414 self.inner().blocking_delete()
415 }
416
417 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
418 self.complete_blocking_list(path, args)
419 }
420}
421
422pub type CompleteLister<A, P> =
423 FourWays<P, FlatLister<Arc<A>, P>, PrefixLister<P>, PrefixLister<FlatLister<Arc<A>, P>>>;
424
425pub struct CompleteReader<R> {
426 inner: R,
427 size: Option<u64>,
428 read: u64,
429}
430
431impl<R> CompleteReader<R> {
432 pub fn new(inner: R, size: Option<u64>) -> Self {
433 Self {
434 inner,
435 size,
436 read: 0,
437 }
438 }
439
440 pub fn check(&self) -> Result<()> {
441 let Some(size) = self.size else {
442 return Ok(());
443 };
444
445 match self.read.cmp(&size) {
446 Ordering::Equal => Ok(()),
447 Ordering::Less => Err(
448 Error::new(ErrorKind::Unexpected, "reader got too little data")
449 .with_context("expect", size)
450 .with_context("actual", self.read),
451 ),
452 Ordering::Greater => Err(
453 Error::new(ErrorKind::Unexpected, "reader got too much data")
454 .with_context("expect", size)
455 .with_context("actual", self.read),
456 ),
457 }
458 }
459}
460
461impl<R: oio::Read> oio::Read for CompleteReader<R> {
462 async fn read(&mut self) -> Result<Buffer> {
463 let buf = self.inner.read().await?;
464
465 if buf.is_empty() {
466 self.check()?;
467 } else {
468 self.read += buf.len() as u64;
469 }
470
471 Ok(buf)
472 }
473}
474
475impl<R: oio::BlockingRead> oio::BlockingRead for CompleteReader<R> {
476 fn read(&mut self) -> Result<Buffer> {
477 let buf = self.inner.read()?;
478
479 if buf.is_empty() {
480 self.check()?;
481 } else {
482 self.read += buf.len() as u64;
483 }
484
485 Ok(buf)
486 }
487}
488
489pub struct CompleteWriter<W> {
490 inner: Option<W>,
491 append: bool,
492 size: u64,
493}
494
495impl<W> CompleteWriter<W> {
496 pub fn new(inner: W, append: bool) -> CompleteWriter<W> {
497 CompleteWriter {
498 inner: Some(inner),
499 append,
500 size: 0,
501 }
502 }
503
504 fn check(&self, content_length: u64) -> Result<()> {
505 if self.append || content_length == 0 {
506 return Ok(());
507 }
508
509 match self.size.cmp(&content_length) {
510 Ordering::Equal => Ok(()),
511 Ordering::Less => Err(
512 Error::new(ErrorKind::Unexpected, "writer got too little data")
513 .with_context("expect", content_length)
514 .with_context("actual", self.size),
515 ),
516 Ordering::Greater => Err(
517 Error::new(ErrorKind::Unexpected, "writer got too much data")
518 .with_context("expect", content_length)
519 .with_context("actual", self.size),
520 ),
521 }
522 }
523}
524
525#[cfg(debug_assertions)]
528impl<W> Drop for CompleteWriter<W> {
529 fn drop(&mut self) {
530 if self.inner.is_some() {
531 log::warn!("writer has not been closed or aborted, must be a bug")
532 }
533 }
534}
535
536impl<W> oio::Write for CompleteWriter<W>
537where
538 W: oio::Write,
539{
540 async fn write(&mut self, bs: Buffer) -> Result<()> {
541 let w = self.inner.as_mut().ok_or_else(|| {
542 Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
543 })?;
544
545 let len = bs.len();
546 w.write(bs).await?;
547 self.size += len as u64;
548
549 Ok(())
550 }
551
552 async fn close(&mut self) -> Result<Metadata> {
553 let w = self.inner.as_mut().ok_or_else(|| {
554 Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
555 })?;
556
557 let mut ret = w.close().await?;
560 self.check(ret.content_length())?;
561 if ret.content_length() == 0 {
562 ret = ret.with_content_length(self.size);
563 }
564 self.inner = None;
565
566 Ok(ret)
567 }
568
569 async fn abort(&mut self) -> Result<()> {
570 let w = self.inner.as_mut().ok_or_else(|| {
571 Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
572 })?;
573
574 w.abort().await?;
575 self.inner = None;
576
577 Ok(())
578 }
579}
580
581impl<W> oio::BlockingWrite for CompleteWriter<W>
582where
583 W: oio::BlockingWrite,
584{
585 fn write(&mut self, bs: Buffer) -> Result<()> {
586 let w = self.inner.as_mut().ok_or_else(|| {
587 Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
588 })?;
589
590 let len = bs.len();
591 w.write(bs)?;
592 self.size += len as u64;
593
594 Ok(())
595 }
596
597 fn close(&mut self) -> Result<Metadata> {
598 let w = self.inner.as_mut().ok_or_else(|| {
599 Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
600 })?;
601
602 let mut ret = w.close()?;
603 self.check(ret.content_length())?;
604 if ret.content_length() == 0 {
605 ret = ret.with_content_length(self.size);
606 }
607 self.inner = None;
608
609 Ok(ret)
610 }
611}