1use std::cmp::Ordering;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use crate::raw::oio;
24use crate::raw::{
25 Access, AccessorInfo, Layer, LayeredAccess, OpCreateDir, OpList, OpPresign, OpRead, OpStat,
26 OpWrite, RpCreateDir, RpDelete, RpList, RpPresign, RpRead, RpStat, RpWrite,
27};
28use crate::*;
29
30pub struct CompleteLayer;
32
33impl<A: Access> Layer<A> for CompleteLayer {
34 type LayeredAccess = CompleteAccessor<A>;
35
36 fn layer(&self, inner: A) -> Self::LayeredAccess {
37 CompleteAccessor {
38 info: inner.info(),
39 inner: Arc::new(inner),
40 }
41 }
42}
43
44pub struct CompleteAccessor<A: Access> {
46 info: Arc<AccessorInfo>,
47 inner: Arc<A>,
48}
49
50impl<A: Access> Debug for CompleteAccessor<A> {
51 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52 self.inner.fmt(f)
53 }
54}
55
56impl<A: Access> LayeredAccess for CompleteAccessor<A> {
57 type Inner = A;
58 type Reader = CompleteReader<A::Reader>;
59 type Writer = CompleteWriter<A::Writer>;
60 type Lister = CompleteLister<A>;
61 type Deleter = A::Deleter;
62
63 fn inner(&self) -> &Self::Inner {
64 &self.inner
65 }
66
67 fn info(&self) -> Arc<AccessorInfo> {
68 self.info.clone()
69 }
70
71 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
72 self.inner().create_dir(path, args).await
73 }
74
75 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
76 let size = args.range().size();
77 self.inner
78 .read(path, args)
79 .await
80 .map(|(rp, r)| (rp, CompleteReader::new(r, size)))
81 }
82
83 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
84 let (rp, w) = self.inner.write(path, args.clone()).await?;
85 let w = CompleteWriter::new(w, args.append());
86 Ok((rp, w))
87 }
88
89 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
90 self.inner.stat(path, args).await
91 }
92
93 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
94 self.inner().delete().await
95 }
96
97 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
98 let (rp, lister) = self.inner.list(path, args).await?;
99 let lister = CompleteLister::new(self.inner.clone(), self.info.clone(), lister);
100 Ok((rp, lister))
101 }
102
103 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
104 self.inner.presign(path, args).await
105 }
106}
107
108pub struct CompleteLister<A: Access> {
109 inner: A::Lister,
110 acc: Arc<A>,
111 info: Arc<AccessorInfo>,
112}
113
114impl<A: Access> CompleteLister<A> {
115 fn new(acc: Arc<A>, info: Arc<AccessorInfo>, inner: A::Lister) -> Self {
116 Self { inner, acc, info }
117 }
118
119 async fn ensure_file_content_length(&self, entry: &mut oio::Entry) -> Result<()> {
120 let path = entry.path().to_string();
121 let version = entry.metadata().version().map(str::to_owned);
122 let mut op = OpStat::new();
123 if let Some(version) = version.as_deref() {
124 op = op.with_version(version);
125 }
126
127 let stat_metadata = self.acc.stat(&path, op).await?.into_metadata();
128 if !stat_metadata.has_content_length() {
129 return Err(Error::new(
130 ErrorKind::Unexpected,
131 "content length is required for list file entries",
132 )
133 .with_operation("CompleteLister::ensure_file_content_length")
134 .with_context("service", self.info.scheme().to_string())
135 .with_context("path", path));
136 }
137
138 entry
139 .metadata_mut()
140 .set_content_length(stat_metadata.content_length());
141 Ok(())
142 }
143}
144
145impl<A: Access> oio::List for CompleteLister<A> {
146 async fn next(&mut self) -> Result<Option<oio::Entry>> {
147 loop {
148 let Some(mut entry) = self.inner.next().await? else {
149 return Ok(None);
150 };
151
152 if !entry.mode().is_file()
153 || entry.metadata().is_deleted()
154 || entry.metadata().has_content_length()
155 {
156 return Ok(Some(entry));
157 }
158
159 match self.ensure_file_content_length(&mut entry).await {
160 Ok(()) => return Ok(Some(entry)),
161 Err(err) if err.kind() == ErrorKind::NotFound => continue,
162 Err(err) => return Err(err),
163 }
164 }
165 }
166}
167
168pub struct CompleteReader<R> {
169 inner: R,
170 size: Option<u64>,
171 read: u64,
172}
173
174impl<R> CompleteReader<R> {
175 pub fn new(inner: R, size: Option<u64>) -> Self {
176 Self {
177 inner,
178 size,
179 read: 0,
180 }
181 }
182
183 pub fn check(&self) -> Result<()> {
184 let Some(size) = self.size else {
185 return Ok(());
186 };
187
188 match self.read.cmp(&size) {
189 Ordering::Equal => Ok(()),
190 Ordering::Less => Err(
191 Error::new(ErrorKind::Unexpected, "reader got too little data")
192 .with_context("expect", size)
193 .with_context("actual", self.read),
194 ),
195 Ordering::Greater => Err(
196 Error::new(ErrorKind::Unexpected, "reader got too much data")
197 .with_context("expect", size)
198 .with_context("actual", self.read),
199 ),
200 }
201 }
202}
203
204impl<R: oio::Read> oio::Read for CompleteReader<R> {
205 async fn read(&mut self) -> Result<Buffer> {
206 let buf = self.inner.read().await?;
207
208 if buf.is_empty() {
209 self.check()?;
210 } else {
211 self.read += buf.len() as u64;
212 }
213
214 Ok(buf)
215 }
216}
217
218#[derive(Debug, PartialEq, Eq)]
222enum CompleteState {
223 Open,
224 Written,
225 Closed,
226 Error,
227}
228
229impl CompleteState {
230 fn transition(&mut self, destination: CompleteState) {
233 if *self != CompleteState::Error {
234 *self = destination
235 }
236 }
237}
238
239pub struct CompleteWriter<W> {
240 inner: Option<W>,
241 append: bool,
242 size: u64,
243 state: CompleteState,
244}
245
246impl<W> CompleteWriter<W> {
247 pub fn new(inner: W, append: bool) -> CompleteWriter<W> {
248 CompleteWriter {
249 inner: Some(inner),
250 append,
251 size: 0,
252 state: CompleteState::Open,
253 }
254 }
255
256 fn check(&self, content_length: u64) -> Result<()> {
257 if self.append || content_length == 0 {
258 return Ok(());
259 }
260
261 match self.size.cmp(&content_length) {
262 Ordering::Equal => Ok(()),
263 Ordering::Less => Err(
264 Error::new(ErrorKind::Unexpected, "writer got too little data")
265 .with_context("expect", content_length)
266 .with_context("actual", self.size),
267 ),
268 Ordering::Greater => Err(
269 Error::new(ErrorKind::Unexpected, "writer got too much data")
270 .with_context("expect", content_length)
271 .with_context("actual", self.size),
272 ),
273 }
274 }
275}
276
277#[cfg(debug_assertions)]
280impl<W> Drop for CompleteWriter<W> {
281 fn drop(&mut self) {
282 if self.state == CompleteState::Written {
283 log::warn!(
284 "writer has not been closed or aborted after successful write operation, must be a bug"
285 )
286 }
287 }
288}
289
290impl<W> oio::Write for CompleteWriter<W>
291where
292 W: oio::Write,
293{
294 async fn write(&mut self, bs: Buffer) -> Result<()> {
295 let w = self.inner.as_mut().ok_or_else(|| {
296 debug_assert_ne!(
297 self.state,
298 CompleteState::Open,
299 "bug: inner is empty, but state is Open"
300 );
301 Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
302 })?;
303
304 let len = bs.len();
305 w.write(bs)
306 .await
307 .inspect_err(|_| self.state.transition(CompleteState::Error))?;
308 self.size += len as u64;
309 self.state.transition(CompleteState::Written);
310
311 Ok(())
312 }
313
314 async fn close(&mut self) -> Result<Metadata> {
315 let w = self.inner.as_mut().ok_or_else(|| {
316 debug_assert_ne!(
317 self.state,
318 CompleteState::Open,
319 "bug: inner is empty, but state is Open"
320 );
321 Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
322 })?;
323
324 let mut ret = w
327 .close()
328 .await
329 .inspect_err(|_| self.state.transition(CompleteState::Error))?;
330 self.check(ret.content_length())
331 .inspect_err(|_| self.state.transition(CompleteState::Error))?;
332 if ret.content_length() == 0 {
333 ret = ret.with_content_length(self.size);
334 }
335 self.inner = None;
336 self.state.transition(CompleteState::Closed);
337
338 Ok(ret)
339 }
340
341 async fn abort(&mut self) -> Result<()> {
342 let w = self.inner.as_mut().ok_or_else(|| {
343 Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
344 })?;
345
346 w.abort()
347 .await
348 .inspect_err(|_| self.state.transition(CompleteState::Error))?;
349 self.inner = None;
350 self.state.transition(CompleteState::Closed);
351
352 Ok(())
353 }
354}