opendal/services/ipmfs/
lister.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22use serde::Deserialize;
23
24use super::core::IpmfsCore;
25use super::error::parse_error;
26use crate::raw::*;
27use crate::EntryMode;
28use crate::ErrorKind;
29use crate::Metadata;
30use crate::Result;
31
32pub struct IpmfsLister {
33    core: Arc<IpmfsCore>,
34    root: String,
35    path: String,
36}
37
38impl IpmfsLister {
39    pub fn new(core: Arc<IpmfsCore>, root: &str, path: &str) -> Self {
40        Self {
41            core,
42            root: root.to_string(),
43            path: path.to_string(),
44        }
45    }
46}
47
48impl oio::PageList for IpmfsLister {
49    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
50        let resp = self.core.ipmfs_ls(&self.path).await?;
51
52        if resp.status() != StatusCode::OK {
53            let err = parse_error(resp);
54            if matches!(err.kind(), ErrorKind::NotFound) {
55                // treat as empty listing
56                ctx.done = true;
57                return Ok(());
58            }
59            return Err(err);
60        }
61
62        // Add current directory entry when processing the first page
63        if ctx.token.is_empty() && !ctx.done {
64            let path = build_abs_path(&self.root, self.path.as_str());
65            let path = build_rel_path(&self.root, &path);
66
67            ctx.entries
68                .push_back(oio::Entry::new(&path, Metadata::new(EntryMode::DIR)));
69        }
70
71        let bs = resp.into_body();
72        let entries_body: IpfsLsResponse =
73            serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
74
75        // Mark dir stream has been consumed.
76        ctx.done = true;
77
78        for object in entries_body.entries.unwrap_or_default() {
79            let path = match object.mode() {
80                EntryMode::FILE => format!("{}{}", &self.path, object.name),
81                EntryMode::DIR => format!("{}{}/", &self.path, object.name),
82                EntryMode::Unknown => unreachable!(),
83            };
84
85            let path = build_rel_path(&self.root, &path);
86
87            ctx.entries.push_back(oio::Entry::new(
88                &path,
89                Metadata::new(object.mode()).with_content_length(object.size),
90            ));
91        }
92
93        Ok(())
94    }
95}
96
97#[derive(Deserialize, Default, Debug)]
98#[serde(default)]
99struct IpfsLsResponseEntry {
100    #[serde(rename = "Name")]
101    name: String,
102    #[serde(rename = "Type")]
103    file_type: i64,
104    #[serde(rename = "Size")]
105    size: u64,
106}
107
108impl IpfsLsResponseEntry {
109    /// ref: <https://github.com/ipfs/specs/blob/main/UNIXFS.md#data-format>
110    ///
111    /// ```protobuf
112    /// enum DataType {
113    ///     Raw = 0;
114    ///     Directory = 1;
115    ///     File = 2;
116    ///     Metadata = 3;
117    ///     Symlink = 4;
118    ///     HAMTShard = 5;
119    /// }
120    /// ```
121    fn mode(&self) -> EntryMode {
122        match &self.file_type {
123            1 => EntryMode::DIR,
124            0 | 2 => EntryMode::FILE,
125            _ => EntryMode::Unknown,
126        }
127    }
128}
129
130#[derive(Deserialize, Default, Debug)]
131#[serde(default)]
132struct IpfsLsResponse {
133    #[serde(rename = "Entries")]
134    entries: Option<Vec<IpfsLsResponseEntry>>,
135}