opendal/services/ipmfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use http::Response;
22use http::StatusCode;
23use serde::Deserialize;
24
25use super::core::IpmfsCore;
26use super::deleter::IpmfsDeleter;
27use super::error::parse_error;
28use super::lister::IpmfsLister;
29use super::writer::IpmfsWriter;
30use crate::raw::*;
31use crate::*;
32
33/// IPFS Mutable File System (IPMFS) backend.
34#[doc = include_str!("docs.md")]
35#[derive(Clone, Debug)]
36pub struct IpmfsBackend {
37    pub core: Arc<IpmfsCore>,
38}
39
40impl Access for IpmfsBackend {
41    type Reader = HttpBody;
42    type Writer = oio::OneShotWriter<IpmfsWriter>;
43    type Lister = oio::PageLister<IpmfsLister>;
44    type Deleter = oio::OneShotDeleter<IpmfsDeleter>;
45
46    fn info(&self) -> Arc<AccessorInfo> {
47        self.core.info.clone()
48    }
49
50    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
51        let resp = self.core.ipmfs_mkdir(path).await?;
52
53        let status = resp.status();
54
55        match status {
56            StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
57            _ => Err(parse_error(resp)),
58        }
59    }
60
61    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
62        // Stat root always returns a DIR.
63        if path == "/" {
64            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
65        }
66
67        let resp = self.core.ipmfs_stat(path).await?;
68
69        let status = resp.status();
70
71        match status {
72            StatusCode::OK => {
73                let bs = resp.into_body();
74
75                let res: IpfsStatResponse =
76                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
77
78                let mode = match res.file_type.as_str() {
79                    "file" => EntryMode::FILE,
80                    "directory" => EntryMode::DIR,
81                    _ => EntryMode::Unknown,
82                };
83
84                let mut meta = Metadata::new(mode);
85                meta.set_content_length(res.size);
86
87                Ok(RpStat::new(meta))
88            }
89            _ => Err(parse_error(resp)),
90        }
91    }
92
93    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
94        let resp = self.core.ipmfs_read(path, args.range()).await?;
95
96        let status = resp.status();
97
98        match status {
99            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
100                Ok((RpRead::default(), resp.into_body()))
101            }
102            _ => {
103                let (part, mut body) = resp.into_parts();
104                let buf = body.to_buffer().await?;
105                Err(parse_error(Response::from_parts(part, buf)))
106            }
107        }
108    }
109
110    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
111        Ok((
112            RpWrite::default(),
113            oio::OneShotWriter::new(IpmfsWriter::new(self.core.clone(), path.to_string())),
114        ))
115    }
116
117    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
118        Ok((
119            RpDelete::default(),
120            oio::OneShotDeleter::new(IpmfsDeleter::new(self.core.clone())),
121        ))
122    }
123
124    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
125        let l = IpmfsLister::new(self.core.clone(), &self.core.root, path);
126        Ok((RpList::default(), oio::PageLister::new(l)))
127    }
128}
129
130#[derive(Deserialize, Default, Debug)]
131#[serde(default)]
132struct IpfsStatResponse {
133    #[serde(rename = "Size")]
134    size: u64,
135    #[serde(rename = "Type")]
136    file_type: String,
137}