1use std::cmp::Ordering;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use crate::raw::oio;
24use crate::raw::{
25 Access, AccessorInfo, Layer, LayeredAccess, OpCreateDir, OpList, OpPresign, OpRead, OpStat,
26 OpWrite, RpCreateDir, RpDelete, RpList, RpPresign, RpRead, RpStat, RpWrite,
27};
28use crate::*;
29
/// Layer that wraps an accessor in a [`CompleteAccessor`].
///
/// Readers and writers created through the wrapped accessor are themselves
/// wrapped so that the number of bytes transferred can be compared against
/// the expected size when one is known.
pub struct CompleteLayer;
32
33impl<A: Access> Layer<A> for CompleteLayer {
34 type LayeredAccess = CompleteAccessor<A>;
35
36 fn layer(&self, inner: A) -> Self::LayeredAccess {
37 CompleteAccessor {
38 info: inner.info(),
39 inner: Arc::new(inner),
40 }
41 }
42}
43
/// Accessor produced by [`CompleteLayer`]; delegates every operation to the
/// wrapped accessor.
pub struct CompleteAccessor<A: Access> {
    /// Accessor info obtained from the inner accessor when the layer was applied.
    info: Arc<AccessorInfo>,
    /// The wrapped accessor that performs the actual operations.
    inner: Arc<A>,
}
49
50impl<A: Access> Debug for CompleteAccessor<A> {
51 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52 self.inner.fmt(f)
53 }
54}
55
56impl<A: Access> LayeredAccess for CompleteAccessor<A> {
57 type Inner = A;
58 type Reader = CompleteReader<A::Reader>;
59 type Writer = CompleteWriter<A::Writer>;
60 type Lister = A::Lister;
61 type Deleter = A::Deleter;
62
63 fn inner(&self) -> &Self::Inner {
64 &self.inner
65 }
66
67 fn info(&self) -> Arc<AccessorInfo> {
68 self.info.clone()
69 }
70
71 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
72 self.inner().create_dir(path, args).await
73 }
74
75 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
76 let size = args.range().size();
77 self.inner
78 .read(path, args)
79 .await
80 .map(|(rp, r)| (rp, CompleteReader::new(r, size)))
81 }
82
83 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
84 let (rp, w) = self.inner.write(path, args.clone()).await?;
85 let w = CompleteWriter::new(w, args.append());
86 Ok((rp, w))
87 }
88
89 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
90 self.inner.stat(path, args).await
91 }
92
93 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
94 self.inner().delete().await
95 }
96
97 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
98 self.inner.list(path, args).await
99 }
100
101 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
102 self.inner.presign(path, args).await
103 }
104}
105
/// Reader wrapper that counts the bytes it yields and compares the total
/// against an optional expected size once the inner reader returns an empty
/// buffer.
pub struct CompleteReader<R> {
    /// The wrapped reader.
    inner: R,
    /// Expected total number of bytes; `None` disables the check.
    size: Option<u64>,
    /// Number of bytes yielded so far.
    read: u64,
}
111
112impl<R> CompleteReader<R> {
113 pub fn new(inner: R, size: Option<u64>) -> Self {
114 Self {
115 inner,
116 size,
117 read: 0,
118 }
119 }
120
121 pub fn check(&self) -> Result<()> {
122 let Some(size) = self.size else {
123 return Ok(());
124 };
125
126 match self.read.cmp(&size) {
127 Ordering::Equal => Ok(()),
128 Ordering::Less => Err(
129 Error::new(ErrorKind::Unexpected, "reader got too little data")
130 .with_context("expect", size)
131 .with_context("actual", self.read),
132 ),
133 Ordering::Greater => Err(
134 Error::new(ErrorKind::Unexpected, "reader got too much data")
135 .with_context("expect", size)
136 .with_context("actual", self.read),
137 ),
138 }
139 }
140}
141
142impl<R: oio::Read> oio::Read for CompleteReader<R> {
143 async fn read(&mut self) -> Result<Buffer> {
144 let buf = self.inner.read().await?;
145
146 if buf.is_empty() {
147 self.check()?;
148 } else {
149 self.read += buf.len() as u64;
150 }
151
152 Ok(buf)
153 }
154}
155
/// Lifecycle state tracked by `CompleteWriter`.
#[derive(Debug, PartialEq, Eq)]
enum CompleteState {
    /// Initial state of a freshly created writer.
    Open,
    /// At least one write has succeeded.
    Written,
    /// The writer was closed or aborted successfully.
    Closed,
    /// An operation failed; this state is never left again.
    Error,
}

