opendal/types/write/writer.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21
22use crate::raw::*;
23use crate::*;
24
25/// Writer is designed to write data into given path in an asynchronous
26/// manner.
27///
28/// ## Notes
29///
30/// Please make sure either `close` or `abort` has been called before
31/// dropping the writer otherwise the data could be lost.
32///
33/// ## Usage
34///
35/// ### Write Multiple Chunks
36///
37/// Some services support to write multiple chunks of data into given path. Services that doesn't
38/// support write multiple chunks will return [`ErrorKind::Unsupported`] error when calling `write`
39/// at the second time.
40///
41/// ```
42/// use opendal::Operator;
43/// use opendal::Result;
44///
45/// async fn test(op: Operator) -> Result<()> {
46/// let mut w = op.writer("path/to/file").await?;
47/// w.write(vec![1; 1024]).await?;
48/// w.write(vec![2; 1024]).await?;
49/// w.close().await?;
50/// Ok(())
51/// }
52/// ```
53///
54/// ### Write like `Sink`
55///
56/// ```
57/// use anyhow::Result;
58/// use futures::SinkExt;
59/// use opendal::Operator;
60///
61/// async fn test(op: Operator) -> Result<()> {
62/// let mut w = op.writer("path/to/file").await?.into_bytes_sink();
63/// w.send(vec![1; 1024].into()).await?;
64/// w.send(vec![2; 1024].into()).await?;
65/// w.close().await?;
66/// Ok(())
67/// }
68/// ```
69///
70/// ### Write like `AsyncWrite`
71///
72/// ```
73/// use anyhow::Result;
74/// use futures::AsyncWriteExt;
75/// use opendal::Operator;
76///
77/// async fn test(op: Operator) -> Result<()> {
78/// let mut w = op.writer("path/to/file").await?.into_futures_async_write();
79/// w.write(&vec![1; 1024]).await?;
80/// w.write(&vec![2; 1024]).await?;
81/// w.close().await?;
82/// Ok(())
83/// }
84/// ```
85///
86/// ### Write with append enabled
87///
88/// Writer also supports to write with append enabled. This is useful when users want to append
89/// some data to the end of the file.
90///
91/// - If file doesn't exist, it will be created and just like calling `write`.
92/// - If file exists, data will be appended to the end of the file.
93///
94/// Possible Errors:
95///
96/// - Some services store normal file and appendable file in different way. Trying to append
97/// on non-appendable file could return [`ErrorKind::ConditionNotMatch`] error.
98/// - Services that doesn't support append will return [`ErrorKind::Unsupported`] error when
99/// creating writer with `append` enabled.
100pub struct Writer {
101 /// Keep a reference to write context in writer.
102 _ctx: Arc<WriteContext>,
103 inner: WriteGenerator<oio::Writer>,
104}
105
106impl Writer {
107 /// Create a new writer from an `oio::Writer`.
108 pub(crate) async fn new(ctx: WriteContext) -> Result<Self> {
109 let ctx = Arc::new(ctx);
110 let inner = WriteGenerator::create(ctx.clone()).await?;
111
112 Ok(Self { _ctx: ctx, inner })
113 }
114
115 /// Write [`Buffer`] into writer.
116 ///
117 /// This operation will write all data in given buffer into writer.
118 ///
119 /// ## Examples
120 ///
121 /// ```
122 /// use bytes::Bytes;
123 /// use opendal::Operator;
124 /// use opendal::Result;
125 ///
126 /// async fn test(op: Operator) -> Result<()> {
127 /// let mut w = op.writer("hello.txt").await?;
128 /// // Buffer can be created from continues bytes.
129 /// w.write("hello, world").await?;
130 /// // Buffer can also be created from non-continues bytes.
131 /// w.write(vec![Bytes::from("hello,"), Bytes::from("world!")])
132 /// .await?;
133 ///
134 /// // Make sure file has been written completely.
135 /// w.close().await?;
136 /// Ok(())
137 /// }
138 /// ```
139 pub async fn write(&mut self, bs: impl Into<Buffer>) -> Result<()> {
140 let mut bs = bs.into();
141 while !bs.is_empty() {
142 let n = self.inner.write(bs.clone()).await?;
143 bs.advance(n);
144 }
145
146 Ok(())
147 }
148
149 /// Write [`bytes::Buf`] into inner writer.
150 ///
151 /// This operation will write all data in given buffer into writer.
152 ///
153 /// # TODO
154 ///
155 /// Optimize this function to avoid unnecessary copy.
156 pub async fn write_from(&mut self, bs: impl Buf) -> Result<()> {
157 let mut bs = bs;
158 let bs = Buffer::from(bs.copy_to_bytes(bs.remaining()));
159 self.write(bs).await
160 }
161
162 /// Abort the writer and clean up all written data.
163 ///
164 /// ## Notes
165 ///
166 /// Abort should only be called when the writer is not closed or
167 /// aborted, otherwise an unexpected error could be returned.
168 pub async fn abort(&mut self) -> Result<()> {
169 self.inner.abort().await
170 }
171
172 /// Close the writer and make sure all data have been committed.
173 ///
174 /// ## Notes
175 ///
176 /// Close should only be called when the writer is not closed or
177 /// aborted, otherwise an unexpected error could be returned.
178 pub async fn close(&mut self) -> Result<Metadata> {
179 self.inner.close().await
180 }
181
182 /// Convert writer into [`BufferSink`] which implements [`Sink<Buffer>`].
183 ///
184 /// # Notes
185 ///
186 /// BufferSink is a zero-cost abstraction. The underlying writer
187 /// will reuse the Bytes and won't perform any copy operation over data.
188 ///
189 /// # Examples
190 ///
191 /// ## Basic Usage
192 ///
193 /// ```
194 /// use std::io;
195 ///
196 /// use bytes::Bytes;
197 /// use futures::SinkExt;
198 /// use opendal::{Buffer, Operator};
199 /// use opendal::Result;
200 ///
201 /// async fn test(op: Operator) -> io::Result<()> {
202 /// let mut s = op.writer("hello.txt").await?.into_sink();
203 /// let bs = "Hello, World!".as_bytes();
204 /// s.send(Buffer::from(bs)).await?;
205 /// s.close().await?;
206 ///
207 /// Ok(())
208 /// }
209 /// ```
210 ///
211 /// ## Concurrent Write
212 ///
213 /// ```
214 /// use std::io;
215 ///
216 /// use bytes::Bytes;
217 /// use futures::SinkExt;
218 /// use opendal::{Buffer, Operator};
219 /// use opendal::Result;
220 ///
221 /// async fn test(op: Operator) -> io::Result<()> {
222 /// let mut w = op
223 /// .writer_with("hello.txt")
224 /// .concurrent(8)
225 /// .chunk(256)
226 /// .await?
227 /// .into_sink();
228 /// let bs = "Hello, World!".as_bytes();
229 /// w.send(Buffer::from(bs)).await?;
230 /// w.close().await?;
231 ///
232 /// Ok(())
233 /// }
234 /// ```
235 pub fn into_sink(self) -> BufferSink {
236 BufferSink::new(self.inner)
237 }
238
239 /// Convert writer into [`FuturesAsyncWriter`] which implements [`futures::AsyncWrite`],
240 ///
241 /// # Notes
242 ///
243 /// FuturesAsyncWriter is not a zero-cost abstraction. The underlying writer
244 /// requires an owned [`Buffer`], which involves an extra copy operation.
245 ///
246 /// FuturesAsyncWriter is required to call `close()` to make sure all
247 /// data have been written to the storage.
248 ///
249 /// # Examples
250 ///
251 /// ## Basic Usage
252 ///
253 /// ```
254 /// use std::io;
255 ///
256 /// use futures::io::AsyncWriteExt;
257 /// use opendal::Operator;
258 /// use opendal::Result;
259 ///
260 /// async fn test(op: Operator) -> io::Result<()> {
261 /// let mut w = op.writer("hello.txt").await?.into_futures_async_write();
262 /// let bs = "Hello, World!".as_bytes();
263 /// w.write_all(bs).await?;
264 /// w.close().await?;
265 ///
266 /// Ok(())
267 /// }
268 /// ```
269 ///
270 /// ## Concurrent Write
271 ///
272 /// ```
273 /// use std::io;
274 ///
275 /// use futures::io::AsyncWriteExt;
276 /// use opendal::Operator;
277 /// use opendal::Result;
278 ///
279 /// async fn test(op: Operator) -> io::Result<()> {
280 /// let mut w = op
281 /// .writer_with("hello.txt")
282 /// .concurrent(8)
283 /// .chunk(256)
284 /// .await?
285 /// .into_futures_async_write();
286 /// let bs = "Hello, World!".as_bytes();
287 /// w.write_all(bs).await?;
288 /// w.close().await?;
289 ///
290 /// Ok(())
291 /// }
292 /// ```
293 pub fn into_futures_async_write(self) -> FuturesAsyncWriter {
294 FuturesAsyncWriter::new(self.inner)
295 }
296
297 /// Convert writer into [`FuturesBytesSink`] which implements [`futures::Sink<Bytes>`].
298 ///
299 /// # Notes
300 ///
301 /// FuturesBytesSink is a zero-cost abstraction. The underlying writer
302 /// will reuse the Bytes and won't perform any copy operation.
303 ///
304 /// # Examples
305 ///
306 /// ## Basic Usage
307 ///
308 /// ```
309 /// use std::io;
310 ///
311 /// use bytes::Bytes;
312 /// use futures::SinkExt;
313 /// use opendal::Operator;
314 /// use opendal::Result;
315 ///
316 /// async fn test(op: Operator) -> io::Result<()> {
317 /// let mut w = op.writer("hello.txt").await?.into_bytes_sink();
318 /// let bs = "Hello, World!".as_bytes();
319 /// w.send(Bytes::from(bs)).await?;
320 /// w.close().await?;
321 ///
322 /// Ok(())
323 /// }
324 /// ```
325 ///
326 /// ## Concurrent Write
327 ///
328 /// ```
329 /// use std::io;
330 ///
331 /// use bytes::Bytes;
332 /// use futures::SinkExt;
333 /// use opendal::Operator;
334 /// use opendal::Result;
335 ///
336 /// async fn test(op: Operator) -> io::Result<()> {
337 /// let mut w = op
338 /// .writer_with("hello.txt")
339 /// .concurrent(8)
340 /// .chunk(256)
341 /// .await?
342 /// .into_bytes_sink();
343 /// let bs = "Hello, World!".as_bytes();
344 /// w.send(Bytes::from(bs)).await?;
345 /// w.close().await?;
346 ///
347 /// Ok(())
348 /// }
349 /// ```
350 pub fn into_bytes_sink(self) -> FuturesBytesSink {
351 FuturesBytesSink::new(self.inner)
352 }
353}
354
355#[cfg(test)]
356mod tests {
357 use bytes::Bytes;
358 use rand::rngs::ThreadRng;
359 use rand::Rng;
360 use rand::RngCore;
361
362 use crate::services;
363 use crate::Operator;
364
365 fn gen_random_bytes() -> Vec<u8> {
366 let mut rng = ThreadRng::default();
367 // Generate size between 1B..16MB.
368 let size = rng.gen_range(1..16 * 1024 * 1024);
369 let mut content = vec![0; size];
370 rng.fill_bytes(&mut content);
371 content
372 }
373
374 #[tokio::test]
375 async fn test_writer_write() {
376 let op = Operator::new(services::Memory::default()).unwrap().finish();
377 let path = "test_file";
378
379 let content = gen_random_bytes();
380 let mut writer = op.writer(path).await.unwrap();
381 writer
382 .write(content.clone())
383 .await
384 .expect("write must succeed");
385 writer.close().await.expect("close must succeed");
386
387 let buf = op.read(path).await.expect("read to end mut succeed");
388
389 assert_eq!(buf.to_bytes(), content);
390 }
391
392 #[tokio::test]
393 async fn test_writer_write_from() {
394 let op = Operator::new(services::Memory::default()).unwrap().finish();
395 let path = "test_file";
396
397 let content = gen_random_bytes();
398 let mut writer = op.writer(path).await.unwrap();
399 writer
400 .write_from(Bytes::from(content.clone()))
401 .await
402 .expect("write must succeed");
403 writer.close().await.expect("close must succeed");
404
405 let buf = op.read(path).await.expect("read to end mut succeed");
406
407 assert_eq!(buf.to_bytes(), content);
408 }
409}