opendal/services/ipfs/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use prost::Message;
26
27use super::core::IpfsCore;
28use super::error::parse_error;
29use super::ipld::PBNode;
30use super::DEFAULT_SCHEME;
31use crate::raw::*;
32use crate::services::IpfsConfig;
33use crate::*;
34impl Configurator for IpfsConfig {
35 type Builder = IpfsBuilder;
36
37 #[allow(deprecated)]
38 fn into_builder(self) -> Self::Builder {
39 IpfsBuilder {
40 config: self,
41 http_client: None,
42 }
43 }
44}
45
46#[doc = include_str!("docs.md")]
48#[derive(Default, Clone, Debug)]
49pub struct IpfsBuilder {
50 config: IpfsConfig,
51
52 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
53 http_client: Option<HttpClient>,
54}
55
56impl IpfsBuilder {
57 pub fn root(mut self, root: &str) -> Self {
65 self.config.root = if root.is_empty() {
66 None
67 } else {
68 Some(root.to_string())
69 };
70
71 self
72 }
73
74 pub fn endpoint(mut self, endpoint: &str) -> Self {
86 if !endpoint.is_empty() {
87 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
89 }
90
91 self
92 }
93
94 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
101 #[allow(deprecated)]
102 pub fn http_client(mut self, client: HttpClient) -> Self {
103 self.http_client = Some(client);
104 self
105 }
106}
107
108impl Builder for IpfsBuilder {
109 type Config = IpfsConfig;
110
111 fn build(self) -> Result<impl Access> {
112 debug!("backend build started: {:?}", &self);
113
114 let root = normalize_root(&self.config.root.unwrap_or_default());
115 if !root.starts_with("/ipfs/") && !root.starts_with("/ipns/") {
116 return Err(Error::new(
117 ErrorKind::ConfigInvalid,
118 "root must start with /ipfs/ or /ipns/",
119 )
120 .with_context("service", Scheme::Ipfs)
121 .with_context("root", &root));
122 }
123 debug!("backend use root {root}");
124
125 let endpoint = match &self.config.endpoint {
126 Some(endpoint) => Ok(endpoint.clone()),
127 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
128 .with_context("service", Scheme::Ipfs)
129 .with_context("root", &root)),
130 }?;
131 debug!("backend use endpoint {}", &endpoint);
132
133 let info = AccessorInfo::default();
134 info.set_scheme(DEFAULT_SCHEME)
135 .set_root(&root)
136 .set_native_capability(Capability {
137 stat: true,
138
139 read: true,
140
141 list: true,
142
143 shared: true,
144
145 ..Default::default()
146 });
147
148 let accessor_info = Arc::new(info);
149 let core = Arc::new(IpfsCore {
150 info: accessor_info,
151 root,
152 endpoint,
153 });
154
155 Ok(IpfsBackend { core })
156 }
157}
158
159#[derive(Clone)]
161pub struct IpfsBackend {
162 core: Arc<IpfsCore>,
163}
164
165impl Debug for IpfsBackend {
166 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
167 f.debug_struct("IpfsBackend")
168 .field("core", &self.core)
169 .finish()
170 }
171}
172
173impl Access for IpfsBackend {
174 type Reader = HttpBody;
175 type Writer = ();
176 type Lister = oio::PageLister<DirStream>;
177 type Deleter = ();
178
179 fn info(&self) -> Arc<AccessorInfo> {
180 self.core.info.clone()
181 }
182
183 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
184 let metadata = self.core.ipfs_stat(path).await?;
185 Ok(RpStat::new(metadata))
186 }
187
188 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
189 let resp = self.core.ipfs_get(path, args.range()).await?;
190
191 let status = resp.status();
192
193 match status {
194 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
195 Ok((RpRead::default(), resp.into_body()))
196 }
197 _ => {
198 let (part, mut body) = resp.into_parts();
199 let buf = body.to_buffer().await?;
200 Err(parse_error(Response::from_parts(part, buf)))
201 }
202 }
203 }
204
205 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
206 let l = DirStream::new(self.core.clone(), path);
207 Ok((RpList::default(), oio::PageLister::new(l)))
208 }
209}
210
211pub struct DirStream {
212 core: Arc<IpfsCore>,
213 path: String,
214}
215
216impl DirStream {
217 fn new(core: Arc<IpfsCore>, path: &str) -> Self {
218 Self {
219 core,
220 path: path.to_string(),
221 }
222 }
223}
224
225impl oio::PageList for DirStream {
226 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
227 let resp = self.core.ipfs_list(&self.path).await?;
228
229 if resp.status() != StatusCode::OK {
230 return Err(parse_error(resp));
231 }
232
233 let bs = resp.into_body();
234 let pb_node = PBNode::decode(bs).map_err(|e| {
235 Error::new(ErrorKind::Unexpected, "deserialize protobuf from response").set_source(e)
236 })?;
237
238 let names = pb_node
239 .links
240 .into_iter()
241 .map(|v| v.name.unwrap())
242 .collect::<Vec<String>>();
243
244 for mut name in names {
245 let meta = self.core.ipfs_stat(&name).await?;
246
247 if meta.mode().is_dir() {
248 name += "/";
249 }
250
251 ctx.entries.push_back(oio::Entry::new(&name, meta))
252 }
253
254 ctx.done = true;
255 Ok(())
256 }
257}