dav_server_opendalfs/
dir.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::metadata::OpendalMetaData;
19use super::utils::*;
20use dav_server::fs::{DavDirEntry, DavMetaData, FsResult};
21use futures::StreamExt;
22use futures::{FutureExt, Stream};
23use opendal::raw::normalize_path;
24use opendal::Operator;
25use opendal::{Entry, Lister};
26use std::pin::Pin;
27use std::task::Poll::Ready;
28use std::task::{ready, Context, Poll};
29
30/// OpendalStream is a stream of `DavDirEntry` that is used to list the contents of a directory.
31pub struct OpendalStream {
32    op: Operator,
33    lister: Lister,
34    path: String,
35}
36
37impl OpendalStream {
38    /// Create a new opendal stream.
39    pub fn new(op: Operator, lister: Lister, p: &str) -> Self {
40        OpendalStream {
41            op,
42            lister,
43            path: normalize_path(p),
44        }
45    }
46}
47
48impl Stream for OpendalStream {
49    type Item = FsResult<Box<dyn DavDirEntry>>;
50
51    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
52        let dav_stream = self.get_mut();
53        loop {
54            match ready!(dav_stream.lister.poll_next_unpin(cx)) {
55                Some(entry) => {
56                    let entry = entry.map_err(convert_error)?;
57                    if entry.path() == dav_stream.path {
58                        continue;
59                    }
60                    let webdav_entry = OpendalDirEntry::new(dav_stream.op.clone(), entry);
61                    return Ready(Some(Ok(Box::new(webdav_entry) as Box<dyn DavDirEntry>)));
62                }
63                None => return Ready(None),
64            }
65        }
66    }
67}
68
69/// OpendalDirEntry is a `DavDirEntry` implementation for opendal.
70pub struct OpendalDirEntry {
71    op: Operator,
72    dir_entry: Entry,
73}
74
75impl OpendalDirEntry {
76    /// Create a new opendal dir entry.
77    pub fn new(op: Operator, dir_entry: Entry) -> Self {
78        OpendalDirEntry { dir_entry, op }
79    }
80}
81
82impl DavDirEntry for OpendalDirEntry {
83    fn name(&self) -> Vec<u8> {
84        self.dir_entry.name().as_bytes().to_vec()
85    }
86
87    fn metadata(&self) -> dav_server::fs::FsFuture<Box<dyn DavMetaData>> {
88        async move {
89            self.op
90                .stat(self.dir_entry.path())
91                .await
92                .map(|metadata| Box::new(OpendalMetaData::new(metadata)) as Box<dyn DavMetaData>)
93                .map_err(convert_error)
94        }
95        .boxed()
96    }
97}