opendal/services/ipmfs/
lister.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22use serde::Deserialize;
23
24use super::backend::IpmfsBackend;
25use super::error::parse_error;
26use crate::raw::*;
27use crate::EntryMode;
28use crate::Metadata;
29use crate::Result;
30
31pub struct IpmfsLister {
32    backend: Arc<IpmfsBackend>,
33    root: String,
34    path: String,
35}
36
37impl IpmfsLister {
38    pub fn new(backend: Arc<IpmfsBackend>, root: &str, path: &str) -> Self {
39        Self {
40            backend,
41            root: root.to_string(),
42            path: path.to_string(),
43        }
44    }
45}
46
47impl oio::PageList for IpmfsLister {
48    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
49        let resp = self.backend.ipmfs_ls(&self.path).await?;
50
51        if resp.status() != StatusCode::OK {
52            return Err(parse_error(resp));
53        }
54
55        let bs = resp.into_body();
56        let entries_body: IpfsLsResponse =
57            serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
58
59        // Mark dir stream has been consumed.
60        ctx.done = true;
61
62        for object in entries_body.entries.unwrap_or_default() {
63            let path = match object.mode() {
64                EntryMode::FILE => format!("{}{}", &self.path, object.name),
65                EntryMode::DIR => format!("{}{}/", &self.path, object.name),
66                EntryMode::Unknown => unreachable!(),
67            };
68
69            let path = build_rel_path(&self.root, &path);
70
71            ctx.entries.push_back(oio::Entry::new(
72                &path,
73                Metadata::new(object.mode()).with_content_length(object.size),
74            ));
75        }
76
77        Ok(())
78    }
79}
80
81#[derive(Deserialize, Default, Debug)]
82#[serde(default)]
83struct IpfsLsResponseEntry {
84    #[serde(rename = "Name")]
85    name: String,
86    #[serde(rename = "Type")]
87    file_type: i64,
88    #[serde(rename = "Size")]
89    size: u64,
90}
91
92impl IpfsLsResponseEntry {
93    /// ref: <https://github.com/ipfs/specs/blob/main/UNIXFS.md#data-format>
94    ///
95    /// ```protobuf
96    /// enum DataType {
97    ///     Raw = 0;
98    ///     Directory = 1;
99    ///     File = 2;
100    ///     Metadata = 3;
101    ///     Symlink = 4;
102    ///     HAMTShard = 5;
103    /// }
104    /// ```
105    fn mode(&self) -> EntryMode {
106        match &self.file_type {
107            1 => EntryMode::DIR,
108            0 | 2 => EntryMode::FILE,
109            _ => EntryMode::Unknown,
110        }
111    }
112}
113
114#[derive(Deserialize, Default, Debug)]
115#[serde(default)]
116struct IpfsLsResponse {
117    #[serde(rename = "Entries")]
118    entries: Option<Vec<IpfsLsResponseEntry>>,
119}