opendal/services/ipmfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt;
19use std::str;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::Response;
24use http::StatusCode;
25use serde::Deserialize;
26
27use super::core::IpmfsCore;
28use super::delete::IpmfsDeleter;
29use super::error::parse_error;
30use super::lister::IpmfsLister;
31use super::writer::IpmfsWriter;
32use crate::raw::*;
33use crate::*;
34
35/// IPFS Mutable File System (IPMFS) backend.
36#[doc = include_str!("docs.md")]
37#[derive(Clone)]
38pub struct IpmfsBackend {
39    pub core: Arc<IpmfsCore>,
40}
41
42impl fmt::Debug for IpmfsBackend {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        f.debug_struct("IpmfsBackend")
45            .field("core", &self.core)
46            .finish()
47    }
48}
49
50impl Access for IpmfsBackend {
51    type Reader = HttpBody;
52    type Writer = oio::OneShotWriter<IpmfsWriter>;
53    type Lister = oio::PageLister<IpmfsLister>;
54    type Deleter = oio::OneShotDeleter<IpmfsDeleter>;
55
56    fn info(&self) -> Arc<AccessorInfo> {
57        self.core.info.clone()
58    }
59
60    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
61        let resp = self.core.ipmfs_mkdir(path).await?;
62
63        let status = resp.status();
64
65        match status {
66            StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
67            _ => Err(parse_error(resp)),
68        }
69    }
70
71    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
72        // Stat root always returns a DIR.
73        if path == "/" {
74            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
75        }
76
77        let resp = self.core.ipmfs_stat(path).await?;
78
79        let status = resp.status();
80
81        match status {
82            StatusCode::OK => {
83                let bs = resp.into_body();
84
85                let res: IpfsStatResponse =
86                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
87
88                let mode = match res.file_type.as_str() {
89                    "file" => EntryMode::FILE,
90                    "directory" => EntryMode::DIR,
91                    _ => EntryMode::Unknown,
92                };
93
94                let mut meta = Metadata::new(mode);
95                meta.set_content_length(res.size);
96
97                Ok(RpStat::new(meta))
98            }
99            _ => Err(parse_error(resp)),
100        }
101    }
102
103    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
104        let resp = self.core.ipmfs_read(path, args.range()).await?;
105
106        let status = resp.status();
107
108        match status {
109            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
110                Ok((RpRead::default(), resp.into_body()))
111            }
112            _ => {
113                let (part, mut body) = resp.into_parts();
114                let buf = body.to_buffer().await?;
115                Err(parse_error(Response::from_parts(part, buf)))
116            }
117        }
118    }
119
120    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
121        Ok((
122            RpWrite::default(),
123            oio::OneShotWriter::new(IpmfsWriter::new(self.core.clone(), path.to_string())),
124        ))
125    }
126
127    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
128        Ok((
129            RpDelete::default(),
130            oio::OneShotDeleter::new(IpmfsDeleter::new(self.core.clone())),
131        ))
132    }
133
134    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
135        let l = IpmfsLister::new(self.core.clone(), &self.core.root, path);
136        Ok((RpList::default(), oio::PageLister::new(l)))
137    }
138}
139
140#[derive(Deserialize, Default, Debug)]
141#[serde(default)]
142struct IpfsStatResponse {
143    #[serde(rename = "Size")]
144    size: u64,
145    #[serde(rename = "Type")]
146    file_type: String,
147}