opendal_core/layers/
complete.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::cmp::Ordering;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use crate::raw::oio;
24use crate::raw::{
25    Access, AccessorInfo, Layer, LayeredAccess, OpCreateDir, OpList, OpPresign, OpRead, OpStat,
26    OpWrite, RpCreateDir, RpDelete, RpList, RpPresign, RpRead, RpStat, RpWrite,
27};
28use crate::*;
29
/// Layer that wraps an accessor's readers and writers with completeness checks.
///
/// Applying this layer produces a [`CompleteAccessor`]: readers returned by it
/// compare the number of bytes read against the requested range size, and
/// writers compare the number of bytes written against the content length
/// reported when the writer is closed.
#[derive(Debug, Default, Clone)]
pub struct CompleteLayer;
32
33impl<A: Access> Layer<A> for CompleteLayer {
34    type LayeredAccess = CompleteAccessor<A>;
35
36    fn layer(&self, inner: A) -> Self::LayeredAccess {
37        CompleteAccessor {
38            info: inner.info(),
39            inner: Arc::new(inner),
40        }
41    }
42}
43
44/// Provide complete wrapper for backend.
45pub struct CompleteAccessor<A: Access> {
46    info: Arc<AccessorInfo>,
47    inner: Arc<A>,
48}
49
50impl<A: Access> Debug for CompleteAccessor<A> {
51    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52        self.inner.fmt(f)
53    }
54}
55
56impl<A: Access> LayeredAccess for CompleteAccessor<A> {
57    type Inner = A;
58    type Reader = CompleteReader<A::Reader>;
59    type Writer = CompleteWriter<A::Writer>;
60    type Lister = A::Lister;
61    type Deleter = A::Deleter;
62
63    fn inner(&self) -> &Self::Inner {
64        &self.inner
65    }
66
67    fn info(&self) -> Arc<AccessorInfo> {
68        self.info.clone()
69    }
70
71    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
72        self.inner().create_dir(path, args).await
73    }
74
75    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
76        let size = args.range().size();
77        self.inner
78            .read(path, args)
79            .await
80            .map(|(rp, r)| (rp, CompleteReader::new(r, size)))
81    }
82
83    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
84        let (rp, w) = self.inner.write(path, args.clone()).await?;
85        let w = CompleteWriter::new(w, args.append());
86        Ok((rp, w))
87    }
88
89    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
90        self.inner.stat(path, args).await
91    }
92
93    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
94        self.inner().delete().await
95    }
96
97    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
98        self.inner.list(path, args).await
99    }
100
101    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
102        self.inner.presign(path, args).await
103    }
104}
105
106pub struct CompleteReader<R> {
107    inner: R,
108    size: Option<u64>,
109    read: u64,
110}
111
112impl<R> CompleteReader<R> {
113    pub fn new(inner: R, size: Option<u64>) -> Self {
114        Self {
115            inner,
116            size,
117            read: 0,
118        }
119    }
120
121    pub fn check(&self) -> Result<()> {
122        let Some(size) = self.size else {
123            return Ok(());
124        };
125
126        match self.read.cmp(&size) {
127            Ordering::Equal => Ok(()),
128            Ordering::Less => Err(
129                Error::new(ErrorKind::Unexpected, "reader got too little data")
130                    .with_context("expect", size)
131                    .with_context("actual", self.read),
132            ),
133            Ordering::Greater => Err(
134                Error::new(ErrorKind::Unexpected, "reader got too much data")
135                    .with_context("expect", size)
136                    .with_context("actual", self.read),
137            ),
138        }
139    }
140}
141
142impl<R: oio::Read> oio::Read for CompleteReader<R> {
143    async fn read(&mut self) -> Result<Buffer> {
144        let buf = self.inner.read().await?;
145
146        if buf.is_empty() {
147            self.check()?;
148        } else {
149            self.read += buf.len() as u64;
150        }
151
152        Ok(buf)
153    }
154}
155
156pub struct CompleteWriter<W> {
157    inner: Option<W>,
158    append: bool,
159    size: u64,
160}
161
162impl<W> CompleteWriter<W> {
163    pub fn new(inner: W, append: bool) -> CompleteWriter<W> {
164        CompleteWriter {
165            inner: Some(inner),
166            append,
167            size: 0,
168        }
169    }
170
171    fn check(&self, content_length: u64) -> Result<()> {
172        if self.append || content_length == 0 {
173            return Ok(());
174        }
175
176        match self.size.cmp(&content_length) {
177            Ordering::Equal => Ok(()),
178            Ordering::Less => Err(
179                Error::new(ErrorKind::Unexpected, "writer got too little data")
180                    .with_context("expect", content_length)
181                    .with_context("actual", self.size),
182            ),
183            Ordering::Greater => Err(
184                Error::new(ErrorKind::Unexpected, "writer got too much data")
185                    .with_context("expect", content_length)
186                    .with_context("actual", self.size),
187            ),
188        }
189    }
190}
191
192/// Check if the writer has been closed or aborted while debug_assertions
193/// enabled. This code will never be executed in release mode.
194#[cfg(debug_assertions)]
195impl<W> Drop for CompleteWriter<W> {
196    fn drop(&mut self) {
197        if self.inner.is_some() {
198            log::warn!("writer has not been closed or aborted, must be a bug")
199        }
200    }
201}
202
203impl<W> oio::Write for CompleteWriter<W>
204where
205    W: oio::Write,
206{
207    async fn write(&mut self, bs: Buffer) -> Result<()> {
208        let w = self.inner.as_mut().ok_or_else(|| {
209            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
210        })?;
211
212        let len = bs.len();
213        w.write(bs).await?;
214        self.size += len as u64;
215
216        Ok(())
217    }
218
219    async fn close(&mut self) -> Result<Metadata> {
220        let w = self.inner.as_mut().ok_or_else(|| {
221            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
222        })?;
223
224        // we must return `Err` before setting inner to None; otherwise,
225        // we won't be able to retry `close` in `RetryLayer`.
226        let mut ret = w.close().await?;
227        self.check(ret.content_length())?;
228        if ret.content_length() == 0 {
229            ret = ret.with_content_length(self.size);
230        }
231        self.inner = None;
232
233        Ok(ret)
234    }
235
236    async fn abort(&mut self) -> Result<()> {
237        let w = self.inner.as_mut().ok_or_else(|| {
238            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
239        })?;
240
241        w.abort().await?;
242        self.inner = None;
243
244        Ok(())
245    }
246}