dav_server_opendalfs/
file.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{Debug, Formatter};
19use std::io::SeekFrom;
20
21use bytes::{Buf, Bytes, BytesMut};
22use dav_server::fs::{DavFile, OpenOptions};
23use dav_server::fs::{DavMetaData, FsResult};
24use dav_server::fs::{FsError, FsFuture};
25use futures::FutureExt;
26use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
27use opendal::{FuturesAsyncReader, FuturesAsyncWriter, Operator};
28
29use super::metadata::OpendalMetaData;
30use super::utils::*;
31
32/// OpendalFile is a `DavFile` implementation for opendal.
33pub struct OpendalFile {
34    op: Operator,
35    path: String,
36    state: State,
37    buf: BytesMut,
38}
39
40impl Debug for OpendalFile {
41    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
42        f.debug_struct("OpendalFile")
43            .field("path", &self.path)
44            .field(
45                "state",
46                match &self.state {
47                    State::Read { .. } => &"read",
48                    State::Write(_) => &"write",
49                },
50            )
51            .finish()
52    }
53}
54
55enum State {
56    Read(FuturesAsyncReader),
57    Write(FuturesAsyncWriter),
58}
59
60impl OpendalFile {
61    /// Create a new opendal file.
62    pub async fn open(op: Operator, path: String, options: OpenOptions) -> FsResult<Self> {
63        let state = if options.read {
64            let r = op
65                .reader(&path)
66                .await
67                .map_err(convert_error)?
68                .into_futures_async_read(..)
69                .await
70                .map_err(convert_error)?;
71            State::Read(r)
72        } else if options.write {
73            let w = op
74                .writer_with(&path)
75                .append(options.append)
76                .await
77                .map_err(convert_error)?
78                .into_futures_async_write();
79            State::Write(w)
80        } else {
81            return Err(FsError::NotImplemented);
82        };
83
84        Ok(Self {
85            op,
86            path,
87            state,
88            buf: BytesMut::new(),
89        })
90    }
91}
92
93impl DavFile for OpendalFile {
94    fn metadata(&mut self) -> FsFuture<Box<dyn DavMetaData>> {
95        async move {
96            self.op
97                .stat(&self.path)
98                .await
99                .map(|opendal_metadata| {
100                    Box::new(OpendalMetaData::new(opendal_metadata)) as Box<dyn DavMetaData>
101                })
102                .map_err(convert_error)
103        }
104        .boxed()
105    }
106
107    fn write_buf(&mut self, mut buf: Box<dyn Buf + Send>) -> FsFuture<()> {
108        async move {
109            let State::Write(w) = &mut self.state else {
110                return Err(FsError::GeneralFailure);
111            };
112
113            w.write_all(&buf.copy_to_bytes(buf.remaining()))
114                .await
115                .map_err(|_| FsError::GeneralFailure)?;
116            Ok(())
117        }
118        .boxed()
119    }
120
121    fn write_bytes(&mut self, buf: Bytes) -> FsFuture<()> {
122        async move {
123            let State::Write(w) = &mut self.state else {
124                return Err(FsError::GeneralFailure);
125            };
126
127            w.write_all(&buf).await.map_err(|_| FsError::GeneralFailure)
128        }
129        .boxed()
130    }
131
132    fn read_bytes(&mut self, count: usize) -> FsFuture<Bytes> {
133        async move {
134            let State::Read(r) = &mut self.state else {
135                return Err(FsError::GeneralFailure);
136            };
137
138            self.buf.resize(count, 0);
139            let len = r
140                .read(&mut self.buf)
141                .await
142                .map_err(|_| FsError::GeneralFailure)?;
143            Ok(self.buf.split_to(len).freeze())
144        }
145        .boxed()
146    }
147
148    fn seek(&mut self, pos: SeekFrom) -> FsFuture<u64> {
149        async move {
150            let State::Read(r) = &mut self.state else {
151                return Err(FsError::GeneralFailure);
152            };
153
154            r.seek(pos).await.map_err(|_| FsError::GeneralFailure)
155        }
156        .boxed()
157    }
158
159    fn flush(&mut self) -> FsFuture<()> {
160        async move {
161            let State::Write(w) = &mut self.state else {
162                return Err(FsError::GeneralFailure);
163            };
164
165            w.flush().await.map_err(|_| FsError::GeneralFailure)?;
166            w.close().await.map_err(|_| FsError::GeneralFailure)
167        }
168        .boxed()
169    }
170}