opendal/types/write/
buffer_sink.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::pin::Pin;
19use std::task::ready;
20use std::task::Context;
21use std::task::Poll;
22
23use bytes::Buf;
24
25use crate::raw::*;
26use crate::*;
27
28/// BufferSink is the adapter of [`futures::Sink`] generated by [`Writer::into_sink`]
29///
30/// Users can use this adapter in cases where they need to use [`futures::Sink`]
31pub struct BufferSink {
32    state: State,
33    buf: Buffer,
34}
35
36enum State {
37    Idle(Option<WriteGenerator<oio::Writer>>),
38    Writing(BoxedStaticFuture<(WriteGenerator<oio::Writer>, Result<usize>)>),
39    Closing(BoxedStaticFuture<(WriteGenerator<oio::Writer>, Result<Metadata>)>),
40}
41
42/// # Safety
43///
44/// FuturesReader only exposes `&mut self` to the outside world, so it's safe to be `Sync`.
45unsafe impl Sync for State {}
46
47impl BufferSink {
48    /// Create a new sink from a [`oio::Writer`].
49    #[inline]
50    pub(crate) fn new(w: WriteGenerator<oio::Writer>) -> Self {
51        BufferSink {
52            state: State::Idle(Some(w)),
53            buf: Buffer::new(),
54        }
55    }
56}
57
58impl futures::Sink<Buffer> for BufferSink {
59    type Error = Error;
60
61    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
62        self.poll_flush(cx)
63    }
64
65    fn start_send(mut self: Pin<&mut Self>, item: Buffer) -> Result<()> {
66        self.buf = item;
67        Ok(())
68    }
69
70    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
71        let this = self.get_mut();
72        loop {
73            match &mut this.state {
74                State::Idle(w) => {
75                    if this.buf.is_empty() {
76                        return Poll::Ready(Ok(()));
77                    }
78                    let Some(mut w) = w.take() else {
79                        return Poll::Ready(Err(Error::new(
80                            ErrorKind::Unexpected,
81                            "state invalid: sink has been closed",
82                        )));
83                    };
84                    let buf = this.buf.clone();
85                    let fut = async move {
86                        let res = w.write(buf).await;
87                        (w, res)
88                    };
89                    this.state = State::Writing(Box::pin(fut));
90                }
91                State::Writing(fut) => {
92                    let (w, res) = ready!(fut.as_mut().poll(cx));
93                    this.state = State::Idle(Some(w));
94                    match res {
95                        Ok(n) => {
96                            this.buf.advance(n);
97                        }
98                        Err(err) => return Poll::Ready(Err(err)),
99                    }
100                }
101                State::Closing(_) => {
102                    return Poll::Ready(Err(Error::new(
103                        ErrorKind::Unexpected,
104                        "state invalid: sink is closing",
105                    )))
106                }
107            }
108        }
109    }
110
111    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
112        let this = self.get_mut();
113        loop {
114            match &mut this.state {
115                State::Idle(w) => {
116                    let Some(mut w) = w.take() else {
117                        return Poll::Ready(Err(Error::new(
118                            ErrorKind::Unexpected,
119                            "state invalid: sink has been closed",
120                        )));
121                    };
122
123                    if this.buf.is_empty() {
124                        let fut = async move {
125                            let res = w.close().await;
126                            (w, res)
127                        };
128                        this.state = State::Closing(Box::pin(fut));
129                    } else {
130                        let buf = this.buf.clone();
131                        let fut = async move {
132                            let res = w.write(buf).await;
133                            (w, res)
134                        };
135                        this.state = State::Writing(Box::pin(fut));
136                    }
137                }
138                State::Writing(fut) => {
139                    let (w, res) = ready!(fut.as_mut().poll(cx));
140                    this.state = State::Idle(Some(w));
141                    match res {
142                        Ok(n) => {
143                            this.buf.advance(n);
144                        }
145                        Err(err) => return Poll::Ready(Err(err)),
146                    }
147                }
148                State::Closing(fut) => {
149                    let (w, res) = ready!(fut.as_mut().poll(cx));
150                    this.state = State::Idle(Some(w));
151                    match res {
152                        Ok(_) => {
153                            this.state = State::Idle(None);
154                            return Poll::Ready(Ok(()));
155                        }
156                        Err(err) => return Poll::Ready(Err(err)),
157                    }
158                }
159            }
160        }
161    }
162}
163
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::raw::*;
    use crate::*;

    /// Compile-time check that a `BufferSink` can be boxed as
    /// `Unpin + MaybeSend + Sync + 'static`.
    #[tokio::test]
    async fn test_trait() {
        let accessor = Operator::via_iter(Scheme::Memory, [])
            .unwrap()
            .into_inner();
        let context = WriteContext::new(
            accessor,
            "test".to_string(),
            OpWrite::new(),
            OpWriter::new().with_chunk(1),
        );
        let generator = WriteGenerator::create(Arc::new(context)).await.unwrap();

        let sink = BufferSink::new(generator);

        // The coercion itself is the assertion; nothing runs on the sink.
        let _: Box<dyn Unpin + MaybeSend + Sync + 'static> = Box::new(sink);
    }
}