opendal/services/ipfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use prost::Message;
26
27use super::core::IpfsCore;
28use super::error::parse_error;
29use super::ipld::PBNode;
30use crate::raw::*;
31use crate::services::IpfsConfig;
32use crate::*;
33
34impl Configurator for IpfsConfig {
35    type Builder = IpfsBuilder;
36
37    #[allow(deprecated)]
38    fn into_builder(self) -> Self::Builder {
39        IpfsBuilder {
40            config: self,
41            http_client: None,
42        }
43    }
44}
45
46/// IPFS file system support based on [IPFS HTTP Gateway](https://docs.ipfs.tech/concepts/ipfs-gateway/).
47#[doc = include_str!("docs.md")]
48#[derive(Default, Clone, Debug)]
49pub struct IpfsBuilder {
50    config: IpfsConfig,
51
52    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
53    http_client: Option<HttpClient>,
54}
55
56impl IpfsBuilder {
57    /// Set root of ipfs backend.
58    ///
59    /// Root must be a valid ipfs address like the following:
60    ///
61    /// - `/ipfs/QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ/` (IPFS with CID v0)
62    /// - `/ipfs/bafybeibozpulxtpv5nhfa2ue3dcjx23ndh3gwr5vwllk7ptoyfwnfjjr4q/` (IPFS with  CID v1)
63    /// - `/ipns/opendal.apache.org/` (IPNS)
64    pub fn root(mut self, root: &str) -> Self {
65        self.config.root = if root.is_empty() {
66            None
67        } else {
68            Some(root.to_string())
69        };
70
71        self
72    }
73
74    /// Set endpoint if ipfs backend.
75    ///
76    /// Endpoint must be a valid ipfs gateway which passed the [IPFS Gateway Checker](https://ipfs.github.io/public-gateway-checker/)
77    ///
78    /// Popular choices including:
79    ///
80    /// - `https://ipfs.io`
81    /// - `https://w3s.link`
82    /// - `https://dweb.link`
83    /// - `https://cloudflare-ipfs.com`
84    /// - `http://127.0.0.1:8080` (ipfs daemon in local)
85    pub fn endpoint(mut self, endpoint: &str) -> Self {
86        if !endpoint.is_empty() {
87            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
88            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
89        }
90
91        self
92    }
93
94    /// Specify the http client that used by this service.
95    ///
96    /// # Notes
97    ///
98    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
99    /// during minor updates.
100    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
101    #[allow(deprecated)]
102    pub fn http_client(mut self, client: HttpClient) -> Self {
103        self.http_client = Some(client);
104        self
105    }
106}
107
108impl Builder for IpfsBuilder {
109    const SCHEME: Scheme = Scheme::Ipfs;
110    type Config = IpfsConfig;
111
112    fn build(self) -> Result<impl Access> {
113        debug!("backend build started: {:?}", &self);
114
115        let root = normalize_root(&self.config.root.unwrap_or_default());
116        if !root.starts_with("/ipfs/") && !root.starts_with("/ipns/") {
117            return Err(Error::new(
118                ErrorKind::ConfigInvalid,
119                "root must start with /ipfs/ or /ipns/",
120            )
121            .with_context("service", Scheme::Ipfs)
122            .with_context("root", &root));
123        }
124        debug!("backend use root {}", root);
125
126        let endpoint = match &self.config.endpoint {
127            Some(endpoint) => Ok(endpoint.clone()),
128            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
129                .with_context("service", Scheme::Ipfs)
130                .with_context("root", &root)),
131        }?;
132        debug!("backend use endpoint {}", &endpoint);
133
134        let info = AccessorInfo::default();
135        info.set_scheme(Scheme::Ipfs)
136            .set_root(&root)
137            .set_native_capability(Capability {
138                stat: true,
139                stat_has_content_length: true,
140                stat_has_content_type: true,
141                stat_has_etag: true,
142                stat_has_content_disposition: true,
143
144                read: true,
145
146                list: true,
147
148                shared: true,
149
150                ..Default::default()
151            });
152
153        let accessor_info = Arc::new(info);
154        let core = Arc::new(IpfsCore {
155            info: accessor_info,
156            root,
157            endpoint,
158        });
159
160        Ok(IpfsBackend { core })
161    }
162}
163
164/// Backend for IPFS.
165#[derive(Clone)]
166pub struct IpfsBackend {
167    core: Arc<IpfsCore>,
168}
169
170impl Debug for IpfsBackend {
171    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
172        f.debug_struct("IpfsBackend")
173            .field("core", &self.core)
174            .finish()
175    }
176}
177
178impl Access for IpfsBackend {
179    type Reader = HttpBody;
180    type Writer = ();
181    type Lister = oio::PageLister<DirStream>;
182    type Deleter = ();
183    type BlockingReader = ();
184    type BlockingWriter = ();
185    type BlockingLister = ();
186    type BlockingDeleter = ();
187
188    fn info(&self) -> Arc<AccessorInfo> {
189        self.core.info.clone()
190    }
191
192    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
193        let metadata = self.core.ipfs_stat(path).await?;
194        Ok(RpStat::new(metadata))
195    }
196
197    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
198        let resp = self.core.ipfs_get(path, args.range()).await?;
199
200        let status = resp.status();
201
202        match status {
203            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
204                Ok((RpRead::default(), resp.into_body()))
205            }
206            _ => {
207                let (part, mut body) = resp.into_parts();
208                let buf = body.to_buffer().await?;
209                Err(parse_error(Response::from_parts(part, buf)))
210            }
211        }
212    }
213
214    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
215        let l = DirStream::new(self.core.clone(), path);
216        Ok((RpList::default(), oio::PageLister::new(l)))
217    }
218}
219
220pub struct DirStream {
221    core: Arc<IpfsCore>,
222    path: String,
223}
224
225impl DirStream {
226    fn new(core: Arc<IpfsCore>, path: &str) -> Self {
227        Self {
228            core,
229            path: path.to_string(),
230        }
231    }
232}
233
234impl oio::PageList for DirStream {
235    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
236        let resp = self.core.ipfs_list(&self.path).await?;
237
238        if resp.status() != StatusCode::OK {
239            return Err(parse_error(resp));
240        }
241
242        let bs = resp.into_body();
243        let pb_node = PBNode::decode(bs).map_err(|e| {
244            Error::new(ErrorKind::Unexpected, "deserialize protobuf from response").set_source(e)
245        })?;
246
247        let names = pb_node
248            .links
249            .into_iter()
250            .map(|v| v.name.unwrap())
251            .collect::<Vec<String>>();
252
253        for mut name in names {
254            let meta = self.core.ipfs_stat(&name).await?;
255
256            if meta.mode().is_dir() {
257                name += "/";
258            }
259
260            ctx.entries.push_back(oio::Entry::new(&name, meta))
261        }
262
263        ctx.done = true;
264        Ok(())
265    }
266}