opendal/services/gcs/
lister.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use serde_json;
22
23use super::core::*;
24use super::error::parse_error;
25use crate::raw::*;
26use crate::*;
27
28/// GcsLister takes over task of listing objects and
29/// helps walking directory
30pub struct GcsLister {
31    core: Arc<GcsCore>,
32
33    path: String,
34    delimiter: &'static str,
35    limit: Option<usize>,
36
37    /// Filter results to objects whose names are lexicographically
38    /// **equal to or after** startOffset
39    start_after: Option<String>,
40}
41
42impl GcsLister {
43    /// Generate a new directory walker
44    pub fn new(
45        core: Arc<GcsCore>,
46        path: &str,
47        recursive: bool,
48        limit: Option<usize>,
49        start_after: Option<&str>,
50    ) -> Self {
51        let delimiter = if recursive { "" } else { "/" };
52        Self {
53            core,
54
55            path: path.to_string(),
56            delimiter,
57            limit,
58            start_after: start_after.map(String::from),
59        }
60    }
61}
62
63impl oio::PageList for GcsLister {
64    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
65        let resp = self
66            .core
67            .gcs_list_objects(
68                &self.path,
69                &ctx.token,
70                self.delimiter,
71                self.limit,
72                if ctx.token.is_empty() {
73                    self.start_after.clone()
74                } else {
75                    None
76                },
77            )
78            .await?;
79
80        if !resp.status().is_success() {
81            return Err(parse_error(resp));
82        }
83        let bytes = resp.into_body();
84
85        let output: ListResponse =
86            serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
87
88        if let Some(token) = &output.next_page_token {
89            ctx.token.clone_from(token);
90        } else {
91            ctx.done = true;
92        }
93
94        for prefix in output.prefixes {
95            let de = oio::Entry::new(
96                &build_rel_path(&self.core.root, &prefix),
97                Metadata::new(EntryMode::DIR),
98            );
99
100            ctx.entries.push_back(de);
101        }
102
103        for object in output.items {
104            // exclude the inclusive start_after itself
105            let mut path = build_rel_path(&self.core.root, &object.name);
106            if path.is_empty() {
107                path = "/".to_string();
108            }
109            if self.start_after.as_ref() == Some(&path) {
110                continue;
111            }
112
113            let mut meta = Metadata::new(EntryMode::from_path(&path));
114
115            // set metadata fields
116            meta.set_content_md5(object.md5_hash.as_str());
117            meta.set_etag(object.etag.as_str());
118
119            let size = object.size.parse().map_err(|e| {
120                Error::new(ErrorKind::Unexpected, "parse u64 from list response").set_source(e)
121            })?;
122            meta.set_content_length(size);
123            if !object.content_type.is_empty() {
124                meta.set_content_type(&object.content_type);
125            }
126
127            meta.set_last_modified(parse_datetime_from_rfc3339(object.updated.as_str())?);
128
129            let de = oio::Entry::with(path, meta);
130
131            ctx.entries.push_back(de);
132        }
133
134        Ok(())
135    }
136}