opendal/types/operator/operator_futures.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Futures provides the futures generated by [`Operator`]
19//!
20//! By using futures, users can add more options for operation.
21
22use std::collections::HashMap;
23use std::future::IntoFuture;
24use std::ops::RangeBounds;
25use std::time::Duration;
26
27use chrono::DateTime;
28use chrono::Utc;
29use futures::Future;
30
31use crate::raw::*;
32use crate::*;
33
34/// OperatorFuture is the future generated by [`Operator`].
35///
36/// The future will consume all the input to generate a future.
37///
38/// # NOTES
39///
40/// This struct is by design to keep in crate. We don't want
41/// users to use this struct directly.
42pub struct OperatorFuture<I, O, F: Future<Output = Result<O>>> {
43 /// The accessor to the underlying object storage
44 acc: Accessor,
45 /// The path of string
46 path: String,
47 /// The input args
48 args: I,
49 /// The function which will move all the args and return a static future
50 f: fn(Accessor, String, I) -> F,
51}
52
53impl<I, O, F: Future<Output = Result<O>>> OperatorFuture<I, O, F> {
54 /// # NOTES
55 ///
56 /// This struct is by design to keep in crate. We don't want
57 /// users to use this struct directly.
58 pub(crate) fn new(
59 inner: Accessor,
60 path: String,
61 args: I,
62 f: fn(Accessor, String, I) -> F,
63 ) -> Self {
64 OperatorFuture {
65 acc: inner,
66 path,
67 args,
68 f,
69 }
70 }
71}
72
73impl<I, O, F> IntoFuture for OperatorFuture<I, O, F>
74where
75 F: Future<Output = Result<O>>,
76{
77 type Output = Result<O>;
78 type IntoFuture = F;
79
80 fn into_future(self) -> Self::IntoFuture {
81 (self.f)(self.acc, self.path, self.args)
82 }
83}
84
85/// Future that generated by [`Operator::stat_with`].
86///
87/// Users can add more options by public functions provided by this struct.
88pub type FutureStat<F> = OperatorFuture<options::StatOptions, Metadata, F>;
89
90impl<F: Future<Output = Result<Metadata>>> FutureStat<F> {
91 /// Set the If-Match for this operation.
92 ///
93 /// Refer to [`options::StatOptions::if_match`] for more details.
94 pub fn if_match(mut self, v: &str) -> Self {
95 self.args.if_match = Some(v.to_string());
96 self
97 }
98
99 /// Set the If-None-Match for this operation.
100 ///
101 /// Refer to [`options::StatOptions::if_none_match`] for more details.
102 pub fn if_none_match(mut self, v: &str) -> Self {
103 self.args.if_none_match = Some(v.to_string());
104 self
105 }
106
107 /// Set the If-Modified-Since for this operation.
108 ///
109 /// Refer to [`options::StatOptions::if_modified_since`] for more details.
110 pub fn if_modified_since(mut self, v: DateTime<Utc>) -> Self {
111 self.args.if_modified_since = Some(v);
112 self
113 }
114
115 /// Set the If-Unmodified-Since for this operation.
116 ///
117 /// Refer to [`options::StatOptions::if_unmodified_since`] for more details.
118 pub fn if_unmodified_since(mut self, v: DateTime<Utc>) -> Self {
119 self.args.if_unmodified_since = Some(v);
120 self
121 }
122
123 /// Set the version for this operation.
124 ///
125 /// Refer to [`options::StatOptions::version`] for more details.
126 pub fn version(mut self, v: &str) -> Self {
127 self.args.version = Some(v.to_string());
128 self
129 }
130}
131
132/// Future that generated by [`Operator::presign_stat_with`].
133///
134/// Users can add more options by public functions provided by this struct.
135pub type FuturePresignStat<F> =
136 OperatorFuture<(options::StatOptions, Duration), PresignedRequest, F>;
137
138impl<F: Future<Output = Result<PresignedRequest>>> FuturePresignStat<F> {
139 /// Refer to [`options::StatOptions::override_content_disposition`] for more details.
140 pub fn override_content_disposition(mut self, v: &str) -> Self {
141 self.args.0.override_content_disposition = Some(v.to_string());
142 self
143 }
144
145 /// Refer to [`options::StatOptions::override_cache_control`] for more details.
146 pub fn override_cache_control(mut self, v: &str) -> Self {
147 self.args.0.override_cache_control = Some(v.to_string());
148 self
149 }
150
151 /// Refer to [`options::StatOptions::override_content_type`] for more details.
152 pub fn override_content_type(mut self, v: &str) -> Self {
153 self.args.0.override_content_type = Some(v.to_string());
154 self
155 }
156
157 /// Refer to [`options::StatOptions::if_match`] for more details.
158 pub fn if_match(mut self, v: &str) -> Self {
159 self.args.0.if_match = Some(v.to_string());
160 self
161 }
162
163 /// Refer to [`options::StatOptions::if_none_match`] for more details.
164 pub fn if_none_match(mut self, v: &str) -> Self {
165 self.args.0.if_none_match = Some(v.to_string());
166 self
167 }
168}
169
170/// Future that generated by [`Operator::presign_delete_with`].
171///
172/// Users can add more options by public functions provided by this struct.
173pub type FuturePresignDelete<F> =
174 OperatorFuture<(options::DeleteOptions, Duration), PresignedRequest, F>;
175
176impl<F: Future<Output = Result<PresignedRequest>>> FuturePresignDelete<F> {}
177
178/// Future that generated by [`Operator::presign_read_with`].
179///
180/// Users can add more options by public functions provided by this struct.
181pub type FuturePresignRead<F> =
182 OperatorFuture<(options::ReadOptions, Duration), PresignedRequest, F>;
183
184impl<F: Future<Output = Result<PresignedRequest>>> FuturePresignRead<F> {
185 /// Refer to [`options::ReadOptions::override_content_disposition`] for more details.
186 pub fn override_content_disposition(mut self, v: &str) -> Self {
187 self.args.0.override_content_disposition = Some(v.to_string());
188 self
189 }
190
191 /// Refer to [`options::ReadOptions::override_cache_control`] for more details.
192 pub fn override_cache_control(mut self, v: &str) -> Self {
193 self.args.0.override_cache_control = Some(v.to_string());
194 self
195 }
196
197 /// Refer to [`options::ReadOptions::override_content_type`] for more details.
198 pub fn override_content_type(mut self, v: &str) -> Self {
199 self.args.0.override_content_type = Some(v.to_string());
200 self
201 }
202
203 /// Refer to [`options::ReadOptions::if_match`] for more details.
204 pub fn if_match(mut self, v: &str) -> Self {
205 self.args.0.if_match = Some(v.to_string());
206 self
207 }
208
209 /// Refer to [`options::ReadOptions::if_none_match`] for more details.
210 pub fn if_none_match(mut self, v: &str) -> Self {
211 self.args.0.if_none_match = Some(v.to_string());
212 self
213 }
214}
215
216/// Future that generated by [`Operator::presign_write_with`].
217///
218/// Users can add more options by public functions provided by this struct.
219pub type FuturePresignWrite<F> =
220 OperatorFuture<(options::WriteOptions, Duration), PresignedRequest, F>;
221
222impl<F: Future<Output = Result<PresignedRequest>>> FuturePresignWrite<F> {
223 /// Refer to [`options::WriteOptions::content_type`] for more details.
224 pub fn content_type(mut self, v: &str) -> Self {
225 self.args.0.content_type = Some(v.to_string());
226 self
227 }
228
229 /// Refer to [`options::WriteOptions::content_disposition`] for more details.
230 pub fn content_disposition(mut self, v: &str) -> Self {
231 self.args.0.content_disposition = Some(v.to_string());
232 self
233 }
234
235 /// Refer to [`options::WriteOptions::content_encoding`] for more details.
236 pub fn content_encoding(mut self, v: &str) -> Self {
237 self.args.0.content_encoding = Some(v.to_string());
238 self
239 }
240
241 /// Refer to [`options::WriteOptions::cache_control`] for more details.
242 pub fn cache_control(mut self, v: &str) -> Self {
243 self.args.0.cache_control = Some(v.to_string());
244 self
245 }
246}
247
248/// Future that generated by [`Operator::read_with`].
249///
250/// Users can add more options by public functions provided by this struct.
251pub type FutureRead<F> = OperatorFuture<options::ReadOptions, Buffer, F>;
252
253impl<F: Future<Output = Result<Buffer>>> FutureRead<F> {
254 /// Set `range` for this `read` request.
255 ///
256 /// If we have a file with size `n`.
257 ///
258 /// - `..` means read bytes in range `[0, n)` of file.
259 /// - `0..1024` and `..1024` means read bytes in range `[0, 1024)` of file
260 /// - `1024..` means read bytes in range `[1024, n)` of file
261 ///
262 /// ```
263 /// # use opendal::Result;
264 /// # use opendal::Operator;
265 /// # use futures::TryStreamExt;
266 /// # async fn test(op: Operator) -> Result<()> {
267 /// let bs = op.read_with("path/to/file").range(0..1024).await?;
268 /// # Ok(())
269 /// # }
270 /// ```
271 pub fn range(mut self, range: impl RangeBounds<u64>) -> Self {
272 self.args.range = range.into();
273 self
274 }
275
276 /// Set `concurrent` for the reader.
277 ///
278 /// OpenDAL by default to write file without concurrent. This is not efficient for cases when users
279 /// read large chunks of data. By setting `concurrent`, opendal will read files concurrently
280 /// on support storage services.
281 ///
282 /// By setting `concurrent`, opendal will fetch chunks concurrently with
283 /// the given chunk size.
284 ///
285 /// ```
286 /// # use opendal::Result;
287 /// # use opendal::Operator;
288 /// # use opendal::Scheme;
289 /// # async fn test(op: Operator) -> Result<()> {
290 /// let r = op.read_with("path/to/file").concurrent(8).await?;
291 /// # Ok(())
292 /// # }
293 /// ```
294 pub fn concurrent(mut self, concurrent: usize) -> Self {
295 self.args.concurrent = concurrent.max(1);
296 self
297 }
298
299 /// OpenDAL will use services' preferred chunk size by default. Users can set chunk based on their own needs.
300 ///
301 /// This following example will make opendal read data in 4MiB chunks:
302 ///
303 /// ```
304 /// # use opendal::Result;
305 /// # use opendal::Operator;
306 /// # use opendal::Scheme;
307 /// # async fn test(op: Operator) -> Result<()> {
308 /// let r = op.read_with("path/to/file").chunk(4 * 1024 * 1024).await?;
309 /// # Ok(())
310 /// # }
311 /// ```
312 pub fn chunk(mut self, chunk_size: usize) -> Self {
313 self.args.chunk = Some(chunk_size);
314 self
315 }
316
317 /// Set `version` for this `read` request.
318 ///
319 /// This feature can be used to retrieve the data of a specified version of the given path.
320 ///
321 /// If the version doesn't exist, an error with kind [`ErrorKind::NotFound`] will be returned.
322 ///
323 /// ```
324 /// # use opendal::Result;
325 /// # use opendal::Operator;
326 ///
327 /// # async fn test(op: Operator, version: &str) -> Result<()> {
328 /// let mut bs = op.read_with("path/to/file").version(version).await?;
329 /// # Ok(())
330 /// # }
331 /// ```
332 pub fn version(mut self, v: &str) -> Self {
333 self.args.version = Some(v.to_string());
334 self
335 }
336
337 /// Set `if_match` for this `read` request.
338 ///
339 /// This feature can be used to check if the file's `ETag` matches the given `ETag`.
340 ///
341 /// If file exists and it's etag doesn't match, an error with kind [`ErrorKind::ConditionNotMatch`]
342 /// will be returned.
343 ///
344 /// ```
345 /// # use opendal::Result;
346 /// use opendal::Operator;
347 /// # async fn test(op: Operator, etag: &str) -> Result<()> {
348 /// let mut metadata = op.read_with("path/to/file").if_match(etag).await?;
349 /// # Ok(())
350 /// # }
351 /// ```
352 pub fn if_match(mut self, v: &str) -> Self {
353 self.args.if_match = Some(v.to_string());
354 self
355 }
356
357 /// Set `if_none_match` for this `read` request.
358 ///
359 /// This feature can be used to check if the file's `ETag` doesn't match the given `ETag`.
360 ///
361 /// If file exists and it's etag match, an error with kind [`ErrorKind::ConditionNotMatch`]
362 /// will be returned.
363 ///
364 /// ```
365 /// # use opendal::Result;
366 /// use opendal::Operator;
367 /// # async fn test(op: Operator, etag: &str) -> Result<()> {
368 /// let mut metadata = op.read_with("path/to/file").if_none_match(etag).await?;
369 /// # Ok(())
370 /// # }
371 /// ```
372 pub fn if_none_match(mut self, v: &str) -> Self {
373 self.args.if_none_match = Some(v.to_string());
374 self
375 }
376
377 /// ## `if_modified_since`
378 ///
379 /// Set `if_modified_since` for this `read` request.
380 ///
381 /// This feature can be used to check if the file has been modified since the given timestamp.
382 ///
383 /// If file exists and it hasn't been modified since the specified time, an error with kind
384 /// [`ErrorKind::ConditionNotMatch`] will be returned.
385 ///
386 /// ```
387 /// # use opendal::Result;
388 /// use chrono::DateTime;
389 /// use chrono::Utc;
390 /// use opendal::Operator;
391 /// # async fn test(op: Operator, time: DateTime<Utc>) -> Result<()> {
392 /// let mut metadata = op.read_with("path/to/file").if_modified_since(time).await?;
393 /// # Ok(())
394 /// # }
395 /// ```
396 pub fn if_modified_since(mut self, v: DateTime<Utc>) -> Self {
397 self.args.if_modified_since = Some(v);
398 self
399 }
400
401 /// Set `if_unmodified_since` for this `read` request.
402 ///
403 /// This feature can be used to check if the file hasn't been modified since the given timestamp.
404 ///
405 /// If file exists and it has been modified since the specified time, an error with kind
406 /// [`ErrorKind::ConditionNotMatch`] will be returned.
407 ///
408 /// ```
409 /// # use opendal::Result;
410 /// use chrono::DateTime;
411 /// use chrono::Utc;
412 /// use opendal::Operator;
413 /// # async fn test(op: Operator, time: DateTime<Utc>) -> Result<()> {
414 /// let mut metadata = op
415 /// .read_with("path/to/file")
416 /// .if_unmodified_since(time)
417 /// .await?;
418 /// # Ok(())
419 /// # }
420 /// ```
421 pub fn if_unmodified_since(mut self, v: DateTime<Utc>) -> Self {
422 self.args.if_unmodified_since = Some(v);
423 self
424 }
425}
426
427/// Future that generated by [`Operator::read_with`] or [`Operator::reader_with`].
428///
429/// Users can add more options by public functions provided by this struct.
430///
431/// # Notes
432///
433/// `(OpRead, ())` is a trick to make sure `FutureReader` is different from `FutureRead`
434pub type FutureReader<F> = OperatorFuture<options::ReaderOptions, Reader, F>;
435
436impl<F: Future<Output = Result<Reader>>> FutureReader<F> {
437 /// Set `version` for this `reader`.
438 ///
439 /// This feature can be used to retrieve the data of a specified version of the given path.
440 ///
441 /// If the version doesn't exist, an error with kind [`ErrorKind::NotFound`] will be returned.
442 ///
443 /// ```
444 /// # use opendal::Result;
445 /// # use opendal::Operator;
446 ///
447 /// # async fn test(op: Operator, version: &str) -> Result<()> {
448 /// let mut r = op.reader_with("path/to/file").version(version).await?;
449 /// # Ok(())
450 /// # }
451 /// ```
452 pub fn version(mut self, v: &str) -> Self {
453 self.args.version = Some(v.to_string());
454 self
455 }
456
457 /// Set `concurrent` for the reader.
458 ///
459 /// OpenDAL by default to write file without concurrent. This is not efficient for cases when users
460 /// read large chunks of data. By setting `concurrent`, opendal will reading files concurrently
461 /// on support storage services.
462 ///
463 /// By setting `concurrent`, opendal will fetch chunks concurrently with
464 /// the give chunk size.
465 ///
466 /// ```
467 /// # use opendal::Result;
468 /// # use opendal::Operator;
469 /// # use opendal::Scheme;
470 /// # async fn test(op: Operator) -> Result<()> {
471 /// let r = op.reader_with("path/to/file").concurrent(8).await?;
472 /// # Ok(())
473 /// # }
474 /// ```
475 pub fn concurrent(mut self, concurrent: usize) -> Self {
476 self.args.concurrent = concurrent.max(1);
477 self
478 }
479
480 /// OpenDAL will use services' preferred chunk size by default. Users can set chunk based on their own needs.
481 ///
482 /// This following example will make opendal read data in 4MiB chunks:
483 ///
484 /// ```
485 /// # use opendal::Result;
486 /// # use opendal::Operator;
487 /// # use opendal::Scheme;
488 /// # async fn test(op: Operator) -> Result<()> {
489 /// let r = op
490 /// .reader_with("path/to/file")
491 /// .chunk(4 * 1024 * 1024)
492 /// .await?;
493 /// # Ok(())
494 /// # }
495 /// ```
496 pub fn chunk(mut self, chunk_size: usize) -> Self {
497 self.args.chunk = Some(chunk_size);
498 self
499 }
500
501 /// Controls the optimization strategy for range reads in [`Reader::fetch`].
502 ///
503 /// When performing range reads, if the gap between two requested ranges is smaller than
504 /// the configured `gap` size, OpenDAL will merge these ranges into a single read request
505 /// and discard the unrequested data in between. This helps reduce the number of API calls
506 /// to remote storage services.
507 ///
508 /// This optimization is particularly useful when performing multiple small range reads
509 /// that are close to each other, as it reduces the overhead of multiple network requests
510 /// at the cost of transferring some additional data.
511 ///
512 /// In this example, if two requested ranges are separated by less than 1MiB,
513 /// they will be merged into a single read request:
514 ///
515 /// ```
516 /// # use opendal::Result;
517 /// # use opendal::Operator;
518 /// # use opendal::Scheme;
519 /// # async fn test(op: Operator) -> Result<()> {
520 /// let r = op
521 /// .reader_with("path/to/file")
522 /// .chunk(4 * 1024 * 1024)
523 /// .gap(1024 * 1024) // 1MiB gap
524 /// .await?;
525 /// # Ok(())
526 /// # }
527 /// ```
528 pub fn gap(mut self, gap_size: usize) -> Self {
529 self.args.gap = Some(gap_size);
530 self
531 }
532
533 /// Set `if-match` for this `read` request.
534 ///
535 /// This feature can be used to check if the file's `ETag` matches the given `ETag`.
536 ///
537 /// If file exists and it's etag doesn't match, an error with kind [`ErrorKind::ConditionNotMatch`]
538 /// will be returned.
539 ///
540 /// ```
541 /// # use opendal::Result;
542 /// use opendal::Operator;
543 /// # async fn test(op: Operator, etag: &str) -> Result<()> {
544 /// let mut r = op.reader_with("path/to/file").if_match(etag).await?;
545 /// # Ok(())
546 /// # }
547 /// ```
548 pub fn if_match(mut self, etag: &str) -> Self {
549 self.args.if_match = Some(etag.to_string());
550 self
551 }
552
553 /// Set `if-none-match` for this `read` request.
554 ///
555 /// This feature can be used to check if the file's `ETag` doesn't match the given `ETag`.
556 ///
557 /// If file exists and it's etag match, an error with kind [`ErrorKind::ConditionNotMatch`]
558 /// will be returned.
559 ///
560 /// ```
561 /// # use opendal::Result;
562 /// use opendal::Operator;
563 /// # async fn test(op: Operator, etag: &str) -> Result<()> {
564 /// let mut r = op.reader_with("path/to/file").if_none_match(etag).await?;
565 /// # Ok(())
566 /// # }
567 /// ```
568 pub fn if_none_match(mut self, etag: &str) -> Self {
569 self.args.if_none_match = Some(etag.to_string());
570 self
571 }
572
573 /// Set `if-modified-since` for this `read` request.
574 ///
575 /// This feature can be used to check if the file has been modified since the given timestamp.
576 ///
577 /// If file exists and it hasn't been modified since the specified time, an error with kind
578 /// [`ErrorKind::ConditionNotMatch`] will be returned.
579 ///
580 /// ```
581 /// # use opendal::Result;
582 /// use chrono::DateTime;
583 /// use chrono::Utc;
584 /// use opendal::Operator;
585 /// # async fn test(op: Operator, time: DateTime<Utc>) -> Result<()> {
586 /// let mut r = op
587 /// .reader_with("path/to/file")
588 /// .if_modified_since(time)
589 /// .await?;
590 /// # Ok(())
591 /// # }
592 /// ```
593 pub fn if_modified_since(mut self, v: DateTime<Utc>) -> Self {
594 self.args.if_modified_since = Some(v);
595 self
596 }
597
598 /// Set `if-unmodified-since` for this `read` request.
599 ///
600 /// This feature can be used to check if the file hasn't been modified since the given timestamp.
601 ///
602 /// If file exists and it has been modified since the specified time, an error with kind
603 /// [`ErrorKind::ConditionNotMatch`] will be returned.
604 ///
605 /// ```
606 /// # use opendal::Result;
607 /// use chrono::DateTime;
608 /// use chrono::Utc;
609 /// use opendal::Operator;
610 /// # async fn test(op: Operator, time: DateTime<Utc>) -> Result<()> {
611 /// let mut r = op
612 /// .reader_with("path/to/file")
613 /// .if_unmodified_since(time)
614 /// .await?;
615 /// # Ok(())
616 /// # }
617 /// ```
618 pub fn if_unmodified_since(mut self, v: DateTime<Utc>) -> Self {
619 self.args.if_unmodified_since = Some(v);
620 self
621 }
622}
623
624/// Future that generated by [`Operator::write_with`].
625///
626/// Users can add more options by public functions provided by this struct.
627pub type FutureWrite<F> = OperatorFuture<(options::WriteOptions, Buffer), Metadata, F>;
628
629impl<F: Future<Output = Result<Metadata>>> FutureWrite<F> {
630 /// Sets append mode for this write request.
631 ///
632 /// Refer to [`options::WriteOptions::append`] for more details.
633 ///
634 /// ### Example
635 ///
636 /// ```
637 /// # use opendal::Result;
638 /// # use opendal::Operator;
639 /// # use futures::StreamExt;
640 /// # use futures::SinkExt;
641 /// use bytes::Bytes;
642 ///
643 /// # async fn test(op: Operator) -> Result<()> {
644 /// let _ = op
645 /// .write_with("path/to/file", vec![0; 4096])
646 /// .append(true)
647 /// .await?;
648 /// # Ok(())
649 /// # }
650 /// ```
651 pub fn append(mut self, v: bool) -> Self {
652 self.args.0.append = v;
653 self
654 }
655
656 /// Sets chunk size for buffered writes.
657 ///
658 /// Refer to [`options::WriteOptions::chunk`] for more details.
659 ///
660 /// ### Example
661 ///
662 /// ```
663 /// # use opendal::Result;
664 /// # use opendal::Operator;
665 /// # use futures::StreamExt;
666 /// # use futures::SinkExt;
667 /// use bytes::Bytes;
668 ///
669 /// # async fn test(op: Operator) -> Result<()> {
670 /// // Set 8MiB chunk size - data will be sent in one API call at close
671 /// let _ = op
672 /// .write_with("path/to/file", vec![0; 4096])
673 /// .chunk(8 * 1024 * 1024)
674 /// .await?;
675 /// # Ok(())
676 /// # }
677 /// ```
678 pub fn chunk(mut self, v: usize) -> Self {
679 self.args.0.chunk = Some(v);
680 self
681 }
682
683 /// Sets concurrent write operations for this writer.
684 ///
685 /// Refer to [`options::WriteOptions::concurrent`] for more details.
686 ///
687 /// ## Example
688 ///
689 /// ```
690 /// # use opendal::Result;
691 /// # use opendal::Operator;
692 /// # use futures::StreamExt;
693 /// # use futures::SinkExt;
694 /// use bytes::Bytes;
695 ///
696 /// # async fn test(op: Operator) -> Result<()> {
697 /// // Enable concurrent writes with 8 parallel operations at 128B chunk.
698 /// let _ = op
699 /// .write_with("path/to/file", vec![0; 4096])
700 /// .chunk(128)
701 /// .concurrent(8)
702 /// .await?;
703 /// # Ok(())
704 /// # }
705 /// ```
706 pub fn concurrent(mut self, v: usize) -> Self {
707 self.args.0.concurrent = v.max(1);
708 self
709 }
710
711 /// Sets Cache-Control header for this write operation.
712 ///
713 /// Refer to [`options::WriteOptions::cache_control`] for more details.
714 ///
715 /// ### Example
716 ///
717 /// ```
718 /// # use opendal::Result;
719 /// # use opendal::Operator;
720 /// # use futures::StreamExt;
721 /// # use futures::SinkExt;
722 /// use bytes::Bytes;
723 ///
724 /// # async fn test(op: Operator) -> Result<()> {
725 /// // Cache content for 7 days (604800 seconds)
726 /// let _ = op
727 /// .write_with("path/to/file", vec![0; 4096])
728 /// .cache_control("max-age=604800")
729 /// .await?;
730 /// # Ok(())
731 /// # }
732 /// ```
733 pub fn cache_control(mut self, v: &str) -> Self {
734 self.args.0.cache_control = Some(v.to_string());
735 self
736 }
737
738 /// Sets `Content-Type` header for this write operation.
739 ///
740 /// Refer to [`options::WriteOptions::content_type`] for more details.
741 ///
742 /// ## Example
743 ///
744 /// ```
745 /// # use opendal::Result;
746 /// # use opendal::Operator;
747 /// use bytes::Bytes;
748 ///
749 /// # async fn test(op: Operator) -> Result<()> {
750 /// // Set content type for plain text file
751 /// let _ = op
752 /// .write_with("path/to/file", vec![0; 4096])
753 /// .content_type("text/plain")
754 /// .await?;
755 /// # Ok(())
756 /// # }
757 /// ```
758 pub fn content_type(mut self, v: &str) -> Self {
759 self.args.0.content_type = Some(v.to_string());
760 self
761 }
762
763 /// Sets Content-Disposition header for this write request.
764 ///
765 /// Refer to [`options::WriteOptions::content_disposition`] for more details.
766 ///
767 /// ### Example
768 ///
769 /// ```
770 /// # use opendal::Result;
771 /// # use opendal::Operator;
772 /// # use futures::StreamExt;
773 /// # use futures::SinkExt;
774 /// use bytes::Bytes;
775 ///
776 /// # async fn test(op: Operator) -> Result<()> {
777 /// let _ = op
778 /// .write_with("path/to/file", vec![0; 4096])
779 /// .content_disposition("attachment; filename=\"filename.jpg\"")
780 /// .await?;
781 /// # Ok(())
782 /// # }
783 /// ```
784 pub fn content_disposition(mut self, v: &str) -> Self {
785 self.args.0.content_disposition = Some(v.to_string());
786 self
787 }
788
789 /// Sets Content-Encoding header for this write request.
790 ///
791 /// Refer to [`options::WriteOptions::content_encoding`] for more details.
792 ///
793 /// ### Example
794 ///
795 /// ```
796 /// # use opendal::Result;
797 /// # use opendal::Operator;
798 /// # use futures::StreamExt;
799 /// # use futures::SinkExt;
800 /// use bytes::Bytes;
801 ///
802 /// # async fn test(op: Operator) -> Result<()> {
803 /// let _ = op
804 /// .write_with("path/to/file", vec![0; 4096])
805 /// .content_encoding("gzip")
806 /// .await?;
807 /// # Ok(())
808 /// # }
809 /// ```
810 pub fn content_encoding(mut self, v: &str) -> Self {
811 self.args.0.content_encoding = Some(v.to_string());
812 self
813 }
814
815 /// Sets If-Match header for this write request.
816 ///
817 /// Refer to [`options::WriteOptions::if_match`] for more details.
818 ///
819 /// ### Example
820 ///
821 /// ```
822 /// # use opendal::Result;
823 /// # use opendal::Operator;
824 /// # use futures::StreamExt;
825 /// # use futures::SinkExt;
826 /// use bytes::Bytes;
827 ///
828 /// # async fn test(op: Operator) -> Result<()> {
829 /// let _ = op
830 /// .write_with("path/to/file", vec![0; 4096])
831 /// .if_match("\"686897696a7c876b7e\"")
832 /// .await?;
833 /// # Ok(())
834 /// # }
835 /// ```
836 pub fn if_match(mut self, s: &str) -> Self {
837 self.args.0.if_match = Some(s.to_string());
838 self
839 }
840
841 /// Sets If-None-Match header for this write request.
842 ///
843 /// Refer to [`options::WriteOptions::if_none_match`] for more details.
844 ///
845 /// ### Example
846 ///
847 /// ```
848 /// # use opendal::Result;
849 /// # use opendal::Operator;
850 /// # use futures::StreamExt;
851 /// # use futures::SinkExt;
852 /// use bytes::Bytes;
853 ///
854 /// # async fn test(op: Operator) -> Result<()> {
855 /// let _ = op
856 /// .write_with("path/to/file", vec![0; 4096])
857 /// .if_none_match("\"686897696a7c876b7e\"")
858 /// .await?;
859 /// # Ok(())
860 /// # }
861 /// ```
862 pub fn if_none_match(mut self, s: &str) -> Self {
863 self.args.0.if_none_match = Some(s.to_string());
864 self
865 }
866
867 /// Sets the condition that write operation will succeed only if target does not exist.
868 ///
869 /// Refer to [`options::WriteOptions::if_not_exists`] for more details.
870 ///
871 /// ### Example
872 ///
873 /// ```
874 /// # use opendal::Result;
875 /// # use opendal::Operator;
876 /// # use futures::StreamExt;
877 /// # use futures::SinkExt;
878 /// use bytes::Bytes;
879 ///
880 /// # async fn test(op: Operator) -> Result<()> {
881 /// let _ = op
882 /// .write_with("path/to/file", vec![0; 4096])
883 /// .if_not_exists(true)
884 /// .await?;
885 /// # Ok(())
886 /// # }
887 /// ```
888 pub fn if_not_exists(mut self, b: bool) -> Self {
889 self.args.0.if_not_exists = b;
890 self
891 }
892
893 /// Sets user metadata for this write request.
894 ///
895 /// Refer to [`options::WriteOptions::user_metadata`] for more details.
896 ///
897 /// ### Example
898 ///
899 /// ```
900 /// # use opendal::Result;
901 /// # use opendal::Operator;
902 /// # use futures::StreamExt;
903 /// # use futures::SinkExt;
904 /// use bytes::Bytes;
905 ///
906 /// # async fn test(op: Operator) -> Result<()> {
907 /// let _ = op
908 /// .write_with("path/to/file", vec![0; 4096])
909 /// .user_metadata([
910 /// ("language".to_string(), "rust".to_string()),
911 /// ("author".to_string(), "OpenDAL".to_string()),
912 /// ])
913 /// .await?;
914 /// # Ok(())
915 /// # }
916 /// ```
917 pub fn user_metadata(mut self, data: impl IntoIterator<Item = (String, String)>) -> Self {
918 self.args.0.user_metadata = Some(HashMap::from_iter(data));
919 self
920 }
921}
922
923/// Future that generated by [`Operator::writer_with`].
924///
925/// Users can add more options by public functions provided by this struct.
926pub type FutureWriter<F> = OperatorFuture<options::WriteOptions, Writer, F>;
927
928impl<F: Future<Output = Result<Writer>>> FutureWriter<F> {
929 /// Sets append mode for this write request.
930 ///
931 /// Refer to [`options::WriteOptions::append`] for more details.
932 ///
933 /// ### Example
934 ///
935 /// ```
936 /// # use opendal::Result;
937 /// # use opendal::Operator;
938 /// # use futures::StreamExt;
939 /// # use futures::SinkExt;
940 /// use bytes::Bytes;
941 ///
942 /// # async fn test(op: Operator) -> Result<()> {
943 /// let mut w = op.writer_with("path/to/file").append(true).await?;
944 /// w.write(vec![0; 4096]).await?;
945 /// w.write(vec![1; 4096]).await?;
946 /// w.close().await?;
947 /// # Ok(())
948 /// # }
949 /// ```
950 pub fn append(mut self, v: bool) -> Self {
951 self.args.append = v;
952 self
953 }
954
955 /// Sets chunk size for buffered writes.
956 ///
957 /// Refer to [`options::WriteOptions::chunk`] for more details.
958 ///
959 /// ### Example
960 ///
961 /// ```
962 /// # use opendal::Result;
963 /// # use opendal::Operator;
964 /// # use futures::StreamExt;
965 /// # use futures::SinkExt;
966 /// use bytes::Bytes;
967 ///
968 /// # async fn test(op: Operator) -> Result<()> {
969 /// // Set 8MiB chunk size - data will be sent in one API call at close
970 /// let mut w = op
971 /// .writer_with("path/to/file")
972 /// .chunk(8 * 1024 * 1024)
973 /// .await?;
974 /// w.write(vec![0; 4096]).await?;
975 /// w.write(vec![1; 4096]).await?;
976 /// w.close().await?;
977 /// # Ok(())
978 /// # }
979 /// ```
980 pub fn chunk(mut self, v: usize) -> Self {
981 self.args.chunk = Some(v);
982 self
983 }
984
985 /// Sets concurrent write operations for this writer.
986 ///
987 /// Refer to [`options::WriteOptions::concurrent`] for more details.
988 ///
989 /// ## Example
990 ///
991 /// ```
992 /// # use opendal::Result;
993 /// # use opendal::Operator;
994 /// # use futures::StreamExt;
995 /// # use futures::SinkExt;
996 /// use bytes::Bytes;
997 ///
998 /// # async fn test(op: Operator) -> Result<()> {
999 /// // Enable concurrent writes with 8 parallel operations
1000 /// let mut w = op.writer_with("path/to/file").concurrent(8).await?;
1001 ///
1002 /// // First write starts immediately
1003 /// w.write(vec![0; 4096]).await?;
1004 ///
1005 /// // Second write runs concurrently with first
1006 /// w.write(vec![1; 4096]).await?;
1007 ///
1008 /// // Ensures all writes complete successfully and in order
1009 /// w.close().await?;
1010 /// # Ok(())
1011 /// # }
1012 /// ```
1013 pub fn concurrent(mut self, v: usize) -> Self {
1014 self.args.concurrent = v.max(1);
1015 self
1016 }
1017
1018 /// Sets Cache-Control header for this write operation.
1019 ///
1020 /// Refer to [`options::WriteOptions::cache_control`] for more details.
1021 ///
1022 /// ### Example
1023 ///
1024 /// ```
1025 /// # use opendal::Result;
1026 /// # use opendal::Operator;
1027 /// # use futures::StreamExt;
1028 /// # use futures::SinkExt;
1029 /// use bytes::Bytes;
1030 ///
1031 /// # async fn test(op: Operator) -> Result<()> {
1032 /// // Cache content for 7 days (604800 seconds)
1033 /// let mut w = op
1034 /// .writer_with("path/to/file")
1035 /// .cache_control("max-age=604800")
1036 /// .await?;
1037 /// w.write(vec![0; 4096]).await?;
1038 /// w.write(vec![1; 4096]).await?;
1039 /// w.close().await?;
1040 /// # Ok(())
1041 /// # }
1042 /// ```
1043 pub fn cache_control(mut self, v: &str) -> Self {
1044 self.args.cache_control = Some(v.to_string());
1045 self
1046 }
1047
1048 /// Sets `Content-Type` header for this write operation.
1049 ///
1050 /// Refer to [`options::WriteOptions::content_type`] for more details.
1051 ///
1052 /// ## Example
1053 ///
1054 /// ```
1055 /// # use opendal::Result;
1056 /// # use opendal::Operator;
1057 /// use bytes::Bytes;
1058 ///
1059 /// # async fn test(op: Operator) -> Result<()> {
1060 /// // Set content type for plain text file
1061 /// let mut w = op
1062 /// .writer_with("path/to/file")
1063 /// .content_type("text/plain")
1064 /// .await?;
1065 /// w.write(vec![0; 4096]).await?;
1066 /// w.write(vec![1; 4096]).await?;
1067 /// w.close().await?;
1068 /// # Ok(())
1069 /// # }
1070 /// ```
1071 pub fn content_type(mut self, v: &str) -> Self {
1072 self.args.content_type = Some(v.to_string());
1073 self
1074 }
1075
1076 /// Sets Content-Disposition header for this write request.
1077 ///
1078 /// Refer to [`options::WriteOptions::content_disposition`] for more details.
1079 ///
1080 /// ### Example
1081 ///
1082 /// ```
1083 /// # use opendal::Result;
1084 /// # use opendal::Operator;
1085 /// # use futures::StreamExt;
1086 /// # use futures::SinkExt;
1087 /// use bytes::Bytes;
1088 ///
1089 /// # async fn test(op: Operator) -> Result<()> {
1090 /// let mut w = op
1091 /// .writer_with("path/to/file")
1092 /// .content_disposition("attachment; filename=\"filename.jpg\"")
1093 /// .await?;
1094 /// w.write(vec![0; 4096]).await?;
1095 /// w.write(vec![1; 4096]).await?;
1096 /// w.close().await?;
1097 /// # Ok(())
1098 /// # }
1099 /// ```
1100 pub fn content_disposition(mut self, v: &str) -> Self {
1101 self.args.content_disposition = Some(v.to_string());
1102 self
1103 }
1104
1105 /// Sets Content-Encoding header for this write request.
1106 ///
1107 /// Refer to [`options::WriteOptions::content_encoding`] for more details.
1108 ///
1109 /// ### Example
1110 ///
1111 /// ```
1112 /// # use opendal::Result;
1113 /// # use opendal::Operator;
1114 /// # use futures::StreamExt;
1115 /// # use futures::SinkExt;
1116 /// use bytes::Bytes;
1117 ///
1118 /// # async fn test(op: Operator) -> Result<()> {
1119 /// let mut w = op
1120 /// .writer_with("path/to/file")
1121 /// .content_encoding("gzip")
1122 /// .await?;
1123 /// w.write(vec![0; 4096]).await?;
1124 /// w.write(vec![1; 4096]).await?;
1125 /// w.close().await?;
1126 /// # Ok(())
1127 /// # }
1128 /// ```
1129 pub fn content_encoding(mut self, v: &str) -> Self {
1130 self.args.content_encoding = Some(v.to_string());
1131 self
1132 }
1133
1134 /// Sets If-Match header for this write request.
1135 ///
1136 /// Refer to [`options::WriteOptions::if_match`] for more details.
1137 ///
1138 /// ### Behavior
1139 ///
1140 /// - If supported, the write operation will only succeed if the target's ETag matches the specified value
1141 /// - The value should be a valid ETag string
1142 /// - Common values include:
1143 /// - A specific ETag value like `"686897696a7c876b7e"`
1144 /// - `*` - Matches any existing resource
1145 /// - If not supported, the value will be ignored
1146 ///
1147 /// This operation provides conditional write functionality based on ETag matching,
1148 /// helping prevent unintended overwrites in concurrent scenarios.
1149 ///
1150 /// ### Example
1151 ///
1152 /// ```
1153 /// # use opendal::Result;
1154 /// # use opendal::Operator;
1155 /// # use futures::StreamExt;
1156 /// # use futures::SinkExt;
1157 /// use bytes::Bytes;
1158 ///
1159 /// # async fn test(op: Operator) -> Result<()> {
1160 /// let mut w = op
1161 /// .writer_with("path/to/file")
1162 /// .if_match("\"686897696a7c876b7e\"")
1163 /// .await?;
1164 /// w.write(vec![0; 4096]).await?;
1165 /// w.write(vec![1; 4096]).await?;
1166 /// w.close().await?;
1167 /// # Ok(())
1168 /// # }
1169 /// ```
1170 pub fn if_match(mut self, s: &str) -> Self {
1171 self.args.if_match = Some(s.to_string());
1172 self
1173 }
1174
1175 /// Sets If-None-Match header for this write request.
1176 ///
1177 /// Refer to [`options::WriteOptions::if_none_match`] for more details.
1178 ///
1179 /// ### Example
1180 ///
1181 /// ```
1182 /// # use opendal::Result;
1183 /// # use opendal::Operator;
1184 /// # use futures::StreamExt;
1185 /// # use futures::SinkExt;
1186 /// use bytes::Bytes;
1187 ///
1188 /// # async fn test(op: Operator) -> Result<()> {
1189 /// let mut w = op
1190 /// .writer_with("path/to/file")
1191 /// .if_none_match("\"686897696a7c876b7e\"")
1192 /// .await?;
1193 /// w.write(vec![0; 4096]).await?;
1194 /// w.write(vec![1; 4096]).await?;
1195 /// w.close().await?;
1196 /// # Ok(())
1197 /// # }
1198 /// ```
1199 pub fn if_none_match(mut self, s: &str) -> Self {
1200 self.args.if_none_match = Some(s.to_string());
1201 self
1202 }
1203
1204 /// Sets the condition that write operation will succeed only if target does not exist.
1205 ///
1206 /// Refer to [`options::WriteOptions::if_not_exists`] for more details.
1207 ///
1208 /// ### Example
1209 ///
1210 /// ```
1211 /// # use opendal::Result;
1212 /// # use opendal::Operator;
1213 /// # use futures::StreamExt;
1214 /// # use futures::SinkExt;
1215 /// use bytes::Bytes;
1216 ///
1217 /// # async fn test(op: Operator) -> Result<()> {
1218 /// let mut w = op.writer_with("path/to/file").if_not_exists(true).await?;
1219 /// w.write(vec![0; 4096]).await?;
1220 /// w.write(vec![1; 4096]).await?;
1221 /// w.close().await?;
1222 /// # Ok(())
1223 /// # }
1224 /// ```
1225 pub fn if_not_exists(mut self, b: bool) -> Self {
1226 self.args.if_not_exists = b;
1227 self
1228 }
1229
1230 /// Sets user metadata for this write request.
1231 ///
1232 /// Refer to [`options::WriteOptions::user_metadata`] for more details.
1233 ///
1234 /// ### Example
1235 ///
1236 /// ```
1237 /// # use opendal::Result;
1238 /// # use opendal::Operator;
1239 /// # use futures::StreamExt;
1240 /// # use futures::SinkExt;
1241 /// use bytes::Bytes;
1242 ///
1243 /// # async fn test(op: Operator) -> Result<()> {
1244 /// let mut w = op
1245 /// .writer_with("path/to/file")
1246 /// .user_metadata([
1247 /// ("content-type".to_string(), "text/plain".to_string()),
1248 /// ("author".to_string(), "OpenDAL".to_string()),
1249 /// ])
1250 /// .await?;
1251 /// w.write(vec![0; 4096]).await?;
1252 /// w.close().await?;
1253 /// # Ok(())
1254 /// # }
1255 /// ```
1256 pub fn user_metadata(mut self, data: impl IntoIterator<Item = (String, String)>) -> Self {
1257 self.args.user_metadata = Some(HashMap::from_iter(data));
1258 self
1259 }
1260}
1261
1262/// Future that generated by [`Operator::delete_with`].
1263///
1264/// Users can add more options by public functions provided by this struct.
1265pub type FutureDelete<F> = OperatorFuture<options::DeleteOptions, (), F>;
1266
1267impl<F: Future<Output = Result<()>>> FutureDelete<F> {
1268 /// Change the version of this delete operation.
1269 pub fn version(mut self, v: &str) -> Self {
1270 self.args.version = Some(v.to_string());
1271 self
1272 }
1273}
1274
/// Future generated by [`Operator::deleter_with`].
///
/// This is an alias of `OperatorFuture` that carries `OpDeleter` as its input
/// arguments and uses `()` as its output type parameter.
///
/// Users can add more options by public functions provided by this struct.
pub type FutureDeleter<F> = OperatorFuture<OpDeleter, (), F>;
1279
1280/// Future that generated by [`Operator::list_with`] or [`Operator::lister_with`].
1281///
1282/// Users can add more options by public functions provided by this struct.
1283pub type FutureList<F> = OperatorFuture<options::ListOptions, Vec<Entry>, F>;
1284
1285impl<F: Future<Output = Result<Vec<Entry>>>> FutureList<F> {
1286 /// The limit passed to underlying service to specify the max results
1287 /// that could return per-request.
1288 ///
1289 /// Users could use this to control the memory usage of list operation.
1290 pub fn limit(mut self, v: usize) -> Self {
1291 self.args.limit = Some(v);
1292 self
1293 }
1294
1295 /// The start_after passes to underlying service to specify the specified key
1296 /// to start listing from.
1297 pub fn start_after(mut self, v: &str) -> Self {
1298 self.args.start_after = Some(v.to_string());
1299 self
1300 }
1301
1302 /// The recursive is used to control whether the list operation is recursive.
1303 ///
1304 /// - If `false`, list operation will only list the entries under the given path.
1305 /// - If `true`, list operation will list all entries that starts with given path.
1306 ///
1307 /// Default to `false`.
1308 pub fn recursive(mut self, v: bool) -> Self {
1309 self.args.recursive = v;
1310 self
1311 }
1312
1313 /// Controls whether the `list` operation should return file versions.
1314 ///
1315 /// This function allows you to specify if the `list` operation, when executed, should include
1316 /// information about different versions of files, if versioning is supported and enabled.
1317 ///
1318 /// If `true`, subsequent `list` operations will include version information for each file.
1319 /// If `false`, version information will be omitted from the `list` results.
1320 ///
1321 /// Default to `false`
1322 pub fn versions(mut self, v: bool) -> Self {
1323 self.args.versions = v;
1324 self
1325 }
1326
1327 /// Controls whether the `list` operation should include deleted files (or versions).
1328 ///
1329 /// This function allows you to specify if the `list` operation, when executed, should include
1330 /// entries for files or versions that have been marked as deleted. This is particularly relevant
1331 /// in object storage systems that support soft deletion or versioning.
1332 ///
1333 /// If `true`, subsequent `list` operations will include deleted files or versions.
1334 /// If `false`, deleted files or versions will be excluded from the `list` results.
1335 pub fn deleted(mut self, v: bool) -> Self {
1336 self.args.deleted = v;
1337 self
1338 }
1339}
1340
1341/// Future that generated by [`Operator::list_with`] or [`Operator::lister_with`].
1342///
1343/// Users can add more options by public functions provided by this struct.
1344pub type FutureLister<F> = OperatorFuture<options::ListOptions, Lister, F>;
1345
1346impl<F: Future<Output = Result<Lister>>> FutureLister<F> {
1347 /// The limit passed to underlying service to specify the max results
1348 /// that could return per-request.
1349 ///
1350 /// Users could use this to control the memory usage of list operation.
1351 pub fn limit(mut self, v: usize) -> Self {
1352 self.args.limit = Some(v);
1353 self
1354 }
1355
1356 /// The start_after passes to underlying service to specify the specified key
1357 /// to start listing from.
1358 pub fn start_after(mut self, v: &str) -> Self {
1359 self.args.start_after = Some(v.to_string());
1360 self
1361 }
1362
1363 /// The recursive is used to control whether the list operation is recursive.
1364 ///
1365 /// - If `false`, list operation will only list the entries under the given path.
1366 /// - If `true`, list operation will list all entries that starts with given path.
1367 ///
1368 /// Default to `false`.
1369 pub fn recursive(mut self, v: bool) -> Self {
1370 self.args.recursive = v;
1371 self
1372 }
1373
1374 /// Controls whether the `list` operation should return file versions.
1375 ///
1376 /// This function allows you to specify if the `list` operation, when executed, should include
1377 /// information about different versions of files, if versioning is supported and enabled.
1378 ///
1379 /// If `true`, subsequent `list` operations will include version information for each file.
1380 /// If `false`, version information will be omitted from the `list` results.
1381 ///
1382 /// Default to `false`
1383 pub fn versions(mut self, v: bool) -> Self {
1384 self.args.versions = v;
1385 self
1386 }
1387
1388 /// Controls whether the `list` operation should include deleted files (or versions).
1389 ///
1390 /// This function allows you to specify if the `list` operation, when executed, should include
1391 /// entries for files or versions that have been marked as deleted. This is particularly relevant
1392 /// in object storage systems that support soft deletion or versioning.
1393 ///
1394 /// If `true`, subsequent `list` operations will include deleted files or versions.
1395 /// If `false`, deleted files or versions will be excluded from the `list` results.
1396 pub fn deleted(mut self, v: bool) -> Self {
1397 self.args.deleted = v;
1398 self
1399 }
1400}