opendal/services/ipmfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt;
19use std::fmt::Write;
20use std::str;
21use std::sync::Arc;
22
23use bytes::Buf;
24use http::Request;
25use http::Response;
26use http::StatusCode;
27use serde::Deserialize;
28
29use super::delete::IpmfsDeleter;
30use super::error::parse_error;
31use super::lister::IpmfsLister;
32use super::writer::IpmfsWriter;
33use crate::raw::*;
34use crate::*;
35
/// IPFS Mutable File System (IPMFS) backend.
#[doc = include_str!("docs.md")]
#[derive(Clone)]
pub struct IpmfsBackend {
    // Accessor metadata assembled once in `new` and handed out by `info()`.
    info: Arc<AccessorInfo>,
    // Root passed to `build_rooted_abs_path` to resolve every request path.
    root: String,
    // Prefix of every request URL; `/api/v0/files/...` is appended to it.
    endpoint: String,
    // HTTP client through which all requests of this backend are issued.
    client: HttpClient,
}
45
46impl fmt::Debug for IpmfsBackend {
47    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48        f.debug_struct("Backend")
49            .field("root", &self.root)
50            .field("endpoint", &self.endpoint)
51            .finish()
52    }
53}
54
55impl IpmfsBackend {
56    pub(crate) fn new(root: String, client: HttpClient, endpoint: String) -> Self {
57        Self {
58            info: {
59                let am = AccessorInfo::default();
60                am.set_scheme(Scheme::Ipmfs)
61                    .set_root(&root)
62                    .set_native_capability(Capability {
63                        stat: true,
64                        stat_has_content_length: true,
65
66                        read: true,
67
68                        write: true,
69                        delete: true,
70
71                        list: true,
72                        list_has_content_length: true,
73
74                        shared: true,
75
76                        ..Default::default()
77                    });
78
79                am.into()
80            },
81            root,
82            client,
83            endpoint,
84        }
85    }
86}
87
88impl Access for IpmfsBackend {
89    type Reader = HttpBody;
90    type Writer = oio::OneShotWriter<IpmfsWriter>;
91    type Lister = oio::PageLister<IpmfsLister>;
92    type Deleter = oio::OneShotDeleter<IpmfsDeleter>;
93    type BlockingReader = ();
94    type BlockingWriter = ();
95    type BlockingLister = ();
96    type BlockingDeleter = ();
97
98    fn info(&self) -> Arc<AccessorInfo> {
99        self.info.clone()
100    }
101
102    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
103        let resp = self.ipmfs_mkdir(path).await?;
104
105        let status = resp.status();
106
107        match status {
108            StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
109            _ => Err(parse_error(resp)),
110        }
111    }
112
113    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
114        // Stat root always returns a DIR.
115        if path == "/" {
116            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
117        }
118
119        let resp = self.ipmfs_stat(path).await?;
120
121        let status = resp.status();
122
123        match status {
124            StatusCode::OK => {
125                let bs = resp.into_body();
126
127                let res: IpfsStatResponse =
128                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
129
130                let mode = match res.file_type.as_str() {
131                    "file" => EntryMode::FILE,
132                    "directory" => EntryMode::DIR,
133                    _ => EntryMode::Unknown,
134                };
135
136                let mut meta = Metadata::new(mode);
137                meta.set_content_length(res.size);
138
139                Ok(RpStat::new(meta))
140            }
141            _ => Err(parse_error(resp)),
142        }
143    }
144
145    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
146        let resp = self.ipmfs_read(path, args.range()).await?;
147
148        let status = resp.status();
149
150        match status {
151            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
152                Ok((RpRead::default(), resp.into_body()))
153            }
154            _ => {
155                let (part, mut body) = resp.into_parts();
156                let buf = body.to_buffer().await?;
157                Err(parse_error(Response::from_parts(part, buf)))
158            }
159        }
160    }
161
162    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
163        Ok((
164            RpWrite::default(),
165            oio::OneShotWriter::new(IpmfsWriter::new(self.clone(), path.to_string())),
166        ))
167    }
168
169    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
170        Ok((
171            RpDelete::default(),
172            oio::OneShotDeleter::new(IpmfsDeleter::new(Arc::new(self.clone()))),
173        ))
174    }
175
176    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
177        let l = IpmfsLister::new(Arc::new(self.clone()), &self.root, path);
178        Ok((RpList::default(), oio::PageLister::new(l)))
179    }
180}
181
182impl IpmfsBackend {
183    async fn ipmfs_stat(&self, path: &str) -> Result<Response<Buffer>> {
184        let p = build_rooted_abs_path(&self.root, path);
185
186        let url = format!(
187            "{}/api/v0/files/stat?arg={}",
188            self.endpoint,
189            percent_encode_path(&p)
190        );
191
192        let req = Request::post(url);
193        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
194
195        self.client.send(req).await
196    }
197
198    pub async fn ipmfs_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
199        let p = build_rooted_abs_path(&self.root, path);
200
201        let mut url = format!(
202            "{}/api/v0/files/read?arg={}",
203            self.endpoint,
204            percent_encode_path(&p)
205        );
206
207        write!(url, "&offset={}", range.offset()).expect("write into string must succeed");
208        if let Some(count) = range.size() {
209            write!(url, "&count={count}").expect("write into string must succeed")
210        }
211
212        let req = Request::post(url);
213        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
214
215        self.client.fetch(req).await
216    }
217
218    pub async fn ipmfs_rm(&self, path: &str) -> Result<Response<Buffer>> {
219        let p = build_rooted_abs_path(&self.root, path);
220
221        let url = format!(
222            "{}/api/v0/files/rm?arg={}",
223            self.endpoint,
224            percent_encode_path(&p)
225        );
226
227        let req = Request::post(url);
228        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
229
230        self.client.send(req).await
231    }
232
233    pub(crate) async fn ipmfs_ls(&self, path: &str) -> Result<Response<Buffer>> {
234        let p = build_rooted_abs_path(&self.root, path);
235
236        let url = format!(
237            "{}/api/v0/files/ls?arg={}&long=true",
238            self.endpoint,
239            percent_encode_path(&p)
240        );
241
242        let req = Request::post(url);
243        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
244
245        self.client.send(req).await
246    }
247
248    async fn ipmfs_mkdir(&self, path: &str) -> Result<Response<Buffer>> {
249        let p = build_rooted_abs_path(&self.root, path);
250
251        let url = format!(
252            "{}/api/v0/files/mkdir?arg={}&parents=true",
253            self.endpoint,
254            percent_encode_path(&p)
255        );
256
257        let req = Request::post(url);
258        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
259
260        self.client.send(req).await
261    }
262
263    /// Support write from reader.
264    pub async fn ipmfs_write(&self, path: &str, body: Buffer) -> Result<Response<Buffer>> {
265        let p = build_rooted_abs_path(&self.root, path);
266
267        let url = format!(
268            "{}/api/v0/files/write?arg={}&parents=true&create=true&truncate=true",
269            self.endpoint,
270            percent_encode_path(&p)
271        );
272
273        let multipart = Multipart::new().part(FormDataPart::new("data").content(body));
274
275        let req: http::request::Builder = Request::post(url);
276        let req = multipart.apply(req)?;
277
278        self.client.send(req).await
279    }
280}
281
// Subset of the JSON body returned by `/api/v0/files/stat` that `stat` reads.
// The container-level `#[serde(default)]` lets a missing key fall back to the
// field's `Default` value (`0` / empty string) instead of failing to decode.
#[derive(Deserialize, Default, Debug)]
#[serde(default)]
struct IpfsStatResponse {
    // Decoded from the `Size` key; `stat` reports it as the content length.
    #[serde(rename = "Size")]
    size: u64,
    // Decoded from the `Type` key; `stat` recognises "file" and "directory"
    // and maps every other value to `EntryMode::Unknown`.
    #[serde(rename = "Type")]
    file_type: String,
}