opendal/services/ipmfs/
backend.rs1use std::fmt;
19use std::str;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::Response;
24use http::StatusCode;
25use serde::Deserialize;
26
27use super::core::IpmfsCore;
28use super::delete::IpmfsDeleter;
29use super::error::parse_error;
30use super::lister::IpmfsLister;
31use super::writer::IpmfsWriter;
32use crate::raw::*;
33use crate::*;
34
35#[doc = include_str!("docs.md")]
37#[derive(Clone)]
38pub struct IpmfsBackend {
39 pub core: Arc<IpmfsCore>,
40}
41
42impl fmt::Debug for IpmfsBackend {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 f.debug_struct("IpmfsBackend")
45 .field("core", &self.core)
46 .finish()
47 }
48}
49
50impl Access for IpmfsBackend {
51 type Reader = HttpBody;
52 type Writer = oio::OneShotWriter<IpmfsWriter>;
53 type Lister = oio::PageLister<IpmfsLister>;
54 type Deleter = oio::OneShotDeleter<IpmfsDeleter>;
55
56 fn info(&self) -> Arc<AccessorInfo> {
57 self.core.info.clone()
58 }
59
60 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
61 let resp = self.core.ipmfs_mkdir(path).await?;
62
63 let status = resp.status();
64
65 match status {
66 StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
67 _ => Err(parse_error(resp)),
68 }
69 }
70
71 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
72 if path == "/" {
74 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
75 }
76
77 let resp = self.core.ipmfs_stat(path).await?;
78
79 let status = resp.status();
80
81 match status {
82 StatusCode::OK => {
83 let bs = resp.into_body();
84
85 let res: IpfsStatResponse =
86 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
87
88 let mode = match res.file_type.as_str() {
89 "file" => EntryMode::FILE,
90 "directory" => EntryMode::DIR,
91 _ => EntryMode::Unknown,
92 };
93
94 let mut meta = Metadata::new(mode);
95 meta.set_content_length(res.size);
96
97 Ok(RpStat::new(meta))
98 }
99 _ => Err(parse_error(resp)),
100 }
101 }
102
103 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
104 let resp = self.core.ipmfs_read(path, args.range()).await?;
105
106 let status = resp.status();
107
108 match status {
109 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
110 Ok((RpRead::default(), resp.into_body()))
111 }
112 _ => {
113 let (part, mut body) = resp.into_parts();
114 let buf = body.to_buffer().await?;
115 Err(parse_error(Response::from_parts(part, buf)))
116 }
117 }
118 }
119
120 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
121 Ok((
122 RpWrite::default(),
123 oio::OneShotWriter::new(IpmfsWriter::new(self.core.clone(), path.to_string())),
124 ))
125 }
126
127 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
128 Ok((
129 RpDelete::default(),
130 oio::OneShotDeleter::new(IpmfsDeleter::new(self.core.clone())),
131 ))
132 }
133
134 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
135 let l = IpmfsLister::new(self.core.clone(), &self.core.root, path);
136 Ok((RpList::default(), oio::PageLister::new(l)))
137 }
138}
139
140#[derive(Deserialize, Default, Debug)]
141#[serde(default)]
142struct IpfsStatResponse {
143 #[serde(rename = "Size")]
144 size: u64,
145 #[serde(rename = "Type")]
146 file_type: String,
147}