opendal/types/write/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21
22use crate::raw::*;
23use crate::*;
24
/// Writer is designed to write data into given path in an asynchronous
/// manner.
///
/// ## Notes
///
/// Please make sure either `close` or `abort` has been called before
/// dropping the writer, otherwise the data could be lost.
///
/// ## Usage
///
/// ### Write Multiple Chunks
///
/// Some services support writing multiple chunks of data into the given path. Services that
/// don't support writing multiple chunks will return an [`ErrorKind::Unsupported`] error when
/// `write` is called a second time.
///
/// ```
/// use opendal::Operator;
/// use opendal::Result;
///
/// async fn test(op: Operator) -> Result<()> {
///     let mut w = op.writer("path/to/file").await?;
///     w.write(vec![1; 1024]).await?;
///     w.write(vec![2; 1024]).await?;
///     w.close().await?;
///     Ok(())
/// }
/// ```
///
/// ### Write like `Sink`
///
/// ```
/// use anyhow::Result;
/// use futures::SinkExt;
/// use opendal::Operator;
///
/// async fn test(op: Operator) -> Result<()> {
///     let mut w = op.writer("path/to/file").await?.into_bytes_sink();
///     w.send(vec![1; 1024].into()).await?;
///     w.send(vec![2; 1024].into()).await?;
///     w.close().await?;
///     Ok(())
/// }
/// ```
///
/// ### Write like `AsyncWrite`
///
/// `AsyncWriteExt::write` is allowed to accept only part of the buffer, so
/// use `write_all` when the whole slice must be written.
///
/// ```
/// use anyhow::Result;
/// use futures::AsyncWriteExt;
/// use opendal::Operator;
///
/// async fn test(op: Operator) -> Result<()> {
///     let mut w = op.writer("path/to/file").await?.into_futures_async_write();
///     w.write_all(&vec![1; 1024]).await?;
///     w.write_all(&vec![2; 1024]).await?;
///     w.close().await?;
///     Ok(())
/// }
/// ```
///
/// ### Write with append enabled
///
/// Writer also supports writing with append enabled. This is useful when users want to append
/// some data to the end of the file.
///
/// - If the file doesn't exist, it will be created, just like calling `write`.
/// - If the file exists, data will be appended to the end of the file.
///
/// Possible Errors:
///
/// - Some services store normal files and appendable files in different ways. Trying to append
///   to a non-appendable file could return an [`ErrorKind::ConditionNotMatch`] error.
/// - Services that don't support append will return an [`ErrorKind::Unsupported`] error when
///   creating a writer with `append` enabled.
pub struct Writer {
    /// Keep a reference to write context in writer.
    _ctx: Arc<WriteContext>,
    /// Generator that every method of `Writer` delegates to; it is also the
    /// value handed over by the `into_*` conversions.
    inner: WriteGenerator<oio::Writer>,
}
105
106impl Writer {
107    /// Create a new writer from an `oio::Writer`.
108    pub(crate) async fn new(ctx: WriteContext) -> Result<Self> {
109        let ctx = Arc::new(ctx);
110        let inner = WriteGenerator::create(ctx.clone()).await?;
111
112        Ok(Self { _ctx: ctx, inner })
113    }
114
115    /// Write [`Buffer`] into writer.
116    ///
117    /// This operation will write all data in given buffer into writer.
118    ///
119    /// ## Examples
120    ///
121    /// ```
122    /// use bytes::Bytes;
123    /// use opendal::Operator;
124    /// use opendal::Result;
125    ///
126    /// async fn test(op: Operator) -> Result<()> {
127    ///     let mut w = op.writer("hello.txt").await?;
128    ///     // Buffer can be created from continues bytes.
129    ///     w.write("hello, world").await?;
130    ///     // Buffer can also be created from non-continues bytes.
131    ///     w.write(vec![Bytes::from("hello,"), Bytes::from("world!")])
132    ///         .await?;
133    ///
134    ///     // Make sure file has been written completely.
135    ///     w.close().await?;
136    ///     Ok(())
137    /// }
138    /// ```
139    pub async fn write(&mut self, bs: impl Into<Buffer>) -> Result<()> {
140        let mut bs = bs.into();
141        while !bs.is_empty() {
142            let n = self.inner.write(bs.clone()).await?;
143            bs.advance(n);
144        }
145
146        Ok(())
147    }
148
149    /// Write [`bytes::Buf`] into inner writer.
150    ///
151    /// This operation will write all data in given buffer into writer.
152    ///
153    /// # TODO
154    ///
155    /// Optimize this function to avoid unnecessary copy.
156    pub async fn write_from(&mut self, bs: impl Buf) -> Result<()> {
157        let mut bs = bs;
158        let bs = Buffer::from(bs.copy_to_bytes(bs.remaining()));
159        self.write(bs).await
160    }
161
162    /// Abort the writer and clean up all written data.
163    ///
164    /// ## Notes
165    ///
166    /// Abort should only be called when the writer is not closed or
167    /// aborted, otherwise an unexpected error could be returned.
168    pub async fn abort(&mut self) -> Result<()> {
169        self.inner.abort().await
170    }
171
172    /// Close the writer and make sure all data have been committed.
173    ///
174    /// ## Notes
175    ///
176    /// Close should only be called when the writer is not closed or
177    /// aborted, otherwise an unexpected error could be returned.
178    pub async fn close(&mut self) -> Result<Metadata> {
179        self.inner.close().await
180    }
181
182    /// Convert writer into [`BufferSink`] which implements [`Sink<Buffer>`].
183    ///
184    /// # Notes
185    ///
186    /// BufferSink is a zero-cost abstraction. The underlying writer
187    /// will reuse the Bytes and won't perform any copy operation over data.
188    ///
189    /// # Examples
190    ///
191    /// ## Basic Usage
192    ///
193    /// ```
194    /// use std::io;
195    ///
196    /// use bytes::Bytes;
197    /// use futures::SinkExt;
198    /// use opendal::{Buffer, Operator};
199    /// use opendal::Result;
200    ///
201    /// async fn test(op: Operator) -> io::Result<()> {
202    ///     let mut s = op.writer("hello.txt").await?.into_sink();
203    ///     let bs = "Hello, World!".as_bytes();
204    ///     s.send(Buffer::from(bs)).await?;
205    ///     s.close().await?;
206    ///
207    ///     Ok(())
208    /// }
209    /// ```
210    ///
211    /// ## Concurrent Write
212    ///
213    /// ```
214    /// use std::io;
215    ///
216    /// use bytes::Bytes;
217    /// use futures::SinkExt;
218    /// use opendal::{Buffer, Operator};
219    /// use opendal::Result;
220    ///
221    /// async fn test(op: Operator) -> io::Result<()> {
222    ///     let mut w = op
223    ///         .writer_with("hello.txt")
224    ///         .concurrent(8)
225    ///         .chunk(256)
226    ///         .await?
227    ///         .into_sink();
228    ///     let bs = "Hello, World!".as_bytes();
229    ///     w.send(Buffer::from(bs)).await?;
230    ///     w.close().await?;
231    ///
232    ///     Ok(())
233    /// }
234    /// ```
235    pub fn into_sink(self) -> BufferSink {
236        BufferSink::new(self.inner)
237    }
238
239    /// Convert writer into [`FuturesAsyncWriter`] which implements [`futures::AsyncWrite`],
240    ///
241    /// # Notes
242    ///
243    /// FuturesAsyncWriter is not a zero-cost abstraction. The underlying writer
244    /// requires an owned [`Buffer`], which involves an extra copy operation.
245    ///
246    /// FuturesAsyncWriter is required to call `close()` to make sure all
247    /// data have been written to the storage.
248    ///
249    /// # Examples
250    ///
251    /// ## Basic Usage
252    ///
253    /// ```
254    /// use std::io;
255    ///
256    /// use futures::io::AsyncWriteExt;
257    /// use opendal::Operator;
258    /// use opendal::Result;
259    ///
260    /// async fn test(op: Operator) -> io::Result<()> {
261    ///     let mut w = op.writer("hello.txt").await?.into_futures_async_write();
262    ///     let bs = "Hello, World!".as_bytes();
263    ///     w.write_all(bs).await?;
264    ///     w.close().await?;
265    ///
266    ///     Ok(())
267    /// }
268    /// ```
269    ///
270    /// ## Concurrent Write
271    ///
272    /// ```
273    /// use std::io;
274    ///
275    /// use futures::io::AsyncWriteExt;
276    /// use opendal::Operator;
277    /// use opendal::Result;
278    ///
279    /// async fn test(op: Operator) -> io::Result<()> {
280    ///     let mut w = op
281    ///         .writer_with("hello.txt")
282    ///         .concurrent(8)
283    ///         .chunk(256)
284    ///         .await?
285    ///         .into_futures_async_write();
286    ///     let bs = "Hello, World!".as_bytes();
287    ///     w.write_all(bs).await?;
288    ///     w.close().await?;
289    ///
290    ///     Ok(())
291    /// }
292    /// ```
293    pub fn into_futures_async_write(self) -> FuturesAsyncWriter {
294        FuturesAsyncWriter::new(self.inner)
295    }
296
297    /// Convert writer into [`FuturesBytesSink`] which implements [`futures::Sink<Bytes>`].
298    ///
299    /// # Notes
300    ///
301    /// FuturesBytesSink is a zero-cost abstraction. The underlying writer
302    /// will reuse the Bytes and won't perform any copy operation.
303    ///
304    /// # Examples
305    ///
306    /// ## Basic Usage
307    ///
308    /// ```
309    /// use std::io;
310    ///
311    /// use bytes::Bytes;
312    /// use futures::SinkExt;
313    /// use opendal::Operator;
314    /// use opendal::Result;
315    ///
316    /// async fn test(op: Operator) -> io::Result<()> {
317    ///     let mut w = op.writer("hello.txt").await?.into_bytes_sink();
318    ///     let bs = "Hello, World!".as_bytes();
319    ///     w.send(Bytes::from(bs)).await?;
320    ///     w.close().await?;
321    ///
322    ///     Ok(())
323    /// }
324    /// ```
325    ///
326    /// ## Concurrent Write
327    ///
328    /// ```
329    /// use std::io;
330    ///
331    /// use bytes::Bytes;
332    /// use futures::SinkExt;
333    /// use opendal::Operator;
334    /// use opendal::Result;
335    ///
336    /// async fn test(op: Operator) -> io::Result<()> {
337    ///     let mut w = op
338    ///         .writer_with("hello.txt")
339    ///         .concurrent(8)
340    ///         .chunk(256)
341    ///         .await?
342    ///         .into_bytes_sink();
343    ///     let bs = "Hello, World!".as_bytes();
344    ///     w.send(Bytes::from(bs)).await?;
345    ///     w.close().await?;
346    ///
347    ///     Ok(())
348    /// }
349    /// ```
350    pub fn into_bytes_sink(self) -> FuturesBytesSink {
351        FuturesBytesSink::new(self.inner)
352    }
353}
354
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use rand::rngs::ThreadRng;
    use rand::Rng;
    use rand::RngCore;

    use crate::services;
    use crate::Operator;

    /// Exclusive upper bound (16 MiB) on the size of generated payloads.
    const MAX_CONTENT_SIZE: usize = 16 * 1024 * 1024;

    /// Produce a random payload whose length lies in `1..MAX_CONTENT_SIZE`.
    fn gen_random_bytes() -> Vec<u8> {
        let mut rng = ThreadRng::default();
        let len = rng.gen_range(1..MAX_CONTENT_SIZE);

        let mut payload = vec![0; len];
        rng.fill_bytes(&mut payload);
        payload
    }

    /// Data passed to `Writer::write` must be readable back unchanged.
    #[tokio::test]
    async fn test_writer_write() {
        let path = "test_file";
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let expected = gen_random_bytes();

        let mut w = op.writer(path).await.unwrap();
        w.write(expected.clone()).await.expect("write must succeed");
        w.close().await.expect("close must succeed");

        let actual = op.read(path).await.expect("read to end mut succeed");
        assert_eq!(actual.to_bytes(), expected);
    }

    /// Data passed to `Writer::write_from` as a `Buf` must be readable back
    /// unchanged.
    #[tokio::test]
    async fn test_writer_write_from() {
        let path = "test_file";
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let expected = gen_random_bytes();

        let mut w = op.writer(path).await.unwrap();
        let source = Bytes::from(expected.clone());
        w.write_from(source).await.expect("write must succeed");
        w.close().await.expect("close must succeed");

        let actual = op.read(path).await.expect("read to end mut succeed");
        assert_eq!(actual.to_bytes(), expected);
    }
}