opendal/services/ipfs/backend.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Request;
23use http::Response;
24use http::StatusCode;
25use log::debug;
26use prost::Message;
27
28use super::error::parse_error;
29use super::ipld::PBNode;
30use crate::raw::*;
31use crate::services::IpfsConfig;
32use crate::*;
33
34impl Configurator for IpfsConfig {
35 type Builder = IpfsBuilder;
36 fn into_builder(self) -> Self::Builder {
37 IpfsBuilder {
38 config: self,
39 http_client: None,
40 }
41 }
42}
43
44/// IPFS file system support based on [IPFS HTTP Gateway](https://docs.ipfs.tech/concepts/ipfs-gateway/).
45#[doc = include_str!("docs.md")]
46#[derive(Default, Clone, Debug)]
47pub struct IpfsBuilder {
48 config: IpfsConfig,
49 http_client: Option<HttpClient>,
50}
51
52impl IpfsBuilder {
53 /// Set root of ipfs backend.
54 ///
55 /// Root must be a valid ipfs address like the following:
56 ///
57 /// - `/ipfs/QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ/` (IPFS with CID v0)
58 /// - `/ipfs/bafybeibozpulxtpv5nhfa2ue3dcjx23ndh3gwr5vwllk7ptoyfwnfjjr4q/` (IPFS with CID v1)
59 /// - `/ipns/opendal.apache.org/` (IPNS)
60 pub fn root(mut self, root: &str) -> Self {
61 self.config.root = if root.is_empty() {
62 None
63 } else {
64 Some(root.to_string())
65 };
66
67 self
68 }
69
70 /// Set endpoint if ipfs backend.
71 ///
72 /// Endpoint must be a valid ipfs gateway which passed the [IPFS Gateway Checker](https://ipfs.github.io/public-gateway-checker/)
73 ///
74 /// Popular choices including:
75 ///
76 /// - `https://ipfs.io`
77 /// - `https://w3s.link`
78 /// - `https://dweb.link`
79 /// - `https://cloudflare-ipfs.com`
80 /// - `http://127.0.0.1:8080` (ipfs daemon in local)
81 pub fn endpoint(mut self, endpoint: &str) -> Self {
82 if !endpoint.is_empty() {
83 // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
84 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
85 }
86
87 self
88 }
89
90 /// Specify the http client that used by this service.
91 ///
92 /// # Notes
93 ///
94 /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
95 /// during minor updates.
96 pub fn http_client(mut self, client: HttpClient) -> Self {
97 self.http_client = Some(client);
98 self
99 }
100}
101
102impl Builder for IpfsBuilder {
103 const SCHEME: Scheme = Scheme::Ipfs;
104 type Config = IpfsConfig;
105
106 fn build(self) -> Result<impl Access> {
107 debug!("backend build started: {:?}", &self);
108
109 let root = normalize_root(&self.config.root.unwrap_or_default());
110 if !root.starts_with("/ipfs/") && !root.starts_with("/ipns/") {
111 return Err(Error::new(
112 ErrorKind::ConfigInvalid,
113 "root must start with /ipfs/ or /ipns/",
114 )
115 .with_context("service", Scheme::Ipfs)
116 .with_context("root", &root));
117 }
118 debug!("backend use root {}", root);
119
120 let endpoint = match &self.config.endpoint {
121 Some(endpoint) => Ok(endpoint.clone()),
122 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
123 .with_context("service", Scheme::Ipfs)
124 .with_context("root", &root)),
125 }?;
126 debug!("backend use endpoint {}", &endpoint);
127
128 let client = if let Some(client) = self.http_client {
129 client
130 } else {
131 HttpClient::new().map_err(|err| {
132 err.with_operation("Builder::build")
133 .with_context("service", Scheme::Ipfs)
134 })?
135 };
136
137 Ok(IpfsBackend {
138 info: {
139 let ma = AccessorInfo::default();
140 ma.set_scheme(Scheme::Ipfs)
141 .set_root(&root)
142 .set_native_capability(Capability {
143 stat: true,
144 stat_has_content_length: true,
145 stat_has_content_type: true,
146 stat_has_etag: true,
147 stat_has_content_disposition: true,
148
149 read: true,
150
151 list: true,
152
153 shared: true,
154
155 ..Default::default()
156 });
157
158 ma.into()
159 },
160 root,
161 endpoint,
162 client,
163 })
164 }
165}
166
167/// Backend for IPFS.
168#[derive(Clone)]
169pub struct IpfsBackend {
170 info: Arc<AccessorInfo>,
171 endpoint: String,
172 root: String,
173 client: HttpClient,
174}
175
176impl Debug for IpfsBackend {
177 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
178 f.debug_struct("Backend")
179 .field("endpoint", &self.endpoint)
180 .field("root", &self.root)
181 .field("client", &self.client)
182 .finish()
183 }
184}
185
186impl Access for IpfsBackend {
187 type Reader = HttpBody;
188 type Writer = ();
189 type Lister = oio::PageLister<DirStream>;
190 type Deleter = ();
191 type BlockingReader = ();
192 type BlockingWriter = ();
193 type BlockingLister = ();
194 type BlockingDeleter = ();
195
196 fn info(&self) -> Arc<AccessorInfo> {
197 self.info.clone()
198 }
199
200 /// IPFS's stat behavior highly depends on its implementation.
201 ///
202 /// Based on IPFS [Path Gateway Specification](https://github.com/ipfs/specs/blob/main/http-gateways/PATH_GATEWAY.md),
203 /// response payload could be:
204 ///
205 /// > - UnixFS (implicit default)
206 /// > - File
207 /// > - Bytes representing file contents
208 /// > - Directory
209 /// > - Generated HTML with directory index
210 /// > - When `index.html` is present, gateway can skip generating directory index and return it instead
211 /// > - Raw block (not this case)
212 /// > - CAR (not this case)
213 ///
214 /// When we HEAD a given path, we could have the following responses:
215 ///
216 /// - File
217 ///
218 /// ```http
219 /// :) curl -I https://ipfs.io/ipfs/QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ/normal_file
220 /// HTTP/1.1 200 Connection established
221 ///
222 /// HTTP/2 200
223 /// server: openresty
224 /// date: Thu, 08 Sep 2022 00:48:50 GMT
225 /// content-type: application/octet-stream
226 /// content-length: 262144
227 /// access-control-allow-methods: GET
228 /// cache-control: public, max-age=29030400, immutable
229 /// etag: "QmdP6teFTLSNVhT4W5jkhEuUBsjQ3xkp1GmRvDU6937Me1"
230 /// x-ipfs-gateway-host: ipfs-bank11-fr2
231 /// x-ipfs-path: /ipfs/QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ/normal_file
232 /// x-ipfs-roots: QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ,QmdP6teFTLSNVhT4W5jkhEuUBsjQ3xkp1GmRvDU6937Me1
233 /// x-ipfs-pop: ipfs-bank11-fr2
234 /// timing-allow-origin: *
235 /// x-ipfs-datasize: 262144
236 /// access-control-allow-origin: *
237 /// access-control-allow-methods: GET, POST, OPTIONS
238 /// access-control-allow-headers: X-Requested-With, Range, Content-Range, X-Chunked-Output, X-Stream-Output
239 /// access-control-expose-headers: Content-Range, X-Chunked-Output, X-Stream-Output
240 /// x-ipfs-lb-pop: gateway-bank1-fr2
241 /// strict-transport-security: max-age=31536000; includeSubDomains; preload
242 /// x-proxy-cache: MISS
243 /// accept-ranges: bytes
244 /// ```
245 ///
246 /// - Dir with generated index
247 ///
248 /// ```http
249 /// :( curl -I https://ipfs.io/ipfs/QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ/normal_dir
250 /// HTTP/1.1 200 Connection established
251 ///
252 /// HTTP/2 200
253 /// server: openresty
254 /// date: Wed, 07 Sep 2022 08:46:13 GMT
255 /// content-type: text/html
256 /// vary: Accept-Encoding
257 /// access-control-allow-methods: GET
258 /// etag: "DirIndex-2b567f6r5vvdg_CID-QmY44DyCDymRN1Qy7sGbupz1ysMkXTWomAQku5vBg7fRQW"
259 /// x-ipfs-gateway-host: ipfs-bank6-sg1
260 /// x-ipfs-path: /ipfs/QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ/normal_dir
261 /// x-ipfs-roots: QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ,QmY44DyCDymRN1Qy7sGbupz1ysMkXTWomAQku5vBg7fRQW
262 /// x-ipfs-pop: ipfs-bank6-sg1
263 /// timing-allow-origin: *
264 /// access-control-allow-origin: *
265 /// access-control-allow-methods: GET, POST, OPTIONS
266 /// access-control-allow-headers: X-Requested-With, Range, Content-Range, X-Chunked-Output, X-Stream-Output
267 /// access-control-expose-headers: Content-Range, X-Chunked-Output, X-Stream-Output
268 /// x-ipfs-lb-pop: gateway-bank3-sg1
269 /// strict-transport-security: max-age=31536000; includeSubDomains; preload
270 /// x-proxy-cache: MISS
271 /// ```
272 ///
273 /// - Dir with index.html
274 ///
275 /// ```http
276 /// :) curl -I http://127.0.0.1:8080/ipfs/QmVturFGV3z4WsP7cRV8Ci4avCdGWYXk2qBKvtAwFUp5Az
277 /// HTTP/1.1 302 Found
278 /// Access-Control-Allow-Headers: Content-Type
279 /// Access-Control-Allow-Headers: Range
280 /// Access-Control-Allow-Headers: User-Agent
281 /// Access-Control-Allow-Headers: X-Requested-With
282 /// Access-Control-Allow-Methods: GET
283 /// Access-Control-Allow-Origin: *
284 /// Access-Control-Expose-Headers: Content-Length
285 /// Access-Control-Expose-Headers: Content-Range
286 /// Access-Control-Expose-Headers: X-Chunked-Output
287 /// Access-Control-Expose-Headers: X-Ipfs-Path
288 /// Access-Control-Expose-Headers: X-Ipfs-Roots
289 /// Access-Control-Expose-Headers: X-Stream-Output
290 /// Content-Type: text/html; charset=utf-8
291 /// Location: /ipfs/QmVturFGV3z4WsP7cRV8Ci4avCdGWYXk2qBKvtAwFUp5Az/
292 /// X-Ipfs-Path: /ipfs/QmVturFGV3z4WsP7cRV8Ci4avCdGWYXk2qBKvtAwFUp5Az
293 /// X-Ipfs-Roots: QmVturFGV3z4WsP7cRV8Ci4avCdGWYXk2qBKvtAwFUp5Az
294 /// Date: Thu, 08 Sep 2022 00:52:29 GMT
295 /// ```
296 ///
297 /// In conclusion:
298 ///
299 /// - HTTP Status Code == 302 => directory
300 /// - HTTP Status Code == 200 && ETag starts with `"DirIndex` => directory
301 /// - HTTP Status Code == 200 && ETag not starts with `"DirIndex` => file
302 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
303 // Stat root always returns a DIR.
304 if path == "/" {
305 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
306 }
307
308 let resp = self.ipfs_head(path).await?;
309
310 let status = resp.status();
311
312 match status {
313 StatusCode::OK => {
314 let mut m = Metadata::new(EntryMode::Unknown);
315
316 if let Some(v) = parse_content_length(resp.headers())? {
317 m.set_content_length(v);
318 }
319
320 if let Some(v) = parse_content_type(resp.headers())? {
321 m.set_content_type(v);
322 }
323
324 if let Some(v) = parse_etag(resp.headers())? {
325 m.set_etag(v);
326
327 if v.starts_with("\"DirIndex") {
328 m.set_mode(EntryMode::DIR);
329 } else {
330 m.set_mode(EntryMode::FILE);
331 }
332 } else {
333 // Some service will stream the output of DirIndex.
334 // If we don't have an etag, it's highly to be a dir.
335 m.set_mode(EntryMode::DIR);
336 }
337
338 if let Some(v) = parse_content_disposition(resp.headers())? {
339 m.set_content_disposition(v);
340 }
341
342 Ok(RpStat::new(m))
343 }
344 StatusCode::FOUND | StatusCode::MOVED_PERMANENTLY => {
345 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
346 }
347 _ => Err(parse_error(resp)),
348 }
349 }
350
351 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
352 let resp = self.ipfs_get(path, args.range()).await?;
353
354 let status = resp.status();
355
356 match status {
357 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
358 Ok((RpRead::default(), resp.into_body()))
359 }
360 _ => {
361 let (part, mut body) = resp.into_parts();
362 let buf = body.to_buffer().await?;
363 Err(parse_error(Response::from_parts(part, buf)))
364 }
365 }
366 }
367
368 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
369 let l = DirStream::new(Arc::new(self.clone()), path);
370 Ok((RpList::default(), oio::PageLister::new(l)))
371 }
372}
373
374impl IpfsBackend {
375 pub async fn ipfs_get(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
376 let p = build_rooted_abs_path(&self.root, path);
377
378 let url = format!("{}{}", self.endpoint, percent_encode_path(&p));
379
380 let mut req = Request::get(&url);
381
382 if !range.is_full() {
383 req = req.header(http::header::RANGE, range.to_header());
384 }
385
386 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
387
388 self.client.fetch(req).await
389 }
390
391 async fn ipfs_head(&self, path: &str) -> Result<Response<Buffer>> {
392 let p = build_rooted_abs_path(&self.root, path);
393
394 let url = format!("{}{}", self.endpoint, percent_encode_path(&p));
395
396 let req = Request::head(&url);
397
398 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
399
400 self.client.send(req).await
401 }
402
403 async fn ipfs_list(&self, path: &str) -> Result<Response<Buffer>> {
404 let p = build_rooted_abs_path(&self.root, path);
405
406 let url = format!("{}{}", self.endpoint, percent_encode_path(&p));
407
408 let mut req = Request::get(&url);
409
410 // Use "application/vnd.ipld.raw" to disable IPLD codec deserialization
411 // OpenDAL will parse ipld data directly.
412 //
413 // ref: https://github.com/ipfs/specs/blob/main/http-gateways/PATH_GATEWAY.md
414 req = req.header(http::header::ACCEPT, "application/vnd.ipld.raw");
415
416 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
417
418 self.client.send(req).await
419 }
420}
421
422pub struct DirStream {
423 backend: Arc<IpfsBackend>,
424 path: String,
425}
426
427impl DirStream {
428 fn new(backend: Arc<IpfsBackend>, path: &str) -> Self {
429 Self {
430 backend,
431 path: path.to_string(),
432 }
433 }
434}
435
436impl oio::PageList for DirStream {
437 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
438 let resp = self.backend.ipfs_list(&self.path).await?;
439
440 if resp.status() != StatusCode::OK {
441 return Err(parse_error(resp));
442 }
443
444 let bs = resp.into_body();
445 let pb_node = PBNode::decode(bs).map_err(|e| {
446 Error::new(ErrorKind::Unexpected, "deserialize protobuf from response").set_source(e)
447 })?;
448
449 let names = pb_node
450 .links
451 .into_iter()
452 .map(|v| v.name.unwrap())
453 .collect::<Vec<String>>();
454
455 for mut name in names {
456 let meta = self
457 .backend
458 .stat(&name, OpStat::new())
459 .await?
460 .into_metadata();
461
462 if meta.mode().is_dir() {
463 name += "/";
464 }
465
466 ctx.entries.push_back(oio::Entry::new(&name, meta))
467 }
468
469 ctx.done = true;
470 Ok(())
471 }
472}