1use std::pin::Pin;
19use std::task::ready;
20use std::task::Context;
21use std::task::Poll;
22
23use futures::Stream;
24
25use crate::raw::*;
26use crate::*;
27
/// Asynchronous lister that is driven through its [`Stream`] implementation,
/// yielding `Result<Entry>` items.
///
/// The lister and the in-flight future are held in turn: polling moves the
/// lister into a boxed future, and the future hands it back once the fetch
/// completes.
pub struct Lister {
    /// Underlying raw lister. `None` while a fetch future owns it, and after
    /// the listing has been exhausted.
    lister: Option<oio::Lister>,

    /// In-flight fetch of the next entry. Resolves to the lister it took
    /// together with the result of the fetch.
    fut: Option<BoxedStaticFuture<(oio::Lister, Result<Option<oio::Entry>>)>>,
    /// Set once a fetch returns an error; afterwards the stream only yields
    /// `None`.
    errored: bool,
}
39
// SAFETY: NOTE(review) — rationale inferred from the code visible in this
// file; confirm no `&self` accessor exists elsewhere. The fields are private
// and the only method that reads or writes them is `poll_next`, which takes
// `Pin<&mut Self>`, so a shared `&Lister` gives no access to the inner lister
// or the boxed future.
unsafe impl Sync for Lister {}
44
45impl Lister {
46 pub(crate) async fn create(acc: Accessor, path: &str, args: OpList) -> Result<Self> {
48 let (_, lister) = acc.list(path, args).await?;
49
50 Ok(Self {
51 lister: Some(lister),
52
53 fut: None,
54 errored: false,
55 })
56 }
57}
58
59impl Stream for Lister {
60 type Item = Result<Entry>;
61
62 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
63 if self.errored {
65 return Poll::Ready(None);
66 }
67
68 if let Some(mut lister) = self.lister.take() {
69 let fut = async move {
70 let res = lister.next_dyn().await;
71 (lister, res)
72 };
73 self.fut = Some(Box::pin(fut));
74 }
75
76 if let Some(fut) = self.fut.as_mut() {
77 let (lister, entry) = ready!(fut.as_mut().poll(cx));
78 self.lister = Some(lister);
79 self.fut = None;
80
81 return match entry {
82 Ok(Some(oe)) => Poll::Ready(Some(Ok(oe.into_entry()))),
83 Ok(None) => {
84 self.lister = None;
85 Poll::Ready(None)
86 }
87 Err(err) => {
88 self.errored = true;
89 Poll::Ready(Some(Err(err)))
90 }
91 };
92 }
93
94 Poll::Ready(None)
95 }
96}
97
#[cfg(test)]
#[cfg(feature = "services-azblob")]
mod tests {
    use futures::StreamExt;

    use super::*;
    use crate::services::Azblob;

    /// Drains a lister built from an Azblob operator configured with
    /// placeholder credentials.
    ///
    /// Errors yielded by the stream are printed and discarded, so the test
    /// passes as long as the operator and lister can be created and the
    /// stream terminates without panicking.
    #[tokio::test]
    async fn test_invalid_lister() -> Result<()> {
        let _ = tracing_subscriber::fmt().try_init();

        let operator = Operator::new(
            Azblob::default()
                .container("container")
                .account_name("account_name")
                .account_key("account_key")
                .endpoint("https://account_name.blob.core.windows.net"),
        )?
        .finish();

        let mut lister = operator.lister("/").await?;

        // Walk the stream item by item: dump every result, print only the successes.
        while let Some(entry) = lister.next().await {
            dbg!(&entry);
            if let Ok(entry) = entry {
                println!("{:?}", entry);
            }
        }

        Ok(())
    }
}