opendal/types/write/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21
22use crate::raw::*;
23use crate::*;
24
25/// Writer is designed to write data into given path in an asynchronous
26/// manner.
27///
28/// ## Notes
29///
30/// Please make sure either `close` or `abort` has been called before
31/// dropping the writer otherwise the data could be lost.
32///
33/// ## Usage
34///
35/// ### Write Multiple Chunks
36///
37/// Some services support to write multiple chunks of data into given path. Services that doesn't
38/// support write multiple chunks will return [`ErrorKind::Unsupported`] error when calling `write`
39/// at the second time.
40///
41/// ```
42/// use opendal::Operator;
43/// use opendal::Result;
44///
45/// async fn test(op: Operator) -> Result<()> {
46///     let mut w = op.writer("path/to/file").await?;
47///     w.write(vec![1; 1024]).await?;
48///     w.write(vec![2; 1024]).await?;
49///     w.close().await?;
50///     Ok(())
51/// }
52/// ```
53///
54/// ### Write like `Sink`
55///
56/// ```
57/// use anyhow::Result;
58/// use futures::SinkExt;
59/// use opendal::Operator;
60///
61/// async fn test(op: Operator) -> Result<()> {
62///     let mut w = op.writer("path/to/file").await?.into_bytes_sink();
63///     w.send(vec![1; 1024].into()).await?;
64///     w.send(vec![2; 1024].into()).await?;
65///     w.close().await?;
66///     Ok(())
67/// }
68/// ```
69///
70/// ### Write like `AsyncWrite`
71///
72/// ```
73/// use anyhow::Result;
74/// use futures::AsyncWriteExt;
75/// use opendal::Operator;
76///
77/// async fn test(op: Operator) -> Result<()> {
78///     let mut w = op.writer("path/to/file").await?.into_futures_async_write();
79///     w.write(&vec![1; 1024]).await?;
80///     w.write(&vec![2; 1024]).await?;
81///     w.close().await?;
82///     Ok(())
83/// }
84/// ```
85///
86/// ### Write with append enabled
87///
88/// Writer also supports to write with append enabled. This is useful when users want to append
89/// some data to the end of the file.
90///
91/// - If file doesn't exist, it will be created and just like calling `write`.
92/// - If file exists, data will be appended to the end of the file.
93///
94/// Possible Errors:
95///
96/// - Some services store normal file and appendable file in different way. Trying to append
97///   on non-appendable file could return [`ErrorKind::ConditionNotMatch`] error.
98/// - Services that doesn't support append will return [`ErrorKind::Unsupported`] error when
99///   creating writer with `append` enabled.
100pub struct Writer {
101    /// Keep a reference to write context in writer.
102    _ctx: Arc<WriteContext>,
103    inner: WriteGenerator<oio::Writer>,
104}
105
106impl Writer {
107    /// Create a new writer from an `oio::Writer`.
108    pub(crate) async fn new(ctx: WriteContext) -> Result<Self> {
109        let ctx = Arc::new(ctx);
110        let inner = WriteGenerator::create(ctx.clone()).await?;
111
112        Ok(Self { _ctx: ctx, inner })
113    }
114
115    /// Write [`Buffer`] into writer.
116    ///
117    /// This operation will write all data in given buffer into writer.
118    ///
119    /// ## Examples
120    ///
121    /// ```
122    /// use bytes::Bytes;
123    /// use opendal::Operator;
124    /// use opendal::Result;
125    ///
126    /// async fn test(op: Operator) -> Result<()> {
127    ///     let mut w = op.writer("hello.txt").await?;
128    ///     // Buffer can be created from continues bytes.
129    ///     w.write("hello, world").await?;
130    ///     // Buffer can also be created from non-continues bytes.
131    ///     w.write(vec![Bytes::from("hello,"), Bytes::from("world!")])
132    ///         .await?;
133    ///
134    ///     // Make sure file has been written completely.
135    ///     w.close().await?;
136    ///     Ok(())
137    /// }
138    /// ```
139    pub async fn write(&mut self, bs: impl Into<Buffer>) -> Result<()> {
140        let mut bs = bs.into();
141        while !bs.is_empty() {
142            let n = self.inner.write(bs.clone()).await?;
143            bs.advance(n);
144        }
145
146        Ok(())
147    }
148
149    /// Write [`bytes::Buf`] into inner writer.
150    ///
151    /// This operation will write all data in given buffer into writer.
152    ///
153    /// # TODO
154    ///
155    /// Optimize this function to avoid unnecessary copy.
156    pub async fn write_from(&mut self, bs: impl Buf) -> Result<()> {
157        let mut bs = bs;
158        let bs = Buffer::from(bs.copy_to_bytes(bs.remaining()));
159        self.write(bs).await
160    }
161
162    /// Abort the writer and clean up all written data.
163    ///
164    /// ## Notes
165    ///
166    /// Abort should only be called when the writer is not closed or
167    /// aborted, otherwise an unexpected error could be returned.
168    pub async fn abort(&mut self) -> Result<()> {
169        self.inner.abort().await
170    }
171
172    /// Close the writer and make sure all data have been committed.
173    ///
174    /// ## Notes
175    ///
176    /// Close should only be called when the writer is not closed or
177    /// aborted, otherwise an unexpected error could be returned.
178    pub async fn close(&mut self) -> Result<Metadata> {
179        self.inner.close().await
180    }
181
182    /// Convert writer into [`BufferSink`] which implements [`Sink<Buffer>`].
183    ///
184    /// # Notes
185    ///
186    /// BufferSink is a zero-cost abstraction. The underlying writer
187    /// will reuse the Bytes and won't perform any copy operation over data.
188    ///
189    /// # Examples
190    ///
191    /// ## Basic Usage
192    ///
193    /// ```
194    /// use std::io;
195    ///
196    /// use bytes::Bytes;
197    /// use futures::SinkExt;
198    /// use opendal::Buffer;
199    /// use opendal::Operator;
200    /// use opendal::Result;
201    ///
202    /// async fn test(op: Operator) -> io::Result<()> {
203    ///     let mut s = op.writer("hello.txt").await?.into_sink();
204    ///     let bs = "Hello, World!".as_bytes();
205    ///     s.send(Buffer::from(bs)).await?;
206    ///     s.close().await?;
207    ///
208    ///     Ok(())
209    /// }
210    /// ```
211    ///
212    /// ## Concurrent Write
213    ///
214    /// ```
215    /// use std::io;
216    ///
217    /// use bytes::Bytes;
218    /// use futures::SinkExt;
219    /// use opendal::Buffer;
220    /// use opendal::Operator;
221    /// use opendal::Result;
222    ///
223    /// async fn test(op: Operator) -> io::Result<()> {
224    ///     let mut w = op
225    ///         .writer_with("hello.txt")
226    ///         .concurrent(8)
227    ///         .chunk(256)
228    ///         .await?
229    ///         .into_sink();
230    ///     let bs = "Hello, World!".as_bytes();
231    ///     w.send(Buffer::from(bs)).await?;
232    ///     w.close().await?;
233    ///
234    ///     Ok(())
235    /// }
236    /// ```
237    pub fn into_sink(self) -> BufferSink {
238        BufferSink::new(self.inner)
239    }
240
241    /// Convert writer into [`FuturesAsyncWriter`] which implements [`futures::AsyncWrite`],
242    ///
243    /// # Notes
244    ///
245    /// FuturesAsyncWriter is not a zero-cost abstraction. The underlying writer
246    /// requires an owned [`Buffer`], which involves an extra copy operation.
247    ///
248    /// FuturesAsyncWriter is required to call `close()` to make sure all
249    /// data have been written to the storage.
250    ///
251    /// # Examples
252    ///
253    /// ## Basic Usage
254    ///
255    /// ```
256    /// use std::io;
257    ///
258    /// use futures::io::AsyncWriteExt;
259    /// use opendal::Operator;
260    /// use opendal::Result;
261    ///
262    /// async fn test(op: Operator) -> io::Result<()> {
263    ///     let mut w = op.writer("hello.txt").await?.into_futures_async_write();
264    ///     let bs = "Hello, World!".as_bytes();
265    ///     w.write_all(bs).await?;
266    ///     w.close().await?;
267    ///
268    ///     Ok(())
269    /// }
270    /// ```
271    ///
272    /// ## Concurrent Write
273    ///
274    /// ```
275    /// use std::io;
276    ///
277    /// use futures::io::AsyncWriteExt;
278    /// use opendal::Operator;
279    /// use opendal::Result;
280    ///
281    /// async fn test(op: Operator) -> io::Result<()> {
282    ///     let mut w = op
283    ///         .writer_with("hello.txt")
284    ///         .concurrent(8)
285    ///         .chunk(256)
286    ///         .await?
287    ///         .into_futures_async_write();
288    ///     let bs = "Hello, World!".as_bytes();
289    ///     w.write_all(bs).await?;
290    ///     w.close().await?;
291    ///
292    ///     Ok(())
293    /// }
294    /// ```
295    pub fn into_futures_async_write(self) -> FuturesAsyncWriter {
296        FuturesAsyncWriter::new(self.inner)
297    }
298
299    /// Convert writer into [`FuturesBytesSink`] which implements [`futures::Sink<Bytes>`].
300    ///
301    /// # Notes
302    ///
303    /// FuturesBytesSink is a zero-cost abstraction. The underlying writer
304    /// will reuse the Bytes and won't perform any copy operation.
305    ///
306    /// # Examples
307    ///
308    /// ## Basic Usage
309    ///
310    /// ```
311    /// use std::io;
312    ///
313    /// use bytes::Bytes;
314    /// use futures::SinkExt;
315    /// use opendal::Operator;
316    /// use opendal::Result;
317    ///
318    /// async fn test(op: Operator) -> io::Result<()> {
319    ///     let mut w = op.writer("hello.txt").await?.into_bytes_sink();
320    ///     let bs = "Hello, World!".as_bytes();
321    ///     w.send(Bytes::from(bs)).await?;
322    ///     w.close().await?;
323    ///
324    ///     Ok(())
325    /// }
326    /// ```
327    ///
328    /// ## Concurrent Write
329    ///
330    /// ```
331    /// use std::io;
332    ///
333    /// use bytes::Bytes;
334    /// use futures::SinkExt;
335    /// use opendal::Operator;
336    /// use opendal::Result;
337    ///
338    /// async fn test(op: Operator) -> io::Result<()> {
339    ///     let mut w = op
340    ///         .writer_with("hello.txt")
341    ///         .concurrent(8)
342    ///         .chunk(256)
343    ///         .await?
344    ///         .into_bytes_sink();
345    ///     let bs = "Hello, World!".as_bytes();
346    ///     w.send(Bytes::from(bs)).await?;
347    ///     w.close().await?;
348    ///
349    ///     Ok(())
350    /// }
351    /// ```
352    pub fn into_bytes_sink(self) -> FuturesBytesSink {
353        FuturesBytesSink::new(self.inner)
354    }
355}
356
357#[cfg(test)]
358mod tests {
359    use bytes::Bytes;
360    use rand::rngs::ThreadRng;
361    use rand::Rng;
362    use rand::RngCore;
363
364    use crate::services;
365    use crate::Operator;
366
367    fn gen_random_bytes() -> Vec<u8> {
368        let mut rng = ThreadRng::default();
369        // Generate size between 1B..16MB.
370        let size = rng.gen_range(1..16 * 1024 * 1024);
371        let mut content = vec![0; size];
372        rng.fill_bytes(&mut content);
373        content
374    }
375
376    #[tokio::test]
377    async fn test_writer_write() {
378        let op = Operator::new(services::Memory::default()).unwrap().finish();
379        let path = "test_file";
380
381        let content = gen_random_bytes();
382        let mut writer = op.writer(path).await.unwrap();
383        writer
384            .write(content.clone())
385            .await
386            .expect("write must succeed");
387        writer.close().await.expect("close must succeed");
388
389        let buf = op.read(path).await.expect("read to end mut succeed");
390
391        assert_eq!(buf.to_bytes(), content);
392    }
393
394    #[tokio::test]
395    async fn test_writer_write_from() {
396        let op = Operator::new(services::Memory::default()).unwrap().finish();
397        let path = "test_file";
398
399        let content = gen_random_bytes();
400        let mut writer = op.writer(path).await.unwrap();
401        writer
402            .write_from(Bytes::from(content.clone()))
403            .await
404            .expect("write must succeed");
405        writer.close().await.expect("close must succeed");
406
407        let buf = op.read(path).await.expect("read to end mut succeed");
408
409        assert_eq!(buf.to_bytes(), content);
410    }
411}