opendal/services/ipfs/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use prost::Message;
26
27use super::core::IpfsCore;
28use super::error::parse_error;
29use super::ipld::PBNode;
30use crate::raw::*;
31use crate::services::IpfsConfig;
32use crate::*;
33
34impl Configurator for IpfsConfig {
35 type Builder = IpfsBuilder;
36
37 #[allow(deprecated)]
38 fn into_builder(self) -> Self::Builder {
39 IpfsBuilder {
40 config: self,
41 http_client: None,
42 }
43 }
44}
45
46#[doc = include_str!("docs.md")]
48#[derive(Default, Clone, Debug)]
49pub struct IpfsBuilder {
50 config: IpfsConfig,
51
52 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
53 http_client: Option<HttpClient>,
54}
55
56impl IpfsBuilder {
57 pub fn root(mut self, root: &str) -> Self {
65 self.config.root = if root.is_empty() {
66 None
67 } else {
68 Some(root.to_string())
69 };
70
71 self
72 }
73
74 pub fn endpoint(mut self, endpoint: &str) -> Self {
86 if !endpoint.is_empty() {
87 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
89 }
90
91 self
92 }
93
94 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
101 #[allow(deprecated)]
102 pub fn http_client(mut self, client: HttpClient) -> Self {
103 self.http_client = Some(client);
104 self
105 }
106}
107
108impl Builder for IpfsBuilder {
109 const SCHEME: Scheme = Scheme::Ipfs;
110 type Config = IpfsConfig;
111
112 fn build(self) -> Result<impl Access> {
113 debug!("backend build started: {:?}", &self);
114
115 let root = normalize_root(&self.config.root.unwrap_or_default());
116 if !root.starts_with("/ipfs/") && !root.starts_with("/ipns/") {
117 return Err(Error::new(
118 ErrorKind::ConfigInvalid,
119 "root must start with /ipfs/ or /ipns/",
120 )
121 .with_context("service", Scheme::Ipfs)
122 .with_context("root", &root));
123 }
124 debug!("backend use root {root}");
125
126 let endpoint = match &self.config.endpoint {
127 Some(endpoint) => Ok(endpoint.clone()),
128 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
129 .with_context("service", Scheme::Ipfs)
130 .with_context("root", &root)),
131 }?;
132 debug!("backend use endpoint {}", &endpoint);
133
134 let info = AccessorInfo::default();
135 info.set_scheme(Scheme::Ipfs)
136 .set_root(&root)
137 .set_native_capability(Capability {
138 stat: true,
139
140 read: true,
141
142 list: true,
143
144 shared: true,
145
146 ..Default::default()
147 });
148
149 let accessor_info = Arc::new(info);
150 let core = Arc::new(IpfsCore {
151 info: accessor_info,
152 root,
153 endpoint,
154 });
155
156 Ok(IpfsBackend { core })
157 }
158}
159
160#[derive(Clone)]
162pub struct IpfsBackend {
163 core: Arc<IpfsCore>,
164}
165
166impl Debug for IpfsBackend {
167 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
168 f.debug_struct("IpfsBackend")
169 .field("core", &self.core)
170 .finish()
171 }
172}
173
174impl Access for IpfsBackend {
175 type Reader = HttpBody;
176 type Writer = ();
177 type Lister = oio::PageLister<DirStream>;
178 type Deleter = ();
179
180 fn info(&self) -> Arc<AccessorInfo> {
181 self.core.info.clone()
182 }
183
184 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
185 let metadata = self.core.ipfs_stat(path).await?;
186 Ok(RpStat::new(metadata))
187 }
188
189 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
190 let resp = self.core.ipfs_get(path, args.range()).await?;
191
192 let status = resp.status();
193
194 match status {
195 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
196 Ok((RpRead::default(), resp.into_body()))
197 }
198 _ => {
199 let (part, mut body) = resp.into_parts();
200 let buf = body.to_buffer().await?;
201 Err(parse_error(Response::from_parts(part, buf)))
202 }
203 }
204 }
205
206 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
207 let l = DirStream::new(self.core.clone(), path);
208 Ok((RpList::default(), oio::PageLister::new(l)))
209 }
210}
211
212pub struct DirStream {
213 core: Arc<IpfsCore>,
214 path: String,
215}
216
217impl DirStream {
218 fn new(core: Arc<IpfsCore>, path: &str) -> Self {
219 Self {
220 core,
221 path: path.to_string(),
222 }
223 }
224}
225
226impl oio::PageList for DirStream {
227 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
228 let resp = self.core.ipfs_list(&self.path).await?;
229
230 if resp.status() != StatusCode::OK {
231 return Err(parse_error(resp));
232 }
233
234 let bs = resp.into_body();
235 let pb_node = PBNode::decode(bs).map_err(|e| {
236 Error::new(ErrorKind::Unexpected, "deserialize protobuf from response").set_source(e)
237 })?;
238
239 let names = pb_node
240 .links
241 .into_iter()
242 .map(|v| v.name.unwrap())
243 .collect::<Vec<String>>();
244
245 for mut name in names {
246 let meta = self.core.ipfs_stat(&name).await?;
247
248 if meta.mode().is_dir() {
249 name += "/";
250 }
251
252 ctx.entries.push_back(oio::Entry::new(&name, meta))
253 }
254
255 ctx.done = true;
256 Ok(())
257 }
258}