opendal/layers/complete.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::raw::oio::FlatLister;
19use crate::raw::oio::PrefixLister;
20use crate::raw::*;
21use crate::*;
22use std::cmp::Ordering;
23use std::fmt::Debug;
24use std::fmt::Formatter;
25use std::sync::Arc;
26
/// Complete underlying services' features so that users can use every
/// service in the same way.
///
/// # Notes
///
/// CompleteLayer is not a publicly accessible layer that can be used by
/// external users. OpenDAL will make sure every accessor applies this
/// layer once and only once.
///
/// # Internal
///
/// So far `CompleteLayer` does the following things:
///
/// ## Create Dir Completion
///
/// When a service supports both `list` and `write_can_empty`, `create_dir`
/// is advertised in the full capability. If the service has no native
/// `create_dir`, the directory is emulated by writing (and closing) an empty
/// object at the directory path.
///
/// ## Stat Completion
///
/// Not all services support stat dir natively, but we can simulate it via list.
///
/// - `/` is always reported as a directory.
/// - For a path ending with `/` on a service with native `create_dir`, the
///   stat is forwarded and a file result is rejected with `NotFound`.
/// - Otherwise, if the service supports `list_with_recursive`, the directory
///   exists when a recursive list with limit 1 yields any entry.
/// - Everything else is forwarded to the service unchanged.
///
/// ## Read Completion
///
/// Every reader is wrapped in [`CompleteReader`], which counts the bytes it
/// returns. When the requested range has a known size and the underlying
/// reader returns an empty buffer, the reader fails with `Unexpected` if it
/// got too little or too much data.
///
/// ## Write Completion
///
/// Every writer is wrapped in [`CompleteWriter`], which:
///
/// - rejects calls after a successful `close` or `abort`;
/// - on `close`, compares the bytes written with the content length reported
///   by the service (skipped for appends and when that length is 0), and
///   fills in the content length when the service reports 0.
///
/// ## List Completion
///
/// There are two styles of list, recursive and non-recursive, but not all
/// services support both of them. CompleteLayer fills in the missing one.
///
/// Underlying services return [`Capability`] to indicate the
/// features that returned listers support.
///
/// - If `list_with_recursive` is supported, the list is forwarded directly.
/// - If not, recursive lists are wrapped with [`FlatLister`]. A list of a path
///   that does not end with `/` lists its parent and filters the entries with
///   [`PrefixLister`].
pub struct CompleteLayer;
101
102impl<A: Access> Layer<A> for CompleteLayer {
103    type LayeredAccess = CompleteAccessor<A>;
104
105    fn layer(&self, inner: A) -> Self::LayeredAccess {
106        let info = inner.info();
107        info.update_full_capability(|mut cap| {
108            if cap.list && cap.write_can_empty {
109                cap.create_dir = true;
110            }
111            cap
112        });
113
114        CompleteAccessor {
115            info,
116            inner: Arc::new(inner),
117        }
118    }
119}
120
121/// Provide complete wrapper for backend.
122pub struct CompleteAccessor<A: Access> {
123    info: Arc<AccessorInfo>,
124    inner: Arc<A>,
125}
126
127impl<A: Access> Debug for CompleteAccessor<A> {
128    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
129        self.inner.fmt(f)
130    }
131}
132
133impl<A: Access> CompleteAccessor<A> {
134    async fn complete_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
135        let capability = self.info.native_capability();
136        if capability.create_dir {
137            return self.inner().create_dir(path, args).await;
138        }
139
140        if capability.write_can_empty && capability.list {
141            let (_, mut w) = self.inner.write(path, OpWrite::default()).await?;
142            oio::Write::close(&mut w).await?;
143            return Ok(RpCreateDir::default());
144        }
145
146        self.inner.create_dir(path, args).await
147    }
148
149    fn complete_blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
150        let capability = self.info.native_capability();
151        if capability.create_dir && capability.blocking {
152            return self.inner().blocking_create_dir(path, args);
153        }
154
155        if capability.write_can_empty && capability.list && capability.blocking {
156            let (_, mut w) = self.inner.blocking_write(path, OpWrite::default())?;
157            oio::BlockingWrite::close(&mut w)?;
158            return Ok(RpCreateDir::default());
159        }
160
161        self.inner.blocking_create_dir(path, args)
162    }
163
164    async fn complete_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
165        let capability = self.info.native_capability();
166
167        if path == "/" {
168            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
169        }
170
171        // Forward to inner if create_dir is supported.
172        if path.ends_with('/') && capability.create_dir {
173            let meta = self.inner.stat(path, args).await?.into_metadata();
174
175            if meta.is_file() {
176                return Err(Error::new(
177                    ErrorKind::NotFound,
178                    "stat expected a directory, but found a file",
179                ));
180            }
181
182            return Ok(RpStat::new(meta));
183        }
184
185        // Otherwise, we can simulate stat dir via `list`.
186        if path.ends_with('/') && capability.list_with_recursive {
187            let (_, mut l) = self
188                .inner
189                .list(path, OpList::default().with_recursive(true).with_limit(1))
190                .await?;
191
192            return if oio::List::next(&mut l).await?.is_some() {
193                Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
194            } else {
195                Err(Error::new(
196                    ErrorKind::NotFound,
197                    "the directory is not found",
198                ))
199            };
200        }
201
202        // Forward to underlying storage directly since we don't know how to handle stat dir.
203        self.inner.stat(path, args).await
204    }
205
206    fn complete_blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
207        let capability = self.info.native_capability();
208
209        if path == "/" {
210            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
211        }
212
213        // Forward to inner if create dir is supported.
214        if path.ends_with('/') && capability.create_dir {
215            let meta = self.inner.blocking_stat(path, args)?.into_metadata();
216
217            if meta.is_file() {
218                return Err(Error::new(
219                    ErrorKind::NotFound,
220                    "stat expected a directory, but found a file",
221                ));
222            }
223
224            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
225        }
226
227        // Otherwise, we can simulate stat a dir path via `list`.
228        if path.ends_with('/') && capability.list_with_recursive {
229            let (_, mut l) = self
230                .inner
231                .blocking_list(path, OpList::default().with_recursive(true).with_limit(1))?;
232
233            return if oio::BlockingList::next(&mut l)?.is_some() {
234                Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
235            } else {
236                Err(Error::new(
237                    ErrorKind::NotFound,
238                    "the directory is not found",
239                ))
240            };
241        }
242
243        // Forward to underlying storage directly since we don't know how to handle stat dir.
244        self.inner.blocking_stat(path, args)
245    }
246
247    async fn complete_list(
248        &self,
249        path: &str,
250        args: OpList,
251    ) -> Result<(RpList, CompleteLister<A, A::Lister>)> {
252        let cap = self.info.native_capability();
253
254        let recursive = args.recursive();
255
256        match (recursive, cap.list_with_recursive) {
257            // - If service can list_with_recursive, we can forward list to it directly.
258            (_, true) => {
259                let (rp, p) = self.inner.list(path, args).await?;
260                Ok((rp, CompleteLister::One(p)))
261            }
262            // If recursive is true but service can't list_with_recursive
263            (true, false) => {
264                // Forward path that ends with /
265                if path.ends_with('/') {
266                    let p = FlatLister::new(self.inner.clone(), path);
267                    Ok((RpList::default(), CompleteLister::Two(p)))
268                } else {
269                    let parent = get_parent(path);
270                    let p = FlatLister::new(self.inner.clone(), parent);
271                    let p = PrefixLister::new(p, path);
272                    Ok((RpList::default(), CompleteLister::Four(p)))
273                }
274            }
275            // If recursive and service doesn't support list_with_recursive, we need to handle
276            // list prefix by ourselves.
277            (false, false) => {
278                // Forward path that ends with /
279                if path.ends_with('/') {
280                    let (rp, p) = self.inner.list(path, args).await?;
281                    Ok((rp, CompleteLister::One(p)))
282                } else {
283                    let parent = get_parent(path);
284                    let (rp, p) = self.inner.list(parent, args).await?;
285                    let p = PrefixLister::new(p, path);
286                    Ok((rp, CompleteLister::Three(p)))
287                }
288            }
289        }
290    }
291
292    fn complete_blocking_list(
293        &self,
294        path: &str,
295        args: OpList,
296    ) -> Result<(RpList, CompleteLister<A, A::BlockingLister>)> {
297        let cap = self.info.native_capability();
298
299        let recursive = args.recursive();
300
301        match (recursive, cap.list_with_recursive) {
302            // - If service can list_with_recursive, we can forward list to it directly.
303            (_, true) => {
304                let (rp, p) = self.inner.blocking_list(path, args)?;
305                Ok((rp, CompleteLister::One(p)))
306            }
307            // If recursive is true but service can't list_with_recursive
308            (true, false) => {
309                // Forward path that ends with /
310                if path.ends_with('/') {
311                    let p = FlatLister::new(self.inner.clone(), path);
312                    Ok((RpList::default(), CompleteLister::Two(p)))
313                } else {
314                    let parent = get_parent(path);
315                    let p = FlatLister::new(self.inner.clone(), parent);
316                    let p = PrefixLister::new(p, path);
317                    Ok((RpList::default(), CompleteLister::Four(p)))
318                }
319            }
320            // If recursive and service doesn't support list_with_recursive, we need to handle
321            // list prefix by ourselves.
322            (false, false) => {
323                // Forward path that ends with /
324                if path.ends_with('/') {
325                    let (rp, p) = self.inner.blocking_list(path, args)?;
326                    Ok((rp, CompleteLister::One(p)))
327                } else {
328                    let parent = get_parent(path);
329                    let (rp, p) = self.inner.blocking_list(parent, args)?;
330                    let p = PrefixLister::new(p, path);
331                    Ok((rp, CompleteLister::Three(p)))
332                }
333            }
334        }
335    }
336}
337
338impl<A: Access> LayeredAccess for CompleteAccessor<A> {
339    type Inner = A;
340    type Reader = CompleteReader<A::Reader>;
341    type BlockingReader = CompleteReader<A::BlockingReader>;
342    type Writer = CompleteWriter<A::Writer>;
343    type BlockingWriter = CompleteWriter<A::BlockingWriter>;
344    type Lister = CompleteLister<A, A::Lister>;
345    type BlockingLister = CompleteLister<A, A::BlockingLister>;
346    type Deleter = A::Deleter;
347    type BlockingDeleter = A::BlockingDeleter;
348
349    fn inner(&self) -> &Self::Inner {
350        &self.inner
351    }
352
353    fn info(&self) -> Arc<AccessorInfo> {
354        self.info.clone()
355    }
356
357    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
358        self.complete_create_dir(path, args).await
359    }
360
361    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
362        let size = args.range().size();
363        self.inner
364            .read(path, args)
365            .await
366            .map(|(rp, r)| (rp, CompleteReader::new(r, size)))
367    }
368
369    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
370        let (rp, w) = self.inner.write(path, args.clone()).await?;
371        let w = CompleteWriter::new(w, args.append());
372        Ok((rp, w))
373    }
374
375    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
376        self.complete_stat(path, args).await
377    }
378
379    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
380        self.inner().delete().await
381    }
382
383    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
384        self.complete_list(path, args).await
385    }
386
387    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
388        self.inner.presign(path, args).await
389    }
390
391    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
392        self.complete_blocking_create_dir(path, args)
393    }
394
395    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
396        let size = args.range().size();
397        self.inner
398            .blocking_read(path, args)
399            .map(|(rp, r)| (rp, CompleteReader::new(r, size)))
400    }
401
402    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
403        let append = args.append();
404        self.inner
405            .blocking_write(path, args)
406            .map(|(rp, w)| (rp, CompleteWriter::new(w, append)))
407    }
408
409    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
410        self.complete_blocking_stat(path, args)
411    }
412
413    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
414        self.inner().blocking_delete()
415    }
416
417    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
418        self.complete_blocking_list(path, args)
419    }
420}
421
422pub type CompleteLister<A, P> =
423    FourWays<P, FlatLister<Arc<A>, P>, PrefixLister<P>, PrefixLister<FlatLister<Arc<A>, P>>>;
424
425pub struct CompleteReader<R> {
426    inner: R,
427    size: Option<u64>,
428    read: u64,
429}
430
431impl<R> CompleteReader<R> {
432    pub fn new(inner: R, size: Option<u64>) -> Self {
433        Self {
434            inner,
435            size,
436            read: 0,
437        }
438    }
439
440    pub fn check(&self) -> Result<()> {
441        let Some(size) = self.size else {
442            return Ok(());
443        };
444
445        match self.read.cmp(&size) {
446            Ordering::Equal => Ok(()),
447            Ordering::Less => Err(
448                Error::new(ErrorKind::Unexpected, "reader got too little data")
449                    .with_context("expect", size)
450                    .with_context("actual", self.read),
451            ),
452            Ordering::Greater => Err(
453                Error::new(ErrorKind::Unexpected, "reader got too much data")
454                    .with_context("expect", size)
455                    .with_context("actual", self.read),
456            ),
457        }
458    }
459}
460
461impl<R: oio::Read> oio::Read for CompleteReader<R> {
462    async fn read(&mut self) -> Result<Buffer> {
463        let buf = self.inner.read().await?;
464
465        if buf.is_empty() {
466            self.check()?;
467        } else {
468            self.read += buf.len() as u64;
469        }
470
471        Ok(buf)
472    }
473}
474
475impl<R: oio::BlockingRead> oio::BlockingRead for CompleteReader<R> {
476    fn read(&mut self) -> Result<Buffer> {
477        let buf = self.inner.read()?;
478
479        if buf.is_empty() {
480            self.check()?;
481        } else {
482            self.read += buf.len() as u64;
483        }
484
485        Ok(buf)
486    }
487}
488
489pub struct CompleteWriter<W> {
490    inner: Option<W>,
491    append: bool,
492    size: u64,
493}
494
495impl<W> CompleteWriter<W> {
496    pub fn new(inner: W, append: bool) -> CompleteWriter<W> {
497        CompleteWriter {
498            inner: Some(inner),
499            append,
500            size: 0,
501        }
502    }
503
504    fn check(&self, content_length: u64) -> Result<()> {
505        if self.append || content_length == 0 {
506            return Ok(());
507        }
508
509        match self.size.cmp(&content_length) {
510            Ordering::Equal => Ok(()),
511            Ordering::Less => Err(
512                Error::new(ErrorKind::Unexpected, "writer got too little data")
513                    .with_context("expect", content_length)
514                    .with_context("actual", self.size),
515            ),
516            Ordering::Greater => Err(
517                Error::new(ErrorKind::Unexpected, "writer got too much data")
518                    .with_context("expect", content_length)
519                    .with_context("actual", self.size),
520            ),
521        }
522    }
523}
524
525/// Check if the writer has been closed or aborted while debug_assertions
526/// enabled. This code will never be executed in release mode.
527#[cfg(debug_assertions)]
528impl<W> Drop for CompleteWriter<W> {
529    fn drop(&mut self) {
530        if self.inner.is_some() {
531            log::warn!("writer has not been closed or aborted, must be a bug")
532        }
533    }
534}
535
536impl<W> oio::Write for CompleteWriter<W>
537where
538    W: oio::Write,
539{
540    async fn write(&mut self, bs: Buffer) -> Result<()> {
541        let w = self.inner.as_mut().ok_or_else(|| {
542            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
543        })?;
544
545        let len = bs.len();
546        w.write(bs).await?;
547        self.size += len as u64;
548
549        Ok(())
550    }
551
552    async fn close(&mut self) -> Result<Metadata> {
553        let w = self.inner.as_mut().ok_or_else(|| {
554            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
555        })?;
556
557        // we must return `Err` before setting inner to None; otherwise,
558        // we won't be able to retry `close` in `RetryLayer`.
559        let mut ret = w.close().await?;
560        self.check(ret.content_length())?;
561        if ret.content_length() == 0 {
562            ret = ret.with_content_length(self.size);
563        }
564        self.inner = None;
565
566        Ok(ret)
567    }
568
569    async fn abort(&mut self) -> Result<()> {
570        let w = self.inner.as_mut().ok_or_else(|| {
571            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
572        })?;
573
574        w.abort().await?;
575        self.inner = None;
576
577        Ok(())
578    }
579}
580
581impl<W> oio::BlockingWrite for CompleteWriter<W>
582where
583    W: oio::BlockingWrite,
584{
585    fn write(&mut self, bs: Buffer) -> Result<()> {
586        let w = self.inner.as_mut().ok_or_else(|| {
587            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
588        })?;
589
590        let len = bs.len();
591        w.write(bs)?;
592        self.size += len as u64;
593
594        Ok(())
595    }
596
597    fn close(&mut self) -> Result<Metadata> {
598        let w = self.inner.as_mut().ok_or_else(|| {
599            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
600        })?;
601
602        let mut ret = w.close()?;
603        self.check(ret.content_length())?;
604        if ret.content_length() == 0 {
605            ret = ret.with_content_length(self.size);
606        }
607        self.inner = None;
608
609        Ok(ret)
610    }
611}