opendal/services/ipfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Request;
23use http::Response;
24use http::StatusCode;
25use log::debug;
26use prost::Message;
27
28use super::error::parse_error;
29use super::ipld::PBNode;
30use crate::raw::*;
31use crate::services::IpfsConfig;
32use crate::*;
33
34impl Configurator for IpfsConfig {
35    type Builder = IpfsBuilder;
36    fn into_builder(self) -> Self::Builder {
37        IpfsBuilder {
38            config: self,
39            http_client: None,
40        }
41    }
42}
43
/// IPFS file system support based on [IPFS HTTP Gateway](https://docs.ipfs.tech/concepts/ipfs-gateway/).
#[doc = include_str!("docs.md")]
#[derive(Default, Clone, Debug)]
pub struct IpfsBuilder {
    /// Root path and gateway endpoint, set via `root`/`endpoint` or taken from an `IpfsConfig`.
    config: IpfsConfig,
    /// Optional caller-supplied HTTP client; when `None`, `build` creates a default `HttpClient`.
    http_client: Option<HttpClient>,
}
51
52impl IpfsBuilder {
53    /// Set root of ipfs backend.
54    ///
55    /// Root must be a valid ipfs address like the following:
56    ///
57    /// - `/ipfs/QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ/` (IPFS with CID v0)
58    /// - `/ipfs/bafybeibozpulxtpv5nhfa2ue3dcjx23ndh3gwr5vwllk7ptoyfwnfjjr4q/` (IPFS with  CID v1)
59    /// - `/ipns/opendal.apache.org/` (IPNS)
60    pub fn root(mut self, root: &str) -> Self {
61        self.config.root = if root.is_empty() {
62            None
63        } else {
64            Some(root.to_string())
65        };
66
67        self
68    }
69
70    /// Set endpoint if ipfs backend.
71    ///
72    /// Endpoint must be a valid ipfs gateway which passed the [IPFS Gateway Checker](https://ipfs.github.io/public-gateway-checker/)
73    ///
74    /// Popular choices including:
75    ///
76    /// - `https://ipfs.io`
77    /// - `https://w3s.link`
78    /// - `https://dweb.link`
79    /// - `https://cloudflare-ipfs.com`
80    /// - `http://127.0.0.1:8080` (ipfs daemon in local)
81    pub fn endpoint(mut self, endpoint: &str) -> Self {
82        if !endpoint.is_empty() {
83            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
84            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
85        }
86
87        self
88    }
89
90    /// Specify the http client that used by this service.
91    ///
92    /// # Notes
93    ///
94    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
95    /// during minor updates.
96    pub fn http_client(mut self, client: HttpClient) -> Self {
97        self.http_client = Some(client);
98        self
99    }
100}
101
102impl Builder for IpfsBuilder {
103    const SCHEME: Scheme = Scheme::Ipfs;
104    type Config = IpfsConfig;
105
106    fn build(self) -> Result<impl Access> {
107        debug!("backend build started: {:?}", &self);
108
109        let root = normalize_root(&self.config.root.unwrap_or_default());
110        if !root.starts_with("/ipfs/") && !root.starts_with("/ipns/") {
111            return Err(Error::new(
112                ErrorKind::ConfigInvalid,
113                "root must start with /ipfs/ or /ipns/",
114            )
115            .with_context("service", Scheme::Ipfs)
116            .with_context("root", &root));
117        }
118        debug!("backend use root {}", root);
119
120        let endpoint = match &self.config.endpoint {
121            Some(endpoint) => Ok(endpoint.clone()),
122            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
123                .with_context("service", Scheme::Ipfs)
124                .with_context("root", &root)),
125        }?;
126        debug!("backend use endpoint {}", &endpoint);
127
128        let client = if let Some(client) = self.http_client {
129            client
130        } else {
131            HttpClient::new().map_err(|err| {
132                err.with_operation("Builder::build")
133                    .with_context("service", Scheme::Ipfs)
134            })?
135        };
136
137        Ok(IpfsBackend {
138            info: {
139                let ma = AccessorInfo::default();
140                ma.set_scheme(Scheme::Ipfs)
141                    .set_root(&root)
142                    .set_native_capability(Capability {
143                        stat: true,
144                        stat_has_content_length: true,
145                        stat_has_content_type: true,
146                        stat_has_etag: true,
147                        stat_has_content_disposition: true,
148
149                        read: true,
150
151                        list: true,
152
153                        shared: true,
154
155                        ..Default::default()
156                    });
157
158                ma.into()
159            },
160            root,
161            endpoint,
162            client,
163        })
164    }
165}
166
/// Backend for IPFS.
///
/// Sends every request to a single HTTP gateway (`endpoint`) and resolves each path
/// under the configured `/ipfs/` or `/ipns/` `root`.
#[derive(Clone)]
pub struct IpfsBackend {
    /// Accessor metadata (scheme, root, native capabilities) populated in `IpfsBuilder::build`.
    info: Arc<AccessorInfo>,
    /// Gateway base URL; trailing `/` is trimmed when set through `IpfsBuilder::endpoint`.
    endpoint: String,
    /// Normalized root, checked at build time to start with `/ipfs/` or `/ipns/`.
    root: String,
    /// HTTP client used for all gateway requests.
    client: HttpClient,
}
175
176impl Debug for IpfsBackend {
177    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
178        f.debug_struct("Backend")
179            .field("endpoint", &self.endpoint)
180            .field("root", &self.root)
181            .field("client", &self.client)
182            .finish()
183    }
184}
185
186impl Access for IpfsBackend {
187    type Reader = HttpBody;
188    type Writer = ();
189    type Lister = oio::PageLister<DirStream>;
190    type Deleter = ();
191    type BlockingReader = ();
192    type BlockingWriter = ();
193    type BlockingLister = ();
194    type BlockingDeleter = ();
195
196    fn info(&self) -> Arc<AccessorInfo> {
197        self.info.clone()
198    }
199
200    /// IPFS's stat behavior highly depends on its implementation.
201    ///
202    /// Based on IPFS [Path Gateway Specification](https://github.com/ipfs/specs/blob/main/http-gateways/PATH_GATEWAY.md),
203    /// response payload could be:
204    ///
205    /// > - UnixFS (implicit default)
206    /// >   - File
207    /// >     - Bytes representing file contents
208    /// >   - Directory
209    /// >     - Generated HTML with directory index
210    /// >     - When `index.html` is present, gateway can skip generating directory index and return it instead
211    /// > - Raw block (not this case)
212    /// > - CAR (not this case)
213    ///
214    /// When we HEAD a given path, we could have the following responses:
215    ///
216    /// - File
217    ///
218    /// ```http
219    /// :) curl -I https://ipfs.io/ipfs/QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ/normal_file
220    /// HTTP/1.1 200 Connection established
221    ///
222    /// HTTP/2 200
223    /// server: openresty
224    /// date: Thu, 08 Sep 2022 00:48:50 GMT
225    /// content-type: application/octet-stream
226    /// content-length: 262144
227    /// access-control-allow-methods: GET
228    /// cache-control: public, max-age=29030400, immutable
229    /// etag: "QmdP6teFTLSNVhT4W5jkhEuUBsjQ3xkp1GmRvDU6937Me1"
230    /// x-ipfs-gateway-host: ipfs-bank11-fr2
231    /// x-ipfs-path: /ipfs/QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ/normal_file
232    /// x-ipfs-roots: QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ,QmdP6teFTLSNVhT4W5jkhEuUBsjQ3xkp1GmRvDU6937Me1
233    /// x-ipfs-pop: ipfs-bank11-fr2
234    /// timing-allow-origin: *
235    /// x-ipfs-datasize: 262144
236    /// access-control-allow-origin: *
237    /// access-control-allow-methods: GET, POST, OPTIONS
238    /// access-control-allow-headers: X-Requested-With, Range, Content-Range, X-Chunked-Output, X-Stream-Output
239    /// access-control-expose-headers: Content-Range, X-Chunked-Output, X-Stream-Output
240    /// x-ipfs-lb-pop: gateway-bank1-fr2
241    /// strict-transport-security: max-age=31536000; includeSubDomains; preload
242    /// x-proxy-cache: MISS
243    /// accept-ranges: bytes
244    /// ```
245    ///
246    /// - Dir with generated index
247    ///
248    /// ```http
249    /// :( curl -I https://ipfs.io/ipfs/QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ/normal_dir
250    /// HTTP/1.1 200 Connection established
251    ///
252    /// HTTP/2 200
253    /// server: openresty
254    /// date: Wed, 07 Sep 2022 08:46:13 GMT
255    /// content-type: text/html
256    /// vary: Accept-Encoding
257    /// access-control-allow-methods: GET
258    /// etag: "DirIndex-2b567f6r5vvdg_CID-QmY44DyCDymRN1Qy7sGbupz1ysMkXTWomAQku5vBg7fRQW"
259    /// x-ipfs-gateway-host: ipfs-bank6-sg1
260    /// x-ipfs-path: /ipfs/QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ/normal_dir
261    /// x-ipfs-roots: QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ,QmY44DyCDymRN1Qy7sGbupz1ysMkXTWomAQku5vBg7fRQW
262    /// x-ipfs-pop: ipfs-bank6-sg1
263    /// timing-allow-origin: *
264    /// access-control-allow-origin: *
265    /// access-control-allow-methods: GET, POST, OPTIONS
266    /// access-control-allow-headers: X-Requested-With, Range, Content-Range, X-Chunked-Output, X-Stream-Output
267    /// access-control-expose-headers: Content-Range, X-Chunked-Output, X-Stream-Output
268    /// x-ipfs-lb-pop: gateway-bank3-sg1
269    /// strict-transport-security: max-age=31536000; includeSubDomains; preload
270    /// x-proxy-cache: MISS
271    /// ```
272    ///
273    /// - Dir with index.html
274    ///
275    /// ```http
276    /// :) curl -I http://127.0.0.1:8080/ipfs/QmVturFGV3z4WsP7cRV8Ci4avCdGWYXk2qBKvtAwFUp5Az
277    /// HTTP/1.1 302 Found
278    /// Access-Control-Allow-Headers: Content-Type
279    /// Access-Control-Allow-Headers: Range
280    /// Access-Control-Allow-Headers: User-Agent
281    /// Access-Control-Allow-Headers: X-Requested-With
282    /// Access-Control-Allow-Methods: GET
283    /// Access-Control-Allow-Origin: *
284    /// Access-Control-Expose-Headers: Content-Length
285    /// Access-Control-Expose-Headers: Content-Range
286    /// Access-Control-Expose-Headers: X-Chunked-Output
287    /// Access-Control-Expose-Headers: X-Ipfs-Path
288    /// Access-Control-Expose-Headers: X-Ipfs-Roots
289    /// Access-Control-Expose-Headers: X-Stream-Output
290    /// Content-Type: text/html; charset=utf-8
291    /// Location: /ipfs/QmVturFGV3z4WsP7cRV8Ci4avCdGWYXk2qBKvtAwFUp5Az/
292    /// X-Ipfs-Path: /ipfs/QmVturFGV3z4WsP7cRV8Ci4avCdGWYXk2qBKvtAwFUp5Az
293    /// X-Ipfs-Roots: QmVturFGV3z4WsP7cRV8Ci4avCdGWYXk2qBKvtAwFUp5Az
294    /// Date: Thu, 08 Sep 2022 00:52:29 GMT
295    /// ```
296    ///
297    /// In conclusion:
298    ///
299    /// - HTTP Status Code == 302 => directory
300    /// - HTTP Status Code == 200 && ETag starts with `"DirIndex` => directory
301    /// - HTTP Status Code == 200 && ETag not starts with `"DirIndex` => file
302    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
303        // Stat root always returns a DIR.
304        if path == "/" {
305            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
306        }
307
308        let resp = self.ipfs_head(path).await?;
309
310        let status = resp.status();
311
312        match status {
313            StatusCode::OK => {
314                let mut m = Metadata::new(EntryMode::Unknown);
315
316                if let Some(v) = parse_content_length(resp.headers())? {
317                    m.set_content_length(v);
318                }
319
320                if let Some(v) = parse_content_type(resp.headers())? {
321                    m.set_content_type(v);
322                }
323
324                if let Some(v) = parse_etag(resp.headers())? {
325                    m.set_etag(v);
326
327                    if v.starts_with("\"DirIndex") {
328                        m.set_mode(EntryMode::DIR);
329                    } else {
330                        m.set_mode(EntryMode::FILE);
331                    }
332                } else {
333                    // Some service will stream the output of DirIndex.
334                    // If we don't have an etag, it's highly to be a dir.
335                    m.set_mode(EntryMode::DIR);
336                }
337
338                if let Some(v) = parse_content_disposition(resp.headers())? {
339                    m.set_content_disposition(v);
340                }
341
342                Ok(RpStat::new(m))
343            }
344            StatusCode::FOUND | StatusCode::MOVED_PERMANENTLY => {
345                Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
346            }
347            _ => Err(parse_error(resp)),
348        }
349    }
350
351    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
352        let resp = self.ipfs_get(path, args.range()).await?;
353
354        let status = resp.status();
355
356        match status {
357            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
358                Ok((RpRead::default(), resp.into_body()))
359            }
360            _ => {
361                let (part, mut body) = resp.into_parts();
362                let buf = body.to_buffer().await?;
363                Err(parse_error(Response::from_parts(part, buf)))
364            }
365        }
366    }
367
368    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
369        let l = DirStream::new(Arc::new(self.clone()), path);
370        Ok((RpList::default(), oio::PageLister::new(l)))
371    }
372}
373
374impl IpfsBackend {
375    pub async fn ipfs_get(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
376        let p = build_rooted_abs_path(&self.root, path);
377
378        let url = format!("{}{}", self.endpoint, percent_encode_path(&p));
379
380        let mut req = Request::get(&url);
381
382        if !range.is_full() {
383            req = req.header(http::header::RANGE, range.to_header());
384        }
385
386        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
387
388        self.client.fetch(req).await
389    }
390
391    async fn ipfs_head(&self, path: &str) -> Result<Response<Buffer>> {
392        let p = build_rooted_abs_path(&self.root, path);
393
394        let url = format!("{}{}", self.endpoint, percent_encode_path(&p));
395
396        let req = Request::head(&url);
397
398        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
399
400        self.client.send(req).await
401    }
402
403    async fn ipfs_list(&self, path: &str) -> Result<Response<Buffer>> {
404        let p = build_rooted_abs_path(&self.root, path);
405
406        let url = format!("{}{}", self.endpoint, percent_encode_path(&p));
407
408        let mut req = Request::get(&url);
409
410        // Use "application/vnd.ipld.raw" to disable IPLD codec deserialization
411        // OpenDAL will parse ipld data directly.
412        //
413        // ref: https://github.com/ipfs/specs/blob/main/http-gateways/PATH_GATEWAY.md
414        req = req.header(http::header::ACCEPT, "application/vnd.ipld.raw");
415
416        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
417
418        self.client.send(req).await
419    }
420}
421
/// Single-page lister for an IPFS directory: fetches the directory's raw dag-pb node
/// through `IpfsBackend::ipfs_list` and stats each linked child.
pub struct DirStream {
    /// Backend used to fetch the directory node and stat its children.
    backend: Arc<IpfsBackend>,
    /// Path of the directory being listed, relative to the backend root.
    path: String,
}
426
427impl DirStream {
428    fn new(backend: Arc<IpfsBackend>, path: &str) -> Self {
429        Self {
430            backend,
431            path: path.to_string(),
432        }
433    }
434}
435
436impl oio::PageList for DirStream {
437    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
438        let resp = self.backend.ipfs_list(&self.path).await?;
439
440        if resp.status() != StatusCode::OK {
441            return Err(parse_error(resp));
442        }
443
444        let bs = resp.into_body();
445        let pb_node = PBNode::decode(bs).map_err(|e| {
446            Error::new(ErrorKind::Unexpected, "deserialize protobuf from response").set_source(e)
447        })?;
448
449        let names = pb_node
450            .links
451            .into_iter()
452            .map(|v| v.name.unwrap())
453            .collect::<Vec<String>>();
454
455        for mut name in names {
456            let meta = self
457                .backend
458                .stat(&name, OpStat::new())
459                .await?
460                .into_metadata();
461
462            if meta.mode().is_dir() {
463                name += "/";
464            }
465
466            ctx.entries.push_back(oio::Entry::new(&name, meta))
467        }
468
469        ctx.done = true;
470        Ok(())
471    }
472}