opendal/services/ipfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use http::Response;
22use http::StatusCode;
23use log::debug;
24use prost::Message;
25
26use super::IPFS_SCHEME;
27use super::config::IpfsConfig;
28use super::core::IpfsCore;
29use super::error::parse_error;
30use super::ipld::PBNode;
31use crate::raw::*;
32use crate::*;
33
/// IPFS file system support based on [IPFS HTTP Gateway](https://docs.ipfs.tech/concepts/ipfs-gateway/).
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct IpfsBuilder {
    /// Service configuration; its `root` and `endpoint` values are filled in
    /// by the builder's setter methods.
    pub(super) config: IpfsConfig,

    /// Custom HTTP client stored by the deprecated `http_client` setter.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    pub(super) http_client: Option<HttpClient>,
}
43
44impl Debug for IpfsBuilder {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        f.debug_struct("IpfsBuilder")
47            .field("config", &self.config)
48            .finish()
49    }
50}
51
52impl IpfsBuilder {
53    /// Set root of ipfs backend.
54    ///
55    /// Root must be a valid ipfs address like the following:
56    ///
57    /// - `/ipfs/QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ/` (IPFS with CID v0)
58    /// - `/ipfs/bafybeibozpulxtpv5nhfa2ue3dcjx23ndh3gwr5vwllk7ptoyfwnfjjr4q/` (IPFS with  CID v1)
59    /// - `/ipns/opendal.apache.org/` (IPNS)
60    pub fn root(mut self, root: &str) -> Self {
61        self.config.root = if root.is_empty() {
62            None
63        } else {
64            Some(root.to_string())
65        };
66
67        self
68    }
69
70    /// Set endpoint if ipfs backend.
71    ///
72    /// Endpoint must be a valid ipfs gateway which passed the [IPFS Gateway Checker](https://ipfs.github.io/public-gateway-checker/)
73    ///
74    /// Popular choices including:
75    ///
76    /// - `https://ipfs.io`
77    /// - `https://w3s.link`
78    /// - `https://dweb.link`
79    /// - `https://cloudflare-ipfs.com`
80    /// - `http://127.0.0.1:8080` (ipfs daemon in local)
81    pub fn endpoint(mut self, endpoint: &str) -> Self {
82        if !endpoint.is_empty() {
83            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
84            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
85        }
86
87        self
88    }
89
90    /// Specify the http client that used by this service.
91    ///
92    /// # Notes
93    ///
94    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
95    /// during minor updates.
96    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
97    #[allow(deprecated)]
98    pub fn http_client(mut self, client: HttpClient) -> Self {
99        self.http_client = Some(client);
100        self
101    }
102}
103
104impl Builder for IpfsBuilder {
105    type Config = IpfsConfig;
106
107    fn build(self) -> Result<impl Access> {
108        debug!("backend build started: {:?}", &self);
109
110        let root = normalize_root(&self.config.root.unwrap_or_default());
111        if !root.starts_with("/ipfs/") && !root.starts_with("/ipns/") {
112            return Err(Error::new(
113                ErrorKind::ConfigInvalid,
114                "root must start with /ipfs/ or /ipns/",
115            )
116            .with_context("service", Scheme::Ipfs)
117            .with_context("root", &root));
118        }
119        debug!("backend use root {root}");
120
121        let endpoint = match &self.config.endpoint {
122            Some(endpoint) => Ok(endpoint.clone()),
123            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
124                .with_context("service", Scheme::Ipfs)
125                .with_context("root", &root)),
126        }?;
127        debug!("backend use endpoint {}", &endpoint);
128
129        let info = AccessorInfo::default();
130        info.set_scheme(IPFS_SCHEME)
131            .set_root(&root)
132            .set_native_capability(Capability {
133                stat: true,
134
135                read: true,
136
137                list: true,
138
139                shared: true,
140
141                ..Default::default()
142            });
143
144        let accessor_info = Arc::new(info);
145        let core = Arc::new(IpfsCore {
146            info: accessor_info,
147            root,
148            endpoint,
149        });
150
151        Ok(IpfsBackend { core })
152    }
153}
154
/// Backend for IPFS.
///
/// The backend is a thin handle around a shared [`IpfsCore`]; cloning it
/// clones the `Arc`, not the core itself.
#[derive(Clone, Debug)]
pub struct IpfsBackend {
    /// Shared core built with the accessor info, root and endpoint.
    core: Arc<IpfsCore>,
}
160
161impl Access for IpfsBackend {
162    type Reader = HttpBody;
163    type Writer = ();
164    type Lister = oio::PageLister<DirStream>;
165    type Deleter = ();
166
167    fn info(&self) -> Arc<AccessorInfo> {
168        self.core.info.clone()
169    }
170
171    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
172        let metadata = self.core.ipfs_stat(path).await?;
173        Ok(RpStat::new(metadata))
174    }
175
176    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
177        let resp = self.core.ipfs_get(path, args.range()).await?;
178
179        let status = resp.status();
180
181        match status {
182            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
183                Ok((RpRead::default(), resp.into_body()))
184            }
185            _ => {
186                let (part, mut body) = resp.into_parts();
187                let buf = body.to_buffer().await?;
188                Err(parse_error(Response::from_parts(part, buf)))
189            }
190        }
191    }
192
193    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
194        let l = DirStream::new(self.core.clone(), path);
195        Ok((RpList::default(), oio::PageLister::new(l)))
196    }
197}
198
/// Page source used to list the entries under a path of the IPFS backend.
pub struct DirStream {
    /// Shared core used to issue the list and stat requests.
    core: Arc<IpfsCore>,
    /// Path that is being listed, stored as passed to `DirStream::new`.
    path: String,
}
203
204impl DirStream {
205    fn new(core: Arc<IpfsCore>, path: &str) -> Self {
206        Self {
207            core,
208            path: path.to_string(),
209        }
210    }
211}
212
213impl oio::PageList for DirStream {
214    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
215        let resp = self.core.ipfs_list(&self.path).await?;
216
217        if resp.status() != StatusCode::OK {
218            return Err(parse_error(resp));
219        }
220
221        let bs = resp.into_body();
222        let pb_node = PBNode::decode(bs).map_err(|e| {
223            Error::new(ErrorKind::Unexpected, "deserialize protobuf from response").set_source(e)
224        })?;
225
226        let names = pb_node
227            .links
228            .into_iter()
229            .map(|v| v.name.unwrap())
230            .collect::<Vec<String>>();
231
232        for mut name in names {
233            let meta = self.core.ipfs_stat(&name).await?;
234
235            if meta.mode().is_dir() {
236                name += "/";
237            }
238
239            ctx.entries.push_back(oio::Entry::new(&name, meta))
240        }
241
242        ctx.done = true;
243        Ok(())
244    }
245}