opendal/services/ipfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use prost::Message;
26
27use super::core::IpfsCore;
28use super::error::parse_error;
29use super::ipld::PBNode;
30use super::DEFAULT_SCHEME;
31use crate::raw::*;
32use crate::services::IpfsConfig;
33use crate::*;
34impl Configurator for IpfsConfig {
35    type Builder = IpfsBuilder;
36
37    #[allow(deprecated)]
38    fn into_builder(self) -> Self::Builder {
39        IpfsBuilder {
40            config: self,
41            http_client: None,
42        }
43    }
44}
45
46/// IPFS file system support based on [IPFS HTTP Gateway](https://docs.ipfs.tech/concepts/ipfs-gateway/).
47#[doc = include_str!("docs.md")]
48#[derive(Default, Clone, Debug)]
49pub struct IpfsBuilder {
50    config: IpfsConfig,
51
52    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
53    http_client: Option<HttpClient>,
54}
55
56impl IpfsBuilder {
57    /// Set root of ipfs backend.
58    ///
59    /// Root must be a valid ipfs address like the following:
60    ///
61    /// - `/ipfs/QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ/` (IPFS with CID v0)
62    /// - `/ipfs/bafybeibozpulxtpv5nhfa2ue3dcjx23ndh3gwr5vwllk7ptoyfwnfjjr4q/` (IPFS with  CID v1)
63    /// - `/ipns/opendal.apache.org/` (IPNS)
64    pub fn root(mut self, root: &str) -> Self {
65        self.config.root = if root.is_empty() {
66            None
67        } else {
68            Some(root.to_string())
69        };
70
71        self
72    }
73
74    /// Set endpoint if ipfs backend.
75    ///
76    /// Endpoint must be a valid ipfs gateway which passed the [IPFS Gateway Checker](https://ipfs.github.io/public-gateway-checker/)
77    ///
78    /// Popular choices including:
79    ///
80    /// - `https://ipfs.io`
81    /// - `https://w3s.link`
82    /// - `https://dweb.link`
83    /// - `https://cloudflare-ipfs.com`
84    /// - `http://127.0.0.1:8080` (ipfs daemon in local)
85    pub fn endpoint(mut self, endpoint: &str) -> Self {
86        if !endpoint.is_empty() {
87            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
88            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
89        }
90
91        self
92    }
93
94    /// Specify the http client that used by this service.
95    ///
96    /// # Notes
97    ///
98    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
99    /// during minor updates.
100    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
101    #[allow(deprecated)]
102    pub fn http_client(mut self, client: HttpClient) -> Self {
103        self.http_client = Some(client);
104        self
105    }
106}
107
108impl Builder for IpfsBuilder {
109    type Config = IpfsConfig;
110
111    fn build(self) -> Result<impl Access> {
112        debug!("backend build started: {:?}", &self);
113
114        let root = normalize_root(&self.config.root.unwrap_or_default());
115        if !root.starts_with("/ipfs/") && !root.starts_with("/ipns/") {
116            return Err(Error::new(
117                ErrorKind::ConfigInvalid,
118                "root must start with /ipfs/ or /ipns/",
119            )
120            .with_context("service", Scheme::Ipfs)
121            .with_context("root", &root));
122        }
123        debug!("backend use root {root}");
124
125        let endpoint = match &self.config.endpoint {
126            Some(endpoint) => Ok(endpoint.clone()),
127            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
128                .with_context("service", Scheme::Ipfs)
129                .with_context("root", &root)),
130        }?;
131        debug!("backend use endpoint {}", &endpoint);
132
133        let info = AccessorInfo::default();
134        info.set_scheme(DEFAULT_SCHEME)
135            .set_root(&root)
136            .set_native_capability(Capability {
137                stat: true,
138
139                read: true,
140
141                list: true,
142
143                shared: true,
144
145                ..Default::default()
146            });
147
148        let accessor_info = Arc::new(info);
149        let core = Arc::new(IpfsCore {
150            info: accessor_info,
151            root,
152            endpoint,
153        });
154
155        Ok(IpfsBackend { core })
156    }
157}
158
159/// Backend for IPFS.
160#[derive(Clone)]
161pub struct IpfsBackend {
162    core: Arc<IpfsCore>,
163}
164
165impl Debug for IpfsBackend {
166    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
167        f.debug_struct("IpfsBackend")
168            .field("core", &self.core)
169            .finish()
170    }
171}
172
173impl Access for IpfsBackend {
174    type Reader = HttpBody;
175    type Writer = ();
176    type Lister = oio::PageLister<DirStream>;
177    type Deleter = ();
178
179    fn info(&self) -> Arc<AccessorInfo> {
180        self.core.info.clone()
181    }
182
183    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
184        let metadata = self.core.ipfs_stat(path).await?;
185        Ok(RpStat::new(metadata))
186    }
187
188    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
189        let resp = self.core.ipfs_get(path, args.range()).await?;
190
191        let status = resp.status();
192
193        match status {
194            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
195                Ok((RpRead::default(), resp.into_body()))
196            }
197            _ => {
198                let (part, mut body) = resp.into_parts();
199                let buf = body.to_buffer().await?;
200                Err(parse_error(Response::from_parts(part, buf)))
201            }
202        }
203    }
204
205    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
206        let l = DirStream::new(self.core.clone(), path);
207        Ok((RpList::default(), oio::PageLister::new(l)))
208    }
209}
210
211pub struct DirStream {
212    core: Arc<IpfsCore>,
213    path: String,
214}
215
216impl DirStream {
217    fn new(core: Arc<IpfsCore>, path: &str) -> Self {
218        Self {
219            core,
220            path: path.to_string(),
221        }
222    }
223}
224
225impl oio::PageList for DirStream {
226    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
227        let resp = self.core.ipfs_list(&self.path).await?;
228
229        if resp.status() != StatusCode::OK {
230            return Err(parse_error(resp));
231        }
232
233        let bs = resp.into_body();
234        let pb_node = PBNode::decode(bs).map_err(|e| {
235            Error::new(ErrorKind::Unexpected, "deserialize protobuf from response").set_source(e)
236        })?;
237
238        let names = pb_node
239            .links
240            .into_iter()
241            .map(|v| v.name.unwrap())
242            .collect::<Vec<String>>();
243
244        for mut name in names {
245            let meta = self.core.ipfs_stat(&name).await?;
246
247            if meta.mode().is_dir() {
248                name += "/";
249            }
250
251            ctx.entries.push_back(oio::Entry::new(&name, meta))
252        }
253
254        ctx.done = true;
255        Ok(())
256    }
257}