opendal/layers/
complete.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::cmp::Ordering;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use crate::raw::oio::FlatLister;
24use crate::raw::oio::PrefixLister;
25use crate::raw::*;
26use crate::*;
27
/// Complete underlying services features so that users can use them in
/// the same way.
///
/// # Notes
///
/// `CompleteLayer` is not a publicly accessible layer that can be used by
/// external users. OpenDAL makes sure that every accessor applies this
/// layer once and only once.
///
/// # Internal
///
/// So far `CompleteLayer` does the following things:
///
/// ## Create Dir Completion
///
/// If a service does not support `create_dir` natively but supports both
/// `list` and writing empty objects (`write_can_empty`), `create_dir` is
/// advertised in the full capability and emulated by writing (and closing)
/// an empty object at the directory path.
///
/// ## Stat Completion
///
/// Not all services support stat dir natively, but we can simulate it via
/// list. Stat on the root path `/` always reports a directory.
///
/// ## List Completion
///
/// There are two styles of list (recursive and non-recursive), but not all
/// services support both of them. `CompleteLayer` adds the missing
/// capability in a zero-cost way.
///
/// Underlying services return [`Capability`] to indicate the features that
/// their listers support.
///
/// - If `list_with_recursive` is supported, the request is forwarded directly.
/// - If not, and a recursive listing is requested, the inner accessor is
///   wrapped with [`FlatLister`].
/// - Non-recursive listings of paths that do not end with `/` list the parent
///   directory and filter the results by the requested path prefix.
///
/// ## Read and Write Size Checks
///
/// Readers check, once the inner reader returns an empty buffer, that the
/// number of bytes read equals the requested range size (when it is known).
/// Writers check on `close` that the number of bytes written equals the
/// content length reported by the service. The check is skipped for appends
/// and when the reported length is 0. If the service reports a length of 0,
/// the writer fills it in with the number of bytes written.
pub struct CompleteLayer;
56
57impl<A: Access> Layer<A> for CompleteLayer {
58    type LayeredAccess = CompleteAccessor<A>;
59
60    fn layer(&self, inner: A) -> Self::LayeredAccess {
61        let info = inner.info();
62        info.update_full_capability(|mut cap| {
63            if cap.list && cap.write_can_empty {
64                cap.create_dir = true;
65            }
66            cap
67        });
68
69        CompleteAccessor {
70            info,
71            inner: Arc::new(inner),
72        }
73    }
74}
75
76/// Provide complete wrapper for backend.
77pub struct CompleteAccessor<A: Access> {
78    info: Arc<AccessorInfo>,
79    inner: Arc<A>,
80}
81
82impl<A: Access> Debug for CompleteAccessor<A> {
83    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
84        self.inner.fmt(f)
85    }
86}
87
88impl<A: Access> CompleteAccessor<A> {
89    async fn complete_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
90        let capability = self.info.native_capability();
91        if capability.create_dir {
92            return self.inner().create_dir(path, args).await;
93        }
94
95        if capability.write_can_empty && capability.list {
96            let (_, mut w) = self.inner.write(path, OpWrite::default()).await?;
97            oio::Write::close(&mut w).await?;
98            return Ok(RpCreateDir::default());
99        }
100
101        self.inner.create_dir(path, args).await
102    }
103
104    async fn complete_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
105        let capability = self.info.native_capability();
106
107        if path == "/" {
108            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
109        }
110
111        // Forward to inner if create_dir is supported.
112        if path.ends_with('/') && capability.create_dir {
113            let meta = self.inner.stat(path, args).await?.into_metadata();
114
115            if meta.is_file() {
116                return Err(Error::new(
117                    ErrorKind::NotFound,
118                    "stat expected a directory, but found a file",
119                ));
120            }
121
122            return Ok(RpStat::new(meta));
123        }
124
125        // Otherwise, we can simulate stat dir via `list`.
126        if path.ends_with('/') && capability.list_with_recursive {
127            let (_, mut l) = self
128                .inner
129                .list(path, OpList::default().with_recursive(true).with_limit(1))
130                .await?;
131
132            return if oio::List::next(&mut l).await?.is_some() {
133                Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
134            } else {
135                Err(Error::new(
136                    ErrorKind::NotFound,
137                    "the directory is not found",
138                ))
139            };
140        }
141
142        // Forward to underlying storage directly since we don't know how to handle stat dir.
143        self.inner.stat(path, args).await
144    }
145
146    async fn complete_list(
147        &self,
148        path: &str,
149        args: OpList,
150    ) -> Result<(RpList, CompleteLister<A, A::Lister>)> {
151        let cap = self.info.native_capability();
152
153        let recursive = args.recursive();
154
155        match (recursive, cap.list_with_recursive) {
156            // - If service can list_with_recursive, we can forward list to it directly.
157            (_, true) => {
158                let (rp, p) = self.inner.list(path, args).await?;
159                Ok((rp, CompleteLister::One(p)))
160            }
161            // If recursive is true but service can't list_with_recursive
162            (true, false) => {
163                // Forward path that ends with /
164                if path.ends_with('/') {
165                    let p = FlatLister::new(self.inner.clone(), path);
166                    Ok((RpList::default(), CompleteLister::Two(p)))
167                } else {
168                    let parent = get_parent(path);
169                    let p = FlatLister::new(self.inner.clone(), parent);
170                    let p = PrefixLister::new(p, path);
171                    Ok((RpList::default(), CompleteLister::Four(p)))
172                }
173            }
174            // If recursive and service doesn't support list_with_recursive, we need to handle
175            // list prefix by ourselves.
176            (false, false) => {
177                // Forward path that ends with /
178                if path.ends_with('/') {
179                    let (rp, p) = self.inner.list(path, args).await?;
180                    Ok((rp, CompleteLister::One(p)))
181                } else {
182                    let parent = get_parent(path);
183                    let (rp, p) = self.inner.list(parent, args).await?;
184                    let p = PrefixLister::new(p, path);
185                    Ok((rp, CompleteLister::Three(p)))
186                }
187            }
188        }
189    }
190}
191
192impl<A: Access> LayeredAccess for CompleteAccessor<A> {
193    type Inner = A;
194    type Reader = CompleteReader<A::Reader>;
195    type Writer = CompleteWriter<A::Writer>;
196    type Lister = CompleteLister<A, A::Lister>;
197    type Deleter = A::Deleter;
198
199    fn inner(&self) -> &Self::Inner {
200        &self.inner
201    }
202
203    fn info(&self) -> Arc<AccessorInfo> {
204        self.info.clone()
205    }
206
207    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
208        self.complete_create_dir(path, args).await
209    }
210
211    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
212        let size = args.range().size();
213        self.inner
214            .read(path, args)
215            .await
216            .map(|(rp, r)| (rp, CompleteReader::new(r, size)))
217    }
218
219    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
220        let (rp, w) = self.inner.write(path, args.clone()).await?;
221        let w = CompleteWriter::new(w, args.append());
222        Ok((rp, w))
223    }
224
225    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
226        self.complete_stat(path, args).await
227    }
228
229    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
230        self.inner().delete().await
231    }
232
233    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
234        self.complete_list(path, args).await
235    }
236
237    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
238        self.inner.presign(path, args).await
239    }
240}
241
242pub type CompleteLister<A, P> =
243    FourWays<P, FlatLister<Arc<A>, P>, PrefixLister<P>, PrefixLister<FlatLister<Arc<A>, P>>>;
244
/// Reader wrapper that counts the bytes returned by the inner reader.
///
/// When an expected size is known, the count is compared against it once the
/// inner reader returns an empty buffer, which is treated as the end of the
/// stream.
pub struct CompleteReader<R> {
    /// The wrapped reader.
    inner: R,
    /// Expected total number of bytes, or `None` when it is not known.
    size: Option<u64>,
    /// Number of bytes returned so far.
    read: u64,
}
250
251impl<R> CompleteReader<R> {
252    pub fn new(inner: R, size: Option<u64>) -> Self {
253        Self {
254            inner,
255            size,
256            read: 0,
257        }
258    }
259
260    pub fn check(&self) -> Result<()> {
261        let Some(size) = self.size else {
262            return Ok(());
263        };
264
265        match self.read.cmp(&size) {
266            Ordering::Equal => Ok(()),
267            Ordering::Less => Err(
268                Error::new(ErrorKind::Unexpected, "reader got too little data")
269                    .with_context("expect", size)
270                    .with_context("actual", self.read),
271            ),
272            Ordering::Greater => Err(
273                Error::new(ErrorKind::Unexpected, "reader got too much data")
274                    .with_context("expect", size)
275                    .with_context("actual", self.read),
276            ),
277        }
278    }
279}
280
281impl<R: oio::Read> oio::Read for CompleteReader<R> {
282    async fn read(&mut self) -> Result<Buffer> {
283        let buf = self.inner.read().await?;
284
285        if buf.is_empty() {
286            self.check()?;
287        } else {
288            self.read += buf.len() as u64;
289        }
290
291        Ok(buf)
292    }
293}
294
/// Writer wrapper that counts the bytes written and, on `close`, checks the
/// count against the content length reported by the inner writer.
pub struct CompleteWriter<W> {
    /// The wrapped writer. It becomes `None` once `close` or `abort` has
    /// succeeded.
    inner: Option<W>,
    /// Whether this is an append write; if so, the length check is skipped.
    append: bool,
    /// Total number of bytes successfully passed to the inner writer.
    size: u64,
}
300
301impl<W> CompleteWriter<W> {
302    pub fn new(inner: W, append: bool) -> CompleteWriter<W> {
303        CompleteWriter {
304            inner: Some(inner),
305            append,
306            size: 0,
307        }
308    }
309
310    fn check(&self, content_length: u64) -> Result<()> {
311        if self.append || content_length == 0 {
312            return Ok(());
313        }
314
315        match self.size.cmp(&content_length) {
316            Ordering::Equal => Ok(()),
317            Ordering::Less => Err(
318                Error::new(ErrorKind::Unexpected, "writer got too little data")
319                    .with_context("expect", content_length)
320                    .with_context("actual", self.size),
321            ),
322            Ordering::Greater => Err(
323                Error::new(ErrorKind::Unexpected, "writer got too much data")
324                    .with_context("expect", content_length)
325                    .with_context("actual", self.size),
326            ),
327        }
328    }
329}
330
331/// Check if the writer has been closed or aborted while debug_assertions
332/// enabled. This code will never be executed in release mode.
333#[cfg(debug_assertions)]
334impl<W> Drop for CompleteWriter<W> {
335    fn drop(&mut self) {
336        if self.inner.is_some() {
337            log::warn!("writer has not been closed or aborted, must be a bug")
338        }
339    }
340}
341
342impl<W> oio::Write for CompleteWriter<W>
343where
344    W: oio::Write,
345{
346    async fn write(&mut self, bs: Buffer) -> Result<()> {
347        let w = self.inner.as_mut().ok_or_else(|| {
348            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
349        })?;
350
351        let len = bs.len();
352        w.write(bs).await?;
353        self.size += len as u64;
354
355        Ok(())
356    }
357
358    async fn close(&mut self) -> Result<Metadata> {
359        let w = self.inner.as_mut().ok_or_else(|| {
360            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
361        })?;
362
363        // we must return `Err` before setting inner to None; otherwise,
364        // we won't be able to retry `close` in `RetryLayer`.
365        let mut ret = w.close().await?;
366        self.check(ret.content_length())?;
367        if ret.content_length() == 0 {
368            ret = ret.with_content_length(self.size);
369        }
370        self.inner = None;
371
372        Ok(ret)
373    }
374
375    async fn abort(&mut self) -> Result<()> {
376        let w = self.inner.as_mut().ok_or_else(|| {
377            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
378        })?;
379
380        w.abort().await?;
381        self.inner = None;
382
383        Ok(())
384    }
385}