1use std::fmt;
19use std::fmt::Write;
20use std::str;
21use std::sync::Arc;
22
23use bytes::Buf;
24use http::Request;
25use http::Response;
26use http::StatusCode;
27use serde::Deserialize;
28
29use super::delete::IpmfsDeleter;
30use super::error::parse_error;
31use super::lister::IpmfsLister;
32use super::writer::IpmfsWriter;
33use crate::raw::*;
34use crate::*;
35
#[doc = include_str!("docs.md")]
#[derive(Clone)]
pub struct IpmfsBackend {
    /// Accessor info (scheme, root and native capability) built once in
    /// `new` and handed out by `Access::info`.
    info: Arc<AccessorInfo>,
    /// Root that every operation path is joined with (via
    /// `build_rooted_abs_path`) before being sent to the API.
    root: String,
    /// Base URL of the API server; `/api/v0/files/...` is appended to it.
    endpoint: String,
    /// HTTP client used to send every request of this backend.
    client: HttpClient,
}
45
46impl fmt::Debug for IpmfsBackend {
47 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48 f.debug_struct("Backend")
49 .field("root", &self.root)
50 .field("endpoint", &self.endpoint)
51 .finish()
52 }
53}
54
55impl IpmfsBackend {
56 pub(crate) fn new(root: String, client: HttpClient, endpoint: String) -> Self {
57 Self {
58 info: {
59 let am = AccessorInfo::default();
60 am.set_scheme(Scheme::Ipmfs)
61 .set_root(&root)
62 .set_native_capability(Capability {
63 stat: true,
64 stat_has_content_length: true,
65
66 read: true,
67
68 write: true,
69 delete: true,
70
71 list: true,
72 list_has_content_length: true,
73
74 shared: true,
75
76 ..Default::default()
77 });
78
79 am.into()
80 },
81 root,
82 client,
83 endpoint,
84 }
85 }
86}
87
88impl Access for IpmfsBackend {
89 type Reader = HttpBody;
90 type Writer = oio::OneShotWriter<IpmfsWriter>;
91 type Lister = oio::PageLister<IpmfsLister>;
92 type Deleter = oio::OneShotDeleter<IpmfsDeleter>;
93 type BlockingReader = ();
94 type BlockingWriter = ();
95 type BlockingLister = ();
96 type BlockingDeleter = ();
97
98 fn info(&self) -> Arc<AccessorInfo> {
99 self.info.clone()
100 }
101
102 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
103 let resp = self.ipmfs_mkdir(path).await?;
104
105 let status = resp.status();
106
107 match status {
108 StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
109 _ => Err(parse_error(resp)),
110 }
111 }
112
113 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
114 if path == "/" {
116 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
117 }
118
119 let resp = self.ipmfs_stat(path).await?;
120
121 let status = resp.status();
122
123 match status {
124 StatusCode::OK => {
125 let bs = resp.into_body();
126
127 let res: IpfsStatResponse =
128 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
129
130 let mode = match res.file_type.as_str() {
131 "file" => EntryMode::FILE,
132 "directory" => EntryMode::DIR,
133 _ => EntryMode::Unknown,
134 };
135
136 let mut meta = Metadata::new(mode);
137 meta.set_content_length(res.size);
138
139 Ok(RpStat::new(meta))
140 }
141 _ => Err(parse_error(resp)),
142 }
143 }
144
145 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
146 let resp = self.ipmfs_read(path, args.range()).await?;
147
148 let status = resp.status();
149
150 match status {
151 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
152 Ok((RpRead::default(), resp.into_body()))
153 }
154 _ => {
155 let (part, mut body) = resp.into_parts();
156 let buf = body.to_buffer().await?;
157 Err(parse_error(Response::from_parts(part, buf)))
158 }
159 }
160 }
161
162 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
163 Ok((
164 RpWrite::default(),
165 oio::OneShotWriter::new(IpmfsWriter::new(self.clone(), path.to_string())),
166 ))
167 }
168
169 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
170 Ok((
171 RpDelete::default(),
172 oio::OneShotDeleter::new(IpmfsDeleter::new(Arc::new(self.clone()))),
173 ))
174 }
175
176 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
177 let l = IpmfsLister::new(Arc::new(self.clone()), &self.root, path);
178 Ok((RpList::default(), oio::PageLister::new(l)))
179 }
180}
181
182impl IpmfsBackend {
183 async fn ipmfs_stat(&self, path: &str) -> Result<Response<Buffer>> {
184 let p = build_rooted_abs_path(&self.root, path);
185
186 let url = format!(
187 "{}/api/v0/files/stat?arg={}",
188 self.endpoint,
189 percent_encode_path(&p)
190 );
191
192 let req = Request::post(url);
193 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
194
195 self.client.send(req).await
196 }
197
198 pub async fn ipmfs_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
199 let p = build_rooted_abs_path(&self.root, path);
200
201 let mut url = format!(
202 "{}/api/v0/files/read?arg={}",
203 self.endpoint,
204 percent_encode_path(&p)
205 );
206
207 write!(url, "&offset={}", range.offset()).expect("write into string must succeed");
208 if let Some(count) = range.size() {
209 write!(url, "&count={count}").expect("write into string must succeed")
210 }
211
212 let req = Request::post(url);
213 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
214
215 self.client.fetch(req).await
216 }
217
218 pub async fn ipmfs_rm(&self, path: &str) -> Result<Response<Buffer>> {
219 let p = build_rooted_abs_path(&self.root, path);
220
221 let url = format!(
222 "{}/api/v0/files/rm?arg={}",
223 self.endpoint,
224 percent_encode_path(&p)
225 );
226
227 let req = Request::post(url);
228 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
229
230 self.client.send(req).await
231 }
232
233 pub(crate) async fn ipmfs_ls(&self, path: &str) -> Result<Response<Buffer>> {
234 let p = build_rooted_abs_path(&self.root, path);
235
236 let url = format!(
237 "{}/api/v0/files/ls?arg={}&long=true",
238 self.endpoint,
239 percent_encode_path(&p)
240 );
241
242 let req = Request::post(url);
243 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
244
245 self.client.send(req).await
246 }
247
248 async fn ipmfs_mkdir(&self, path: &str) -> Result<Response<Buffer>> {
249 let p = build_rooted_abs_path(&self.root, path);
250
251 let url = format!(
252 "{}/api/v0/files/mkdir?arg={}&parents=true",
253 self.endpoint,
254 percent_encode_path(&p)
255 );
256
257 let req = Request::post(url);
258 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
259
260 self.client.send(req).await
261 }
262
263 pub async fn ipmfs_write(&self, path: &str, body: Buffer) -> Result<Response<Buffer>> {
265 let p = build_rooted_abs_path(&self.root, path);
266
267 let url = format!(
268 "{}/api/v0/files/write?arg={}&parents=true&create=true&truncate=true",
269 self.endpoint,
270 percent_encode_path(&p)
271 );
272
273 let multipart = Multipart::new().part(FormDataPart::new("data").content(body));
274
275 let req: http::request::Builder = Request::post(url);
276 let req = multipart.apply(req)?;
277
278 self.client.send(req).await
279 }
280}
281
/// The fields of the `files/stat` JSON response that `stat` reads.
///
/// `#[serde(default)]` lets a field that is absent from the JSON fall back
/// to its `Default` value (`0` / empty string) instead of failing to parse.
#[derive(Deserialize, Default, Debug)]
#[serde(default)]
struct IpfsStatResponse {
    /// JSON key `Size`; used as the entry's content length.
    #[serde(rename = "Size")]
    size: u64,
    /// JSON key `Type`; `"file"` and `"directory"` are mapped to the
    /// matching `EntryMode`, anything else to `EntryMode::Unknown`.
    #[serde(rename = "Type")]
    file_type: String,
}