opendal/types/write/
buffer_sink.rs1use std::pin::Pin;
19use std::task::ready;
20use std::task::Context;
21use std::task::Poll;
22
23use bytes::Buf;
24
25use crate::raw::*;
26use crate::*;
27
28pub struct BufferSink {
32 state: State,
33 buf: Buffer,
34}
35
36enum State {
37 Idle(Option<WriteGenerator<oio::Writer>>),
38 Writing(BoxedStaticFuture<(WriteGenerator<oio::Writer>, Result<usize>)>),
39 Closing(BoxedStaticFuture<(WriteGenerator<oio::Writer>, Result<Metadata>)>),
40}
41
42unsafe impl Sync for State {}
46
47impl BufferSink {
48 #[inline]
50 pub(crate) fn new(w: WriteGenerator<oio::Writer>) -> Self {
51 BufferSink {
52 state: State::Idle(Some(w)),
53 buf: Buffer::new(),
54 }
55 }
56}
57
58impl futures::Sink<Buffer> for BufferSink {
59 type Error = Error;
60
61 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
62 self.poll_flush(cx)
63 }
64
65 fn start_send(mut self: Pin<&mut Self>, item: Buffer) -> Result<()> {
66 self.buf = item;
67 Ok(())
68 }
69
70 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
71 let this = self.get_mut();
72 loop {
73 match &mut this.state {
74 State::Idle(w) => {
75 if this.buf.is_empty() {
76 return Poll::Ready(Ok(()));
77 }
78 let Some(mut w) = w.take() else {
79 return Poll::Ready(Err(Error::new(
80 ErrorKind::Unexpected,
81 "state invalid: sink has been closed",
82 )));
83 };
84 let buf = this.buf.clone();
85 let fut = async move {
86 let res = w.write(buf).await;
87 (w, res)
88 };
89 this.state = State::Writing(Box::pin(fut));
90 }
91 State::Writing(fut) => {
92 let (w, res) = ready!(fut.as_mut().poll(cx));
93 this.state = State::Idle(Some(w));
94 match res {
95 Ok(n) => {
96 this.buf.advance(n);
97 }
98 Err(err) => return Poll::Ready(Err(err)),
99 }
100 }
101 State::Closing(_) => {
102 return Poll::Ready(Err(Error::new(
103 ErrorKind::Unexpected,
104 "state invalid: sink is closing",
105 )))
106 }
107 }
108 }
109 }
110
111 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
112 let this = self.get_mut();
113 loop {
114 match &mut this.state {
115 State::Idle(w) => {
116 let Some(mut w) = w.take() else {
117 return Poll::Ready(Err(Error::new(
118 ErrorKind::Unexpected,
119 "state invalid: sink has been closed",
120 )));
121 };
122
123 if this.buf.is_empty() {
124 let fut = async move {
125 let res = w.close().await;
126 (w, res)
127 };
128 this.state = State::Closing(Box::pin(fut));
129 } else {
130 let buf = this.buf.clone();
131 let fut = async move {
132 let res = w.write(buf).await;
133 (w, res)
134 };
135 this.state = State::Writing(Box::pin(fut));
136 }
137 }
138 State::Writing(fut) => {
139 let (w, res) = ready!(fut.as_mut().poll(cx));
140 this.state = State::Idle(Some(w));
141 match res {
142 Ok(n) => {
143 this.buf.advance(n);
144 }
145 Err(err) => return Poll::Ready(Err(err)),
146 }
147 }
148 State::Closing(fut) => {
149 let (w, res) = ready!(fut.as_mut().poll(cx));
150 this.state = State::Idle(Some(w));
151 match res {
152 Ok(_) => {
153 this.state = State::Idle(None);
154 return Poll::Ready(Ok(()));
155 }
156 Err(err) => return Poll::Ready(Err(err)),
157 }
158 }
159 }
160 }
161 }
162}
163
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::raw::*;
    use crate::*;

    /// Compile-time check that `BufferSink` is `Unpin + MaybeSend + Sync + 'static`.
    #[tokio::test]
    async fn test_trait() {
        let accessor = Operator::via_iter(Scheme::Memory, [])
            .unwrap()
            .into_inner();
        let write_ctx = WriteContext::new(
            accessor,
            "test".to_string(),
            OpWrite::new(),
            OpWriter::new().with_chunk(1),
        );
        let generator = WriteGenerator::create(Arc::new(write_ctx)).await.unwrap();

        let sink = BufferSink::new(generator);

        // The coercion below only compiles if every listed bound holds.
        let _: Box<dyn Unpin + MaybeSend + Sync + 'static> = Box::new(sink);
    }
}