opendal/services/ipfs/
backend.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use http::Response;
22use http::StatusCode;
23use log::debug;
24use prost::Message;
25
26use super::IPFS_SCHEME;
27use super::config::IpfsConfig;
28use super::core::IpfsCore;
29use super::error::parse_error;
30use super::ipld::PBNode;
31use crate::raw::*;
32use crate::*;
33
34#[doc = include_str!("docs.md")]
36#[derive(Default)]
37pub struct IpfsBuilder {
38 pub(super) config: IpfsConfig,
39
40 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
41 pub(super) http_client: Option<HttpClient>,
42}
43
44impl Debug for IpfsBuilder {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 f.debug_struct("IpfsBuilder")
47 .field("config", &self.config)
48 .finish()
49 }
50}
51
52impl IpfsBuilder {
53 pub fn root(mut self, root: &str) -> Self {
61 self.config.root = if root.is_empty() {
62 None
63 } else {
64 Some(root.to_string())
65 };
66
67 self
68 }
69
70 pub fn endpoint(mut self, endpoint: &str) -> Self {
82 if !endpoint.is_empty() {
83 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
85 }
86
87 self
88 }
89
90 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
97 #[allow(deprecated)]
98 pub fn http_client(mut self, client: HttpClient) -> Self {
99 self.http_client = Some(client);
100 self
101 }
102}
103
104impl Builder for IpfsBuilder {
105 type Config = IpfsConfig;
106
107 fn build(self) -> Result<impl Access> {
108 debug!("backend build started: {:?}", &self);
109
110 let root = normalize_root(&self.config.root.unwrap_or_default());
111 if !root.starts_with("/ipfs/") && !root.starts_with("/ipns/") {
112 return Err(Error::new(
113 ErrorKind::ConfigInvalid,
114 "root must start with /ipfs/ or /ipns/",
115 )
116 .with_context("service", Scheme::Ipfs)
117 .with_context("root", &root));
118 }
119 debug!("backend use root {root}");
120
121 let endpoint = match &self.config.endpoint {
122 Some(endpoint) => Ok(endpoint.clone()),
123 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
124 .with_context("service", Scheme::Ipfs)
125 .with_context("root", &root)),
126 }?;
127 debug!("backend use endpoint {}", &endpoint);
128
129 let info = AccessorInfo::default();
130 info.set_scheme(IPFS_SCHEME)
131 .set_root(&root)
132 .set_native_capability(Capability {
133 stat: true,
134
135 read: true,
136
137 list: true,
138
139 shared: true,
140
141 ..Default::default()
142 });
143
144 let accessor_info = Arc::new(info);
145 let core = Arc::new(IpfsCore {
146 info: accessor_info,
147 root,
148 endpoint,
149 });
150
151 Ok(IpfsBackend { core })
152 }
153}
154
155#[derive(Clone, Debug)]
157pub struct IpfsBackend {
158 core: Arc<IpfsCore>,
159}
160
161impl Access for IpfsBackend {
162 type Reader = HttpBody;
163 type Writer = ();
164 type Lister = oio::PageLister<DirStream>;
165 type Deleter = ();
166
167 fn info(&self) -> Arc<AccessorInfo> {
168 self.core.info.clone()
169 }
170
171 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
172 let metadata = self.core.ipfs_stat(path).await?;
173 Ok(RpStat::new(metadata))
174 }
175
176 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
177 let resp = self.core.ipfs_get(path, args.range()).await?;
178
179 let status = resp.status();
180
181 match status {
182 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
183 Ok((RpRead::default(), resp.into_body()))
184 }
185 _ => {
186 let (part, mut body) = resp.into_parts();
187 let buf = body.to_buffer().await?;
188 Err(parse_error(Response::from_parts(part, buf)))
189 }
190 }
191 }
192
193 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
194 let l = DirStream::new(self.core.clone(), path);
195 Ok((RpList::default(), oio::PageLister::new(l)))
196 }
197}
198
199pub struct DirStream {
200 core: Arc<IpfsCore>,
201 path: String,
202}
203
204impl DirStream {
205 fn new(core: Arc<IpfsCore>, path: &str) -> Self {
206 Self {
207 core,
208 path: path.to_string(),
209 }
210 }
211}
212
213impl oio::PageList for DirStream {
214 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
215 let resp = self.core.ipfs_list(&self.path).await?;
216
217 if resp.status() != StatusCode::OK {
218 return Err(parse_error(resp));
219 }
220
221 let bs = resp.into_body();
222 let pb_node = PBNode::decode(bs).map_err(|e| {
223 Error::new(ErrorKind::Unexpected, "deserialize protobuf from response").set_source(e)
224 })?;
225
226 let names = pb_node
227 .links
228 .into_iter()
229 .map(|v| v.name.unwrap())
230 .collect::<Vec<String>>();
231
232 for mut name in names {
233 let meta = self.core.ipfs_stat(&name).await?;
234
235 if meta.mode().is_dir() {
236 name += "/";
237 }
238
239 ctx.entries.push_back(oio::Entry::new(&name, meta))
240 }
241
242 ctx.done = true;
243 Ok(())
244 }
245}