impl CompleteState {
    /// Moves to `destination`, unless the state is already `Error`, in which
    /// case it stays `Error`.
    fn transition(&mut self, destination: CompleteState) {
        if matches!(*self, CompleteState::Error) {
            return;
        }

        *self = destination;
    }
}
176
/// Writer wrapper that tracks how many bytes were written and which lifecycle
/// state the writer is in.
pub struct CompleteWriter<W> {
    /// The wrapped writer; set to `None` once it has been closed or aborted.
    inner: Option<W>,
    /// Whether the writer was opened in append mode; disables the size check.
    append: bool,
    /// Number of bytes successfully written so far.
    size: u64,
    /// Current lifecycle state.
    state: CompleteState,
}
183
184impl<W> CompleteWriter<W> {
185 pub fn new(inner: W, append: bool) -> CompleteWriter<W> {
186 CompleteWriter {
187 inner: Some(inner),
188 append,
189 size: 0,
190 state: CompleteState::Open,
191 }
192 }
193
194 fn check(&self, content_length: u64) -> Result<()> {
195 if self.append || content_length == 0 {
196 return Ok(());
197 }
198
199 match self.size.cmp(&content_length) {
200 Ordering::Equal => Ok(()),
201 Ordering::Less => Err(
202 Error::new(ErrorKind::Unexpected, "writer got too little data")
203 .with_context("expect", content_length)
204 .with_context("actual", self.size),
205 ),
206 Ordering::Greater => Err(
207 Error::new(ErrorKind::Unexpected, "writer got too much data")
208 .with_context("expect", content_length)
209 .with_context("actual", self.size),
210 ),
211 }
212 }
213}
214
215#[cfg(debug_assertions)]
218impl<W> Drop for CompleteWriter<W> {
219 fn drop(&mut self) {
220 if self.state == CompleteState::Written {
221 log::warn!(
222 "writer has not been closed or aborted after successful write operation, must be a bug"
223 )
224 }
225 }
226}
227
228impl<W> oio::Write for CompleteWriter<W>
229where
230 W: oio::Write,
231{
232 async fn write(&mut self, bs: Buffer) -> Result<()> {
233 let w = self.inner.as_mut().ok_or_else(|| {
234 debug_assert_ne!(
235 self.state,
236 CompleteState::Open,
237 "bug: inner is empty, but state is Open"
238 );
239 Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
240 })?;
241
242 let len = bs.len();
243 w.write(bs)
244 .await
245 .inspect_err(|_| self.state.transition(CompleteState::Error))?;
246 self.size += len as u64;
247 self.state.transition(CompleteState::Written);
248
249 Ok(())
250 }
251
252 async fn close(&mut self) -> Result<Metadata> {
253 let w = self.inner.as_mut().ok_or_else(|| {
254 debug_assert_ne!(
255 self.state,
256 CompleteState::Open,
257 "bug: inner is empty, but state is Open"
258 );
259 Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
260 })?;
261
262 let mut ret = w
265 .close()
266 .await
267 .inspect_err(|_| self.state.transition(CompleteState::Error))?;
268 self.check(ret.content_length())
269 .inspect_err(|_| self.state.transition(CompleteState::Error))?;
270 if ret.content_length() == 0 {
271 ret = ret.with_content_length(self.size);
272 }
273 self.inner = None;
274 self.state.transition(CompleteState::Closed);
275
276 Ok(ret)
277 }
278
279 async fn abort(&mut self) -> Result<()> {
280 let w = self.inner.as_mut().ok_or_else(|| {
281 Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
282 })?;
283
284 w.abort()
285 .await
286 .inspect_err(|_| self.state.transition(CompleteState::Error))?;
287 self.inner = None;
288 self.state.transition(CompleteState::Closed);
289
290 Ok(())
291 }
292}