opendal/services/ipfs/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use prost::Message;
26
27use super::core::IpfsCore;
28use super::error::parse_error;
29use super::ipld::PBNode;
30use crate::raw::*;
31use crate::services::IpfsConfig;
32use crate::*;
33
34impl Configurator for IpfsConfig {
35 type Builder = IpfsBuilder;
36
37 #[allow(deprecated)]
38 fn into_builder(self) -> Self::Builder {
39 IpfsBuilder {
40 config: self,
41 http_client: None,
42 }
43 }
44}
45
46#[doc = include_str!("docs.md")]
48#[derive(Default, Clone, Debug)]
49pub struct IpfsBuilder {
50 config: IpfsConfig,
51
52 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
53 http_client: Option<HttpClient>,
54}
55
56impl IpfsBuilder {
57 pub fn root(mut self, root: &str) -> Self {
65 self.config.root = if root.is_empty() {
66 None
67 } else {
68 Some(root.to_string())
69 };
70
71 self
72 }
73
74 pub fn endpoint(mut self, endpoint: &str) -> Self {
86 if !endpoint.is_empty() {
87 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
89 }
90
91 self
92 }
93
94 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
101 #[allow(deprecated)]
102 pub fn http_client(mut self, client: HttpClient) -> Self {
103 self.http_client = Some(client);
104 self
105 }
106}
107
108impl Builder for IpfsBuilder {
109 const SCHEME: Scheme = Scheme::Ipfs;
110 type Config = IpfsConfig;
111
112 fn build(self) -> Result<impl Access> {
113 debug!("backend build started: {:?}", &self);
114
115 let root = normalize_root(&self.config.root.unwrap_or_default());
116 if !root.starts_with("/ipfs/") && !root.starts_with("/ipns/") {
117 return Err(Error::new(
118 ErrorKind::ConfigInvalid,
119 "root must start with /ipfs/ or /ipns/",
120 )
121 .with_context("service", Scheme::Ipfs)
122 .with_context("root", &root));
123 }
124 debug!("backend use root {}", root);
125
126 let endpoint = match &self.config.endpoint {
127 Some(endpoint) => Ok(endpoint.clone()),
128 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
129 .with_context("service", Scheme::Ipfs)
130 .with_context("root", &root)),
131 }?;
132 debug!("backend use endpoint {}", &endpoint);
133
134 let info = AccessorInfo::default();
135 info.set_scheme(Scheme::Ipfs)
136 .set_root(&root)
137 .set_native_capability(Capability {
138 stat: true,
139 stat_has_content_length: true,
140 stat_has_content_type: true,
141 stat_has_etag: true,
142 stat_has_content_disposition: true,
143
144 read: true,
145
146 list: true,
147
148 shared: true,
149
150 ..Default::default()
151 });
152
153 let accessor_info = Arc::new(info);
154 let core = Arc::new(IpfsCore {
155 info: accessor_info,
156 root,
157 endpoint,
158 });
159
160 Ok(IpfsBackend { core })
161 }
162}
163
164#[derive(Clone)]
166pub struct IpfsBackend {
167 core: Arc<IpfsCore>,
168}
169
170impl Debug for IpfsBackend {
171 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
172 f.debug_struct("IpfsBackend")
173 .field("core", &self.core)
174 .finish()
175 }
176}
177
178impl Access for IpfsBackend {
179 type Reader = HttpBody;
180 type Writer = ();
181 type Lister = oio::PageLister<DirStream>;
182 type Deleter = ();
183 type BlockingReader = ();
184 type BlockingWriter = ();
185 type BlockingLister = ();
186 type BlockingDeleter = ();
187
188 fn info(&self) -> Arc<AccessorInfo> {
189 self.core.info.clone()
190 }
191
192 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
193 let metadata = self.core.ipfs_stat(path).await?;
194 Ok(RpStat::new(metadata))
195 }
196
197 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
198 let resp = self.core.ipfs_get(path, args.range()).await?;
199
200 let status = resp.status();
201
202 match status {
203 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
204 Ok((RpRead::default(), resp.into_body()))
205 }
206 _ => {
207 let (part, mut body) = resp.into_parts();
208 let buf = body.to_buffer().await?;
209 Err(parse_error(Response::from_parts(part, buf)))
210 }
211 }
212 }
213
214 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
215 let l = DirStream::new(self.core.clone(), path);
216 Ok((RpList::default(), oio::PageLister::new(l)))
217 }
218}
219
220pub struct DirStream {
221 core: Arc<IpfsCore>,
222 path: String,
223}
224
225impl DirStream {
226 fn new(core: Arc<IpfsCore>, path: &str) -> Self {
227 Self {
228 core,
229 path: path.to_string(),
230 }
231 }
232}
233
234impl oio::PageList for DirStream {
235 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
236 let resp = self.core.ipfs_list(&self.path).await?;
237
238 if resp.status() != StatusCode::OK {
239 return Err(parse_error(resp));
240 }
241
242 let bs = resp.into_body();
243 let pb_node = PBNode::decode(bs).map_err(|e| {
244 Error::new(ErrorKind::Unexpected, "deserialize protobuf from response").set_source(e)
245 })?;
246
247 let names = pb_node
248 .links
249 .into_iter()
250 .map(|v| v.name.unwrap())
251 .collect::<Vec<String>>();
252
253 for mut name in names {
254 let meta = self.core.ipfs_stat(&name).await?;
255
256 if meta.mode().is_dir() {
257 name += "/";
258 }
259
260 ctx.entries.push_back(oio::Entry::new(&name, meta))
261 }
262
263 ctx.done = true;
264 Ok(())
265 }
266}