opendal/services/ipmfs/
backend.rs1use std::fmt;
19use std::str;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::Response;
24use http::StatusCode;
25use serde::Deserialize;
26
27use super::core::IpmfsCore;
28use super::delete::IpmfsDeleter;
29use super::error::parse_error;
30use super::lister::IpmfsLister;
31use super::writer::IpmfsWriter;
32use crate::raw::*;
33use crate::*;
34
35#[doc = include_str!("docs.md")]
37#[derive(Clone)]
38pub struct IpmfsBackend {
39 pub core: Arc<IpmfsCore>,
40}
41
42impl fmt::Debug for IpmfsBackend {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 f.debug_struct("IpmfsBackend")
45 .field("core", &self.core)
46 .finish()
47 }
48}
49
50impl Access for IpmfsBackend {
51 type Reader = HttpBody;
52 type Writer = oio::OneShotWriter<IpmfsWriter>;
53 type Lister = oio::PageLister<IpmfsLister>;
54 type Deleter = oio::OneShotDeleter<IpmfsDeleter>;
55 type BlockingReader = ();
56 type BlockingWriter = ();
57 type BlockingLister = ();
58 type BlockingDeleter = ();
59
60 fn info(&self) -> Arc<AccessorInfo> {
61 self.core.info.clone()
62 }
63
64 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
65 let resp = self.core.ipmfs_mkdir(path).await?;
66
67 let status = resp.status();
68
69 match status {
70 StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
71 _ => Err(parse_error(resp)),
72 }
73 }
74
75 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
76 if path == "/" {
78 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
79 }
80
81 let resp = self.core.ipmfs_stat(path).await?;
82
83 let status = resp.status();
84
85 match status {
86 StatusCode::OK => {
87 let bs = resp.into_body();
88
89 let res: IpfsStatResponse =
90 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
91
92 let mode = match res.file_type.as_str() {
93 "file" => EntryMode::FILE,
94 "directory" => EntryMode::DIR,
95 _ => EntryMode::Unknown,
96 };
97
98 let mut meta = Metadata::new(mode);
99 meta.set_content_length(res.size);
100
101 Ok(RpStat::new(meta))
102 }
103 _ => Err(parse_error(resp)),
104 }
105 }
106
107 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
108 let resp = self.core.ipmfs_read(path, args.range()).await?;
109
110 let status = resp.status();
111
112 match status {
113 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
114 Ok((RpRead::default(), resp.into_body()))
115 }
116 _ => {
117 let (part, mut body) = resp.into_parts();
118 let buf = body.to_buffer().await?;
119 Err(parse_error(Response::from_parts(part, buf)))
120 }
121 }
122 }
123
124 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
125 Ok((
126 RpWrite::default(),
127 oio::OneShotWriter::new(IpmfsWriter::new(self.core.clone(), path.to_string())),
128 ))
129 }
130
131 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
132 Ok((
133 RpDelete::default(),
134 oio::OneShotDeleter::new(IpmfsDeleter::new(self.core.clone())),
135 ))
136 }
137
138 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
139 let l = IpmfsLister::new(self.core.clone(), &self.core.root, path);
140 Ok((RpList::default(), oio::PageLister::new(l)))
141 }
142}
143
144#[derive(Deserialize, Default, Debug)]
145#[serde(default)]
146struct IpfsStatResponse {
147 #[serde(rename = "Size")]
148 size: u64,
149 #[serde(rename = "Type")]
150 file_type: String,
151}