1use std::pin::Pin;
19use std::task::ready;
20use std::task::Context;
21use std::task::Poll;
22
23use futures::Stream;
24
25use crate::raw::*;
26use crate::*;
27
28pub struct Lister {
34 lister: Option<oio::Lister>,
35
36 fut: Option<BoxedStaticFuture<(oio::Lister, Result<Option<oio::Entry>>)>>,
37 errored: bool,
38}
39
40unsafe impl Sync for Lister {}
44
45impl Lister {
46 pub(crate) async fn create(acc: Accessor, path: &str, args: OpList) -> Result<Self> {
48 let (_, lister) = acc.list(path, args).await?;
49
50 Ok(Self {
51 lister: Some(lister),
52
53 fut: None,
54 errored: false,
55 })
56 }
57}
58
59impl Stream for Lister {
60 type Item = Result<Entry>;
61
62 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
63 if self.errored {
65 return Poll::Ready(None);
66 }
67
68 if let Some(mut lister) = self.lister.take() {
69 let fut = async move {
70 let res = lister.next_dyn().await;
71 (lister, res)
72 };
73 self.fut = Some(Box::pin(fut));
74 }
75
76 if let Some(fut) = self.fut.as_mut() {
77 let (lister, entry) = ready!(fut.as_mut().poll(cx));
78 self.lister = Some(lister);
79 self.fut = None;
80
81 return match entry {
82 Ok(Some(oe)) => Poll::Ready(Some(Ok(oe.into_entry()))),
83 Ok(None) => {
84 self.lister = None;
85 Poll::Ready(None)
86 }
87 Err(err) => {
88 self.errored = true;
89 Poll::Ready(Some(Err(err)))
90 }
91 };
92 }
93
94 Poll::Ready(None)
95 }
96}
97
98pub struct BlockingLister {
106 lister: oio::BlockingLister,
107 errored: bool,
108}
109
110unsafe impl Sync for BlockingLister {}
114
115impl BlockingLister {
116 pub(crate) fn create(acc: Accessor, path: &str, args: OpList) -> Result<Self> {
118 let (_, lister) = acc.blocking_list(path, args)?;
119
120 Ok(Self {
121 lister,
122 errored: false,
123 })
124 }
125}
126
127impl Iterator for BlockingLister {
128 type Item = Result<Entry>;
129
130 fn next(&mut self) -> Option<Self::Item> {
131 if self.errored {
133 return None;
134 }
135
136 match self.lister.next() {
137 Ok(Some(entry)) => Some(Ok(entry.into_entry())),
138 Ok(None) => None,
139 Err(err) => {
140 self.errored = true;
141 Some(Err(err))
142 }
143 }
144 }
145}
146
#[cfg(test)]
#[cfg(feature = "services-azblob")]
mod tests {
    use futures::StreamExt;

    use super::*;
    use crate::services::Azblob;

    /// Drains a lister built on an Azblob operator configured with what look
    /// like placeholder credentials, printing every item and the entries that
    /// listed successfully.
    ///
    /// Errors yielded by the stream are discarded, so the test passes as long
    /// as the lister can be created and the stream terminates without
    /// panicking.
    /// NOTE(review): presumably this guards against the lister looping forever
    /// after a failed request — confirm against the originating issue.
    #[tokio::test]
    async fn test_invalid_lister() -> Result<()> {
        let _ = tracing_subscriber::fmt().try_init();

        let builder = Azblob::default()
            .container("container")
            .account_name("account_name")
            .account_key("account_key")
            .endpoint("https://account_name.blob.core.windows.net");

        let operator = Operator::new(builder)?.finish();

        let mut lister = operator.lister("/").await?;

        while let Some(entry) = lister.next().await {
            dbg!(&entry);
            if let Ok(entry) = entry {
                println!("{:?}", entry);
            }
        }

        Ok(())
    }
}