opendal/services/dbfs/
lister.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22use serde::Deserialize;
23
24use super::core::DbfsCore;
25use super::error::parse_error;
26use crate::raw::*;
27use crate::*;
28
29pub struct DbfsLister {
30 core: Arc<DbfsCore>,
31 path: String,
32}
33
34impl DbfsLister {
35 pub fn new(core: Arc<DbfsCore>, path: String) -> Self {
36 Self { core, path }
37 }
38}
39
40impl oio::PageList for DbfsLister {
41 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
42 let response = self.core.dbfs_list(&self.path).await?;
43
44 let status_code = response.status();
45 if !status_code.is_success() {
46 if status_code == StatusCode::NOT_FOUND {
47 ctx.done = true;
48 return Ok(());
49 }
50 let error = parse_error(response);
51 return Err(error);
52 }
53
54 let bytes = response.into_body();
55 let decoded_response: DbfsOutputList =
56 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
57
58 ctx.done = true;
59
60 for status in decoded_response.files {
61 let entry: oio::Entry = match status.is_dir {
62 true => {
63 let normalized_path = format!("{}/", &status.path);
64 let mut meta = Metadata::new(EntryMode::DIR);
65 meta.set_last_modified(Timestamp::from_millisecond(status.modification_time)?);
66 oio::Entry::new(&normalized_path, meta)
67 }
68 false => {
69 let mut meta = Metadata::new(EntryMode::FILE);
70 meta.set_last_modified(Timestamp::from_millisecond(status.modification_time)?);
71 meta.set_content_length(status.file_size as u64);
72 oio::Entry::new(&status.path, meta)
73 }
74 };
75 ctx.entries.push_back(entry);
76 }
77 Ok(())
78 }
79}
80
81#[derive(Debug, Deserialize)]
82struct DbfsOutputList {
83 files: Vec<DbfsStatus>,
84}
85
86#[derive(Debug, Deserialize)]
87struct DbfsStatus {
88 path: String,
89 is_dir: bool,
90 file_size: i64,
91 modification_time: i64,
92}