1use std::cmp::Ordering;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use crate::raw::oio;
24use crate::raw::{
25 Access, AccessorInfo, Layer, LayeredAccess, OpCreateDir, OpList, OpPresign, OpRead, OpStat,
26 OpWrite, RpCreateDir, RpDelete, RpList, RpPresign, RpRead, RpStat, RpWrite,
27};
28use crate::*;
29
30pub struct CompleteLayer;
32
33impl<A: Access> Layer<A> for CompleteLayer {
34 type LayeredAccess = CompleteAccessor<A>;
35
36 fn layer(&self, inner: A) -> Self::LayeredAccess {
37 CompleteAccessor {
38 info: inner.info(),
39 inner: Arc::new(inner),
40 }
41 }
42}
43
44pub struct CompleteAccessor<A: Access> {
46 info: Arc<AccessorInfo>,
47 inner: Arc<A>,
48}
49
50impl<A: Access> Debug for CompleteAccessor<A> {
51 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52 self.inner.fmt(f)
53 }
54}
55
56impl<A: Access> LayeredAccess for CompleteAccessor<A> {
57 type Inner = A;
58 type Reader = CompleteReader<A::Reader>;
59 type Writer = CompleteWriter<A::Writer>;
60 type Lister = A::Lister;
61 type Deleter = A::Deleter;
62
63 fn inner(&self) -> &Self::Inner {
64 &self.inner
65 }
66
67 fn info(&self) -> Arc<AccessorInfo> {
68 self.info.clone()
69 }
70
71 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
72 self.inner().create_dir(path, args).await
73 }
74
75 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
76 let size = args.range().size();
77 self.inner
78 .read(path, args)
79 .await
80 .map(|(rp, r)| (rp, CompleteReader::new(r, size)))
81 }
82
83 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
84 let (rp, w) = self.inner.write(path, args.clone()).await?;
85 let w = CompleteWriter::new(w, args.append());
86 Ok((rp, w))
87 }
88
89 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
90 self.inner.stat(path, args).await
91 }
92
93 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
94 self.inner().delete().await
95 }
96
97 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
98 self.inner.list(path, args).await
99 }
100
101 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
102 self.inner.presign(path, args).await
103 }
104}
105
106pub struct CompleteReader<R> {
107 inner: R,
108 size: Option<u64>,
109 read: u64,
110}
111
112impl<R> CompleteReader<R> {
113 pub fn new(inner: R, size: Option<u64>) -> Self {
114 Self {
115 inner,
116 size,
117 read: 0,
118 }
119 }
120
121 pub fn check(&self) -> Result<()> {
122 let Some(size) = self.size else {
123 return Ok(());
124 };
125
126 match self.read.cmp(&size) {
127 Ordering::Equal => Ok(()),
128 Ordering::Less => Err(
129 Error::new(ErrorKind::Unexpected, "reader got too little data")
130 .with_context("expect", size)
131 .with_context("actual", self.read),
132 ),
133 Ordering::Greater => Err(
134 Error::new(ErrorKind::Unexpected, "reader got too much data")
135 .with_context("expect", size)
136 .with_context("actual", self.read),
137 ),
138 }
139 }
140}
141
142impl<R: oio::Read> oio::Read for CompleteReader<R> {
143 async fn read(&mut self) -> Result<Buffer> {
144 let buf = self.inner.read().await?;
145
146 if buf.is_empty() {
147 self.check()?;
148 } else {
149 self.read += buf.len() as u64;
150 }
151
152 Ok(buf)
153 }
154}
155
156pub struct CompleteWriter<W> {
157 inner: Option<W>,
158 append: bool,
159 size: u64,
160}
161
162impl<W> CompleteWriter<W> {
163 pub fn new(inner: W, append: bool) -> CompleteWriter<W> {
164 CompleteWriter {
165 inner: Some(inner),
166 append,
167 size: 0,
168 }
169 }
170
171 fn check(&self, content_length: u64) -> Result<()> {
172 if self.append || content_length == 0 {
173 return Ok(());
174 }
175
176 match self.size.cmp(&content_length) {
177 Ordering::Equal => Ok(()),
178 Ordering::Less => Err(
179 Error::new(ErrorKind::Unexpected, "writer got too little data")
180 .with_context("expect", content_length)
181 .with_context("actual", self.size),
182 ),
183 Ordering::Greater => Err(
184 Error::new(ErrorKind::Unexpected, "writer got too much data")
185 .with_context("expect", content_length)
186 .with_context("actual", self.size),
187 ),
188 }
189 }
190}
191
192#[cfg(debug_assertions)]
195impl<W> Drop for CompleteWriter<W> {
196 fn drop(&mut self) {
197 if self.inner.is_some() {
198 log::warn!("writer has not been closed or aborted, must be a bug")
199 }
200 }
201}
202
203impl<W> oio::Write for CompleteWriter<W>
204where
205 W: oio::Write,
206{
207 async fn write(&mut self, bs: Buffer) -> Result<()> {
208 let w = self.inner.as_mut().ok_or_else(|| {
209 Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
210 })?;
211
212 let len = bs.len();
213 w.write(bs).await?;
214 self.size += len as u64;
215
216 Ok(())
217 }
218
219 async fn close(&mut self) -> Result<Metadata> {
220 let w = self.inner.as_mut().ok_or_else(|| {
221 Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
222 })?;
223
224 let mut ret = w.close().await?;
227 self.check(ret.content_length())?;
228 if ret.content_length() == 0 {
229 ret = ret.with_content_length(self.size);
230 }
231 self.inner = None;
232
233 Ok(ret)
234 }
235
236 async fn abort(&mut self) -> Result<()> {
237 let w = self.inner.as_mut().ok_or_else(|| {
238 Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
239 })?;
240
241 w.abort().await?;
242 self.inner = None;
243
244 Ok(())
245 }
246}