opendal/services/yandex_disk/
lister.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21
22use super::core::parse_info;
23use super::core::MetainformationResponse;
24use super::core::YandexDiskCore;
25use super::error::parse_error;
26use crate::raw::oio::Entry;
27use crate::raw::*;
28use crate::Result;
29
30pub struct YandexDiskLister {
31    core: Arc<YandexDiskCore>,
32
33    path: String,
34    limit: Option<usize>,
35}
36
37impl YandexDiskLister {
38    pub(super) fn new(core: Arc<YandexDiskCore>, path: &str, limit: Option<usize>) -> Self {
39        YandexDiskLister {
40            core,
41            path: path.to_string(),
42            limit,
43        }
44    }
45}
46
47impl oio::PageList for YandexDiskLister {
48    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
49        let offset = if ctx.token.is_empty() {
50            None
51        } else {
52            Some(ctx.token.clone())
53        };
54
55        let resp = self
56            .core
57            .metainformation(&self.path, self.limit, offset)
58            .await?;
59
60        if resp.status() == http::StatusCode::NOT_FOUND {
61            ctx.done = true;
62            return Ok(());
63        }
64
65        match resp.status() {
66            http::StatusCode::OK => {
67                let body = resp.into_body();
68
69                let resp: MetainformationResponse =
70                    serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
71
72                if let Some(embedded) = resp.embedded {
73                    let n = embedded.items.len();
74
75                    for mf in embedded.items {
76                        let path = mf.path.strip_prefix("disk:");
77
78                        if let Some(path) = path {
79                            let mut path = build_rel_path(&self.core.root, path);
80
81                            let md = parse_info(mf)?;
82
83                            if md.mode().is_dir() {
84                                path = format!("{}/", path);
85                            }
86
87                            ctx.entries.push_back(Entry::new(&path, md));
88                        };
89                    }
90
91                    let current_len = ctx.token.parse::<usize>().unwrap_or(0) + n;
92
93                    if current_len >= embedded.total {
94                        ctx.done = true;
95                    }
96
97                    ctx.token = current_len.to_string();
98
99                    return Ok(());
100                }
101            }
102            http::StatusCode::NOT_FOUND => {
103                ctx.done = true;
104                return Ok(());
105            }
106            _ => {
107                return Err(parse_error(resp));
108            }
109        }
110
111        Ok(())
112    }
113}