opendal/services/dbfs/
lister.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22use serde::Deserialize;
23
24use super::core::DbfsCore;
25use super::error::parse_error;
26use crate::raw::*;
27use crate::*;
28
/// Lister for a single DBFS path.
///
/// Holds the shared service core used to issue the list request and the
/// path that request is made for.
pub struct DbfsLister {
    /// Shared DBFS core; `next_page` calls `dbfs_list` on it.
    core: Arc<DbfsCore>,
    /// Path handed to `dbfs_list` when a page is requested.
    path: String,
}
33
34impl DbfsLister {
35 pub fn new(core: Arc<DbfsCore>, path: String) -> Self {
36 Self { core, path }
37 }
38}
39
40impl oio::PageList for DbfsLister {
41 async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
42 let response = self.core.dbfs_list(&self.path).await?;
43
44 let status_code = response.status();
45 if !status_code.is_success() {
46 if status_code == StatusCode::NOT_FOUND {
47 ctx.done = true;
48 return Ok(());
49 }
50 let error = parse_error(response);
51 return Err(error);
52 }
53
54 let bytes = response.into_body();
55 let decoded_response: DbfsOutputList =
56 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
57
58 ctx.done = true;
59
60 for status in decoded_response.files {
61 let entry: oio::Entry = match status.is_dir {
62 true => {
63 let normalized_path = format!("{}/", &status.path);
64 let mut meta = Metadata::new(EntryMode::DIR);
65 meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
66 status.modification_time,
67 )?);
68 oio::Entry::new(&normalized_path, meta)
69 }
70 false => {
71 let mut meta = Metadata::new(EntryMode::FILE);
72 meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
73 status.modification_time,
74 )?);
75 meta.set_content_length(status.file_size as u64);
76 oio::Entry::new(&status.path, meta)
77 }
78 };
79 ctx.entries.push_back(entry);
80 }
81 Ok(())
82 }
83}
84
85#[derive(Debug, Deserialize)]
86struct DbfsOutputList {
87 files: Vec<DbfsStatus>,
88}
89
/// A single file or directory record inside a DBFS list response.
#[derive(Debug, Deserialize)]
struct DbfsStatus {
    /// Path of the entry as returned by the service; used as the entry path
    /// (with a trailing `/` appended for directories).
    path: String,
    /// Whether the entry is a directory; selects `EntryMode::DIR` vs `EntryMode::FILE`.
    is_dir: bool,
    /// Size reported for the entry; used as the content length of files only.
    file_size: i64,
    /// Modification time, fed to `parse_datetime_from_from_timestamp_millis`
    /// (presumably epoch milliseconds — confirm against the API docs).
    modification_time: i64,
}