opendal/services/ipmfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt;
19use std::str;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::Response;
24use http::StatusCode;
25use serde::Deserialize;
26
27use super::core::IpmfsCore;
28use super::delete::IpmfsDeleter;
29use super::error::parse_error;
30use super::lister::IpmfsLister;
31use super::writer::IpmfsWriter;
32use crate::raw::*;
33use crate::*;
34
35/// IPFS Mutable File System (IPMFS) backend.
36#[doc = include_str!("docs.md")]
37#[derive(Clone)]
38pub struct IpmfsBackend {
39    pub core: Arc<IpmfsCore>,
40}
41
42impl fmt::Debug for IpmfsBackend {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        f.debug_struct("IpmfsBackend")
45            .field("core", &self.core)
46            .finish()
47    }
48}
49
50impl Access for IpmfsBackend {
51    type Reader = HttpBody;
52    type Writer = oio::OneShotWriter<IpmfsWriter>;
53    type Lister = oio::PageLister<IpmfsLister>;
54    type Deleter = oio::OneShotDeleter<IpmfsDeleter>;
55    type BlockingReader = ();
56    type BlockingWriter = ();
57    type BlockingLister = ();
58    type BlockingDeleter = ();
59
60    fn info(&self) -> Arc<AccessorInfo> {
61        self.core.info.clone()
62    }
63
64    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
65        let resp = self.core.ipmfs_mkdir(path).await?;
66
67        let status = resp.status();
68
69        match status {
70            StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
71            _ => Err(parse_error(resp)),
72        }
73    }
74
75    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
76        // Stat root always returns a DIR.
77        if path == "/" {
78            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
79        }
80
81        let resp = self.core.ipmfs_stat(path).await?;
82
83        let status = resp.status();
84
85        match status {
86            StatusCode::OK => {
87                let bs = resp.into_body();
88
89                let res: IpfsStatResponse =
90                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
91
92                let mode = match res.file_type.as_str() {
93                    "file" => EntryMode::FILE,
94                    "directory" => EntryMode::DIR,
95                    _ => EntryMode::Unknown,
96                };
97
98                let mut meta = Metadata::new(mode);
99                meta.set_content_length(res.size);
100
101                Ok(RpStat::new(meta))
102            }
103            _ => Err(parse_error(resp)),
104        }
105    }
106
107    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
108        let resp = self.core.ipmfs_read(path, args.range()).await?;
109
110        let status = resp.status();
111
112        match status {
113            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
114                Ok((RpRead::default(), resp.into_body()))
115            }
116            _ => {
117                let (part, mut body) = resp.into_parts();
118                let buf = body.to_buffer().await?;
119                Err(parse_error(Response::from_parts(part, buf)))
120            }
121        }
122    }
123
124    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
125        Ok((
126            RpWrite::default(),
127            oio::OneShotWriter::new(IpmfsWriter::new(self.core.clone(), path.to_string())),
128        ))
129    }
130
131    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
132        Ok((
133            RpDelete::default(),
134            oio::OneShotDeleter::new(IpmfsDeleter::new(self.core.clone())),
135        ))
136    }
137
138    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
139        let l = IpmfsLister::new(self.core.clone(), &self.core.root, path);
140        Ok((RpList::default(), oio::PageLister::new(l)))
141    }
142}
143
144#[derive(Deserialize, Default, Debug)]
145#[serde(default)]
146struct IpfsStatResponse {
147    #[serde(rename = "Size")]
148    size: u64,
149    #[serde(rename = "Type")]
150    file_type: String,
151}