opendal/services/ipfs/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use prost::Message;
26
27use super::core::IpfsCore;
28use super::error::parse_error;
29use super::ipld::PBNode;
30use crate::raw::*;
31use crate::services::IpfsConfig;
32use crate::*;
33
34impl Configurator for IpfsConfig {
35 type Builder = IpfsBuilder;
36
37 #[allow(deprecated)]
38 fn into_builder(self) -> Self::Builder {
39 IpfsBuilder {
40 config: self,
41 http_client: None,
42 }
43 }
44}
45
46#[doc = include_str!("docs.md")]
48#[derive(Default, Clone, Debug)]
49pub struct IpfsBuilder {
50 config: IpfsConfig,
51
52 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
53 http_client: Option<HttpClient>,
54}
55
56impl IpfsBuilder {
57 pub fn root(mut self, root: &str) -> Self {
65 self.config.root = if root.is_empty() {
66 None
67 } else {
68 Some(root.to_string())
69 };
70
71 self
72 }
73
74 pub fn endpoint(mut self, endpoint: &str) -> Self {
86 if !endpoint.is_empty() {
87 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
89 }
90
91 self
92 }
93
94 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
101 #[allow(deprecated)]
102 pub fn http_client(mut self, client: HttpClient) -> Self {
103 self.http_client = Some(client);
104 self
105 }
106}
107
108impl Builder for IpfsBuilder {
109 const SCHEME: Scheme = Scheme::Ipfs;
110 type Config = IpfsConfig;
111
112 fn build(self) -> Result<impl Access> {
113 debug!("backend build started: {:?}", &self);
114
115 let root = normalize_root(&self.config.root.unwrap_or_default());
116 if !root.starts_with("/ipfs/") && !root.starts_with("/ipns/") {
117 return Err(Error::new(
118 ErrorKind::ConfigInvalid,
119 "root must start with /ipfs/ or /ipns/",
120 )
121 .with_context("service", Scheme::Ipfs)
122 .with_context("root", &root));
123 }
124 debug!("backend use root {}", root);
125
126 let endpoint = match &self.config.endpoint {
127 Some(endpoint) => Ok(endpoint.clone()),
128 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
129 .with_context("service", Scheme::Ipfs)
130 .with_context("root", &root)),
131 }?;
132 debug!("backend use endpoint {}", &endpoint);
133
134 let info = AccessorInfo::default();
135 info.set_scheme(Scheme::Ipfs)
136 .set_root(&root)
137 .set_native_capability(Capability {
138 stat: true,
139 stat_has_content_length: true,
140 stat_has_content_type: true,
141 stat_has_etag: true,
142 stat_has_content_disposition: true,
143
144 read: true,
145
146 list: true,
147
148 shared: true,
149
150 ..Default::default()
151 });
152
153 let accessor_info = Arc::new(info);
154 let core = Arc::new(IpfsCore {
155 info: accessor_info,
156 root,
157 endpoint,
158 });
159
160 Ok(IpfsBackend { core })
161 }
162}
163
164#[derive(Clone)]
166pub struct IpfsBackend {
167 core: Arc<IpfsCore>,
168}
169
170impl Debug for IpfsBackend {
171 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
172 f.debug_struct("IpfsBackend")
173 .field("core", &self.core)
174 .finish()
175 }
176}
177
178impl Access for IpfsBackend {
179 type Reader = HttpBody;
180 type Writer = ();
181 type Lister = oio::PageLister<DirStream>;
182 type Deleter = ();
183
184 fn info(&self) -> Arc<AccessorInfo> {
185 self.core.info.clone()
186 }
187
188 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
189 let metadata = self.core.ipfs_stat(path).await?;
190 Ok(RpStat::new(metadata))
191 }
192
193 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
194 let resp = self.core.ipfs_get(path, args.range()).await?;
195
196 let status = resp.status();
197
198 match status {
199 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
200 Ok((RpRead::default(), resp.into_body()))
201 }
202 _ => {
203 let (part, mut body) = resp.into_parts();
204 let buf = body.to_buffer().await?;
205 Err(parse_error(Response::from_parts(part, buf)))
206 }
207 }
208 }
209
210 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
211 let l = DirStream::new(self.core.clone(), path);
212 Ok((RpList::default(), oio::PageLister::new(l)))
213 }
214}
215
216pub struct DirStream {
217 core: Arc<IpfsCore>,
218 path: String,
219}
220
221impl DirStream {
222 fn new(core: Arc<IpfsCore>, path: &str) -> Self {
223 Self {
224 core,
225 path: path.to_string(),
226 }
227 }
228}
229
230impl oio::PageList for DirStream {
231 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
232 let resp = self.core.ipfs_list(&self.path).await?;
233
234 if resp.status() != StatusCode::OK {
235 return Err(parse_error(resp));
236 }
237
238 let bs = resp.into_body();
239 let pb_node = PBNode::decode(bs).map_err(|e| {
240 Error::new(ErrorKind::Unexpected, "deserialize protobuf from response").set_source(e)
241 })?;
242
243 let names = pb_node
244 .links
245 .into_iter()
246 .map(|v| v.name.unwrap())
247 .collect::<Vec<String>>();
248
249 for mut name in names {
250 let meta = self.core.ipfs_stat(&name).await?;
251
252 if meta.mode().is_dir() {
253 name += "/";
254 }
255
256 ctx.entries.push_back(oio::Entry::new(&name, meta))
257 }
258
259 ctx.done = true;
260 Ok(())
261 }
262}