dav_server_opendalfs/
file.rs1use std::fmt::{Debug, Formatter};
19use std::io::SeekFrom;
20
21use bytes::{Buf, Bytes, BytesMut};
22use dav_server::fs::{DavFile, OpenOptions};
23use dav_server::fs::{DavMetaData, FsResult};
24use dav_server::fs::{FsError, FsFuture};
25use futures::FutureExt;
26use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
27use opendal::{FuturesAsyncReader, FuturesAsyncWriter, Operator};
28
29use super::metadata::OpendalMetaData;
30use super::utils::*;
31
32pub struct OpendalFile {
34 op: Operator,
35 path: String,
36 state: State,
37 buf: BytesMut,
38}
39
40impl Debug for OpendalFile {
41 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
42 f.debug_struct("OpendalFile")
43 .field("path", &self.path)
44 .field(
45 "state",
46 match &self.state {
47 State::Read { .. } => &"read",
48 State::Write(_) => &"write",
49 },
50 )
51 .finish()
52 }
53}
54
55enum State {
56 Read(FuturesAsyncReader),
57 Write(FuturesAsyncWriter),
58}
59
60impl OpendalFile {
61 pub async fn open(op: Operator, path: String, options: OpenOptions) -> FsResult<Self> {
63 let state = if options.read {
64 let r = op
65 .reader(&path)
66 .await
67 .map_err(convert_error)?
68 .into_futures_async_read(..)
69 .await
70 .map_err(convert_error)?;
71 State::Read(r)
72 } else if options.write {
73 let w = op
74 .writer_with(&path)
75 .append(options.append)
76 .await
77 .map_err(convert_error)?
78 .into_futures_async_write();
79 State::Write(w)
80 } else {
81 return Err(FsError::NotImplemented);
82 };
83
84 Ok(Self {
85 op,
86 path,
87 state,
88 buf: BytesMut::new(),
89 })
90 }
91}
92
93impl DavFile for OpendalFile {
94 fn metadata(&mut self) -> FsFuture<Box<dyn DavMetaData>> {
95 async move {
96 self.op
97 .stat(&self.path)
98 .await
99 .map(|opendal_metadata| {
100 Box::new(OpendalMetaData::new(opendal_metadata)) as Box<dyn DavMetaData>
101 })
102 .map_err(convert_error)
103 }
104 .boxed()
105 }
106
107 fn write_buf(&mut self, mut buf: Box<dyn Buf + Send>) -> FsFuture<()> {
108 async move {
109 let State::Write(w) = &mut self.state else {
110 return Err(FsError::GeneralFailure);
111 };
112
113 w.write_all(&buf.copy_to_bytes(buf.remaining()))
114 .await
115 .map_err(|_| FsError::GeneralFailure)?;
116 Ok(())
117 }
118 .boxed()
119 }
120
121 fn write_bytes(&mut self, buf: Bytes) -> FsFuture<()> {
122 async move {
123 let State::Write(w) = &mut self.state else {
124 return Err(FsError::GeneralFailure);
125 };
126
127 w.write_all(&buf).await.map_err(|_| FsError::GeneralFailure)
128 }
129 .boxed()
130 }
131
132 fn read_bytes(&mut self, count: usize) -> FsFuture<Bytes> {
133 async move {
134 let State::Read(r) = &mut self.state else {
135 return Err(FsError::GeneralFailure);
136 };
137
138 self.buf.resize(count, 0);
139 let len = r
140 .read(&mut self.buf)
141 .await
142 .map_err(|_| FsError::GeneralFailure)?;
143 Ok(self.buf.split_to(len).freeze())
144 }
145 .boxed()
146 }
147
148 fn seek(&mut self, pos: SeekFrom) -> FsFuture<u64> {
149 async move {
150 let State::Read(r) = &mut self.state else {
151 return Err(FsError::GeneralFailure);
152 };
153
154 r.seek(pos).await.map_err(|_| FsError::GeneralFailure)
155 }
156 .boxed()
157 }
158
159 fn flush(&mut self) -> FsFuture<()> {
160 async move {
161 let State::Write(w) = &mut self.state else {
162 return Err(FsError::GeneralFailure);
163 };
164
165 w.flush().await.map_err(|_| FsError::GeneralFailure)?;
166 w.close().await.map_err(|_| FsError::GeneralFailure)
167 }
168 .boxed()
169 }
170}