opendal_core/layers/
complete.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::cmp::Ordering;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use crate::raw::oio;
24use crate::raw::{
25    Access, AccessorInfo, Layer, LayeredAccess, OpCreateDir, OpList, OpPresign, OpRead, OpStat,
26    OpWrite, RpCreateDir, RpDelete, RpList, RpPresign, RpRead, RpStat, RpWrite,
27};
28use crate::*;
29
30/// CompleteLayer keeps validation wrappers for read/write operations.
31pub struct CompleteLayer;
32
33impl<A: Access> Layer<A> for CompleteLayer {
34    type LayeredAccess = CompleteAccessor<A>;
35
36    fn layer(&self, inner: A) -> Self::LayeredAccess {
37        CompleteAccessor {
38            info: inner.info(),
39            inner: Arc::new(inner),
40        }
41    }
42}
43
44/// Provide complete wrapper for backend.
45pub struct CompleteAccessor<A: Access> {
46    info: Arc<AccessorInfo>,
47    inner: Arc<A>,
48}
49
50impl<A: Access> Debug for CompleteAccessor<A> {
51    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52        self.inner.fmt(f)
53    }
54}
55
56impl<A: Access> LayeredAccess for CompleteAccessor<A> {
57    type Inner = A;
58    type Reader = CompleteReader<A::Reader>;
59    type Writer = CompleteWriter<A::Writer>;
60    type Lister = CompleteLister<A>;
61    type Deleter = A::Deleter;
62
63    fn inner(&self) -> &Self::Inner {
64        &self.inner
65    }
66
67    fn info(&self) -> Arc<AccessorInfo> {
68        self.info.clone()
69    }
70
71    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
72        self.inner().create_dir(path, args).await
73    }
74
75    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
76        let size = args.range().size();
77        self.inner
78            .read(path, args)
79            .await
80            .map(|(rp, r)| (rp, CompleteReader::new(r, size)))
81    }
82
83    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
84        let (rp, w) = self.inner.write(path, args.clone()).await?;
85        let w = CompleteWriter::new(w, args.append());
86        Ok((rp, w))
87    }
88
89    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
90        self.inner.stat(path, args).await
91    }
92
93    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
94        self.inner().delete().await
95    }
96
97    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
98        let (rp, lister) = self.inner.list(path, args).await?;
99        let lister = CompleteLister::new(self.inner.clone(), self.info.clone(), lister);
100        Ok((rp, lister))
101    }
102
103    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
104        self.inner.presign(path, args).await
105    }
106}
107
108pub struct CompleteLister<A: Access> {
109    inner: A::Lister,
110    acc: Arc<A>,
111    info: Arc<AccessorInfo>,
112}
113
114impl<A: Access> CompleteLister<A> {
115    fn new(acc: Arc<A>, info: Arc<AccessorInfo>, inner: A::Lister) -> Self {
116        Self { inner, acc, info }
117    }
118
119    async fn ensure_file_content_length(&self, entry: &mut oio::Entry) -> Result<()> {
120        let path = entry.path().to_string();
121        let version = entry.metadata().version().map(str::to_owned);
122        let mut op = OpStat::new();
123        if let Some(version) = version.as_deref() {
124            op = op.with_version(version);
125        }
126
127        let stat_metadata = self.acc.stat(&path, op).await?.into_metadata();
128        if !stat_metadata.has_content_length() {
129            return Err(Error::new(
130                ErrorKind::Unexpected,
131                "content length is required for list file entries",
132            )
133            .with_operation("CompleteLister::ensure_file_content_length")
134            .with_context("service", self.info.scheme().to_string())
135            .with_context("path", path));
136        }
137
138        entry
139            .metadata_mut()
140            .set_content_length(stat_metadata.content_length());
141        Ok(())
142    }
143}
144
145impl<A: Access> oio::List for CompleteLister<A> {
146    async fn next(&mut self) -> Result<Option<oio::Entry>> {
147        loop {
148            let Some(mut entry) = self.inner.next().await? else {
149                return Ok(None);
150            };
151
152            if !entry.mode().is_file()
153                || entry.metadata().is_deleted()
154                || entry.metadata().has_content_length()
155            {
156                return Ok(Some(entry));
157            }
158
159            match self.ensure_file_content_length(&mut entry).await {
160                Ok(()) => return Ok(Some(entry)),
161                Err(err) if err.kind() == ErrorKind::NotFound => continue,
162                Err(err) => return Err(err),
163            }
164        }
165    }
166}
167
168pub struct CompleteReader<R> {
169    inner: R,
170    size: Option<u64>,
171    read: u64,
172}
173
174impl<R> CompleteReader<R> {
175    pub fn new(inner: R, size: Option<u64>) -> Self {
176        Self {
177            inner,
178            size,
179            read: 0,
180        }
181    }
182
183    pub fn check(&self) -> Result<()> {
184        let Some(size) = self.size else {
185            return Ok(());
186        };
187
188        match self.read.cmp(&size) {
189            Ordering::Equal => Ok(()),
190            Ordering::Less => Err(
191                Error::new(ErrorKind::Unexpected, "reader got too little data")
192                    .with_context("expect", size)
193                    .with_context("actual", self.read),
194            ),
195            Ordering::Greater => Err(
196                Error::new(ErrorKind::Unexpected, "reader got too much data")
197                    .with_context("expect", size)
198                    .with_context("actual", self.read),
199            ),
200        }
201    }
202}
203
204impl<R: oio::Read> oio::Read for CompleteReader<R> {
205    async fn read(&mut self) -> Result<Buffer> {
206        let buf = self.inner.read().await?;
207
208        if buf.is_empty() {
209            self.check()?;
210        } else {
211            self.read += buf.len() as u64;
212        }
213
214        Ok(buf)
215    }
216}
217
/// Lifecycle of a write operation.
///
/// The successful path is `Open -> Written -> Closed`; a failure moves the
/// state to `Error`, where it stays.
#[derive(Debug, PartialEq, Eq)]
enum CompleteState {
    Open,
    Written,
    Closed,
    Error,
}

impl CompleteState {
    /// Move to `destination`, unless the state is already `Error`, in which
    /// case the request is silently ignored.
    fn transition(&mut self, destination: CompleteState) {
        if matches!(*self, CompleteState::Error) {
            return;
        }

        *self = destination;
    }
}
238
239pub struct CompleteWriter<W> {
240    inner: Option<W>,
241    append: bool,
242    size: u64,
243    state: CompleteState,
244}
245
246impl<W> CompleteWriter<W> {
247    pub fn new(inner: W, append: bool) -> CompleteWriter<W> {
248        CompleteWriter {
249            inner: Some(inner),
250            append,
251            size: 0,
252            state: CompleteState::Open,
253        }
254    }
255
256    fn check(&self, content_length: u64) -> Result<()> {
257        if self.append || content_length == 0 {
258            return Ok(());
259        }
260
261        match self.size.cmp(&content_length) {
262            Ordering::Equal => Ok(()),
263            Ordering::Less => Err(
264                Error::new(ErrorKind::Unexpected, "writer got too little data")
265                    .with_context("expect", content_length)
266                    .with_context("actual", self.size),
267            ),
268            Ordering::Greater => Err(
269                Error::new(ErrorKind::Unexpected, "writer got too much data")
270                    .with_context("expect", content_length)
271                    .with_context("actual", self.size),
272            ),
273        }
274    }
275}
276
277/// Check if the writer has been closed or aborted while debug_assertions
278/// enabled. This code will never be executed in release mode.
279#[cfg(debug_assertions)]
280impl<W> Drop for CompleteWriter<W> {
281    fn drop(&mut self) {
282        if self.state == CompleteState::Written {
283            log::warn!(
284                "writer has not been closed or aborted after successful write operation, must be a bug"
285            )
286        }
287    }
288}
289
290impl<W> oio::Write for CompleteWriter<W>
291where
292    W: oio::Write,
293{
294    async fn write(&mut self, bs: Buffer) -> Result<()> {
295        let w = self.inner.as_mut().ok_or_else(|| {
296            debug_assert_ne!(
297                self.state,
298                CompleteState::Open,
299                "bug: inner is empty, but state is Open"
300            );
301            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
302        })?;
303
304        let len = bs.len();
305        w.write(bs)
306            .await
307            .inspect_err(|_| self.state.transition(CompleteState::Error))?;
308        self.size += len as u64;
309        self.state.transition(CompleteState::Written);
310
311        Ok(())
312    }
313
314    async fn close(&mut self) -> Result<Metadata> {
315        let w = self.inner.as_mut().ok_or_else(|| {
316            debug_assert_ne!(
317                self.state,
318                CompleteState::Open,
319                "bug: inner is empty, but state is Open"
320            );
321            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
322        })?;
323
324        // we must return `Err` before setting inner to None; otherwise,
325        // we won't be able to retry `close` in `RetryLayer`.
326        let mut ret = w
327            .close()
328            .await
329            .inspect_err(|_| self.state.transition(CompleteState::Error))?;
330        self.check(ret.content_length())
331            .inspect_err(|_| self.state.transition(CompleteState::Error))?;
332        if ret.content_length() == 0 {
333            ret = ret.with_content_length(self.size);
334        }
335        self.inner = None;
336        self.state.transition(CompleteState::Closed);
337
338        Ok(ret)
339    }
340
341    async fn abort(&mut self) -> Result<()> {
342        let w = self.inner.as_mut().ok_or_else(|| {
343            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
344        })?;
345
346        w.abort()
347            .await
348            .inspect_err(|_| self.state.transition(CompleteState::Error))?;
349        self.inner = None;
350        self.state.transition(CompleteState::Closed);
351
352        Ok(())
353    }
354}