opendal/services/ipfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use prost::Message;
26
27use super::core::IpfsCore;
28use super::error::parse_error;
29use super::ipld::PBNode;
30use crate::raw::*;
31use crate::services::IpfsConfig;
32use crate::*;
33
34impl Configurator for IpfsConfig {
35    type Builder = IpfsBuilder;
36
37    #[allow(deprecated)]
38    fn into_builder(self) -> Self::Builder {
39        IpfsBuilder {
40            config: self,
41            http_client: None,
42        }
43    }
44}
45
46/// IPFS file system support based on [IPFS HTTP Gateway](https://docs.ipfs.tech/concepts/ipfs-gateway/).
47#[doc = include_str!("docs.md")]
48#[derive(Default, Clone, Debug)]
49pub struct IpfsBuilder {
50    config: IpfsConfig,
51
52    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
53    http_client: Option<HttpClient>,
54}
55
56impl IpfsBuilder {
57    /// Set root of ipfs backend.
58    ///
59    /// Root must be a valid ipfs address like the following:
60    ///
61    /// - `/ipfs/QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ/` (IPFS with CID v0)
62    /// - `/ipfs/bafybeibozpulxtpv5nhfa2ue3dcjx23ndh3gwr5vwllk7ptoyfwnfjjr4q/` (IPFS with  CID v1)
63    /// - `/ipns/opendal.apache.org/` (IPNS)
64    pub fn root(mut self, root: &str) -> Self {
65        self.config.root = if root.is_empty() {
66            None
67        } else {
68            Some(root.to_string())
69        };
70
71        self
72    }
73
74    /// Set endpoint if ipfs backend.
75    ///
76    /// Endpoint must be a valid ipfs gateway which passed the [IPFS Gateway Checker](https://ipfs.github.io/public-gateway-checker/)
77    ///
78    /// Popular choices including:
79    ///
80    /// - `https://ipfs.io`
81    /// - `https://w3s.link`
82    /// - `https://dweb.link`
83    /// - `https://cloudflare-ipfs.com`
84    /// - `http://127.0.0.1:8080` (ipfs daemon in local)
85    pub fn endpoint(mut self, endpoint: &str) -> Self {
86        if !endpoint.is_empty() {
87            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
88            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
89        }
90
91        self
92    }
93
94    /// Specify the http client that used by this service.
95    ///
96    /// # Notes
97    ///
98    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
99    /// during minor updates.
100    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
101    #[allow(deprecated)]
102    pub fn http_client(mut self, client: HttpClient) -> Self {
103        self.http_client = Some(client);
104        self
105    }
106}
107
108impl Builder for IpfsBuilder {
109    const SCHEME: Scheme = Scheme::Ipfs;
110    type Config = IpfsConfig;
111
112    fn build(self) -> Result<impl Access> {
113        debug!("backend build started: {:?}", &self);
114
115        let root = normalize_root(&self.config.root.unwrap_or_default());
116        if !root.starts_with("/ipfs/") && !root.starts_with("/ipns/") {
117            return Err(Error::new(
118                ErrorKind::ConfigInvalid,
119                "root must start with /ipfs/ or /ipns/",
120            )
121            .with_context("service", Scheme::Ipfs)
122            .with_context("root", &root));
123        }
124        debug!("backend use root {root}");
125
126        let endpoint = match &self.config.endpoint {
127            Some(endpoint) => Ok(endpoint.clone()),
128            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
129                .with_context("service", Scheme::Ipfs)
130                .with_context("root", &root)),
131        }?;
132        debug!("backend use endpoint {}", &endpoint);
133
134        let info = AccessorInfo::default();
135        info.set_scheme(Scheme::Ipfs)
136            .set_root(&root)
137            .set_native_capability(Capability {
138                stat: true,
139
140                read: true,
141
142                list: true,
143
144                shared: true,
145
146                ..Default::default()
147            });
148
149        let accessor_info = Arc::new(info);
150        let core = Arc::new(IpfsCore {
151            info: accessor_info,
152            root,
153            endpoint,
154        });
155
156        Ok(IpfsBackend { core })
157    }
158}
159
/// Backend for IPFS.
///
/// Cloning is cheap: the only field is an `Arc`, so clones share the same
/// [`IpfsCore`].
#[derive(Clone)]
pub struct IpfsBackend {
    // Shared service state (accessor info, root and endpoint) built by
    // `IpfsBuilder::build`.
    core: Arc<IpfsCore>,
}
165
166impl Debug for IpfsBackend {
167    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
168        f.debug_struct("IpfsBackend")
169            .field("core", &self.core)
170            .finish()
171    }
172}
173
174impl Access for IpfsBackend {
175    type Reader = HttpBody;
176    type Writer = ();
177    type Lister = oio::PageLister<DirStream>;
178    type Deleter = ();
179
180    fn info(&self) -> Arc<AccessorInfo> {
181        self.core.info.clone()
182    }
183
184    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
185        let metadata = self.core.ipfs_stat(path).await?;
186        Ok(RpStat::new(metadata))
187    }
188
189    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
190        let resp = self.core.ipfs_get(path, args.range()).await?;
191
192        let status = resp.status();
193
194        match status {
195            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
196                Ok((RpRead::default(), resp.into_body()))
197            }
198            _ => {
199                let (part, mut body) = resp.into_parts();
200                let buf = body.to_buffer().await?;
201                Err(parse_error(Response::from_parts(part, buf)))
202            }
203        }
204    }
205
206    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
207        let l = DirStream::new(self.core.clone(), path);
208        Ok((RpList::default(), oio::PageLister::new(l)))
209    }
210}
211
/// Page-list state for listing one directory of the IPFS backend.
pub struct DirStream {
    // Shared core used to issue the list and stat requests.
    core: Arc<IpfsCore>,
    // Path of the directory being listed, as passed to `Access::list`.
    path: String,
}
216
217impl DirStream {
218    fn new(core: Arc<IpfsCore>, path: &str) -> Self {
219        Self {
220            core,
221            path: path.to_string(),
222        }
223    }
224}
225
226impl oio::PageList for DirStream {
227    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
228        let resp = self.core.ipfs_list(&self.path).await?;
229
230        if resp.status() != StatusCode::OK {
231            return Err(parse_error(resp));
232        }
233
234        let bs = resp.into_body();
235        let pb_node = PBNode::decode(bs).map_err(|e| {
236            Error::new(ErrorKind::Unexpected, "deserialize protobuf from response").set_source(e)
237        })?;
238
239        let names = pb_node
240            .links
241            .into_iter()
242            .map(|v| v.name.unwrap())
243            .collect::<Vec<String>>();
244
245        for mut name in names {
246            let meta = self.core.ipfs_stat(&name).await?;
247
248            if meta.mode().is_dir() {
249                name += "/";
250            }
251
252            ctx.entries.push_back(oio::Entry::new(&name, meta))
253        }
254
255        ctx.done = true;
256        Ok(())
257    }
258}