opendal/types/operator/
operator_futures.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Futures provides the futures generated by [`Operator`]
19//!
20//! By using futures, users can add more options for operation.
21
22use std::collections::HashMap;
23use std::future::IntoFuture;
24use std::ops::RangeBounds;
25use std::time::Duration;
26
27use chrono::DateTime;
28use chrono::Utc;
29use futures::Future;
30
31use crate::raw::*;
32use crate::*;
33
34/// OperatorFuture is the future generated by [`Operator`].
35///
36/// The future will consume all the input to generate a future.
37///
38/// # NOTES
39///
40/// This struct is by design to keep in crate. We don't want
41/// users to use this struct directly.
42pub struct OperatorFuture<I, O, F: Future<Output = Result<O>>> {
43    /// The accessor to the underlying object storage
44    acc: Accessor,
45    /// The path of string
46    path: String,
47    /// The input args
48    args: I,
49    /// The function which will move all the args and return a static future
50    f: fn(Accessor, String, I) -> F,
51}
52
53impl<I, O, F: Future<Output = Result<O>>> OperatorFuture<I, O, F> {
54    /// # NOTES
55    ///
56    /// This struct is by design to keep in crate. We don't want
57    /// users to use this struct directly.
58    pub(crate) fn new(
59        inner: Accessor,
60        path: String,
61        args: I,
62        f: fn(Accessor, String, I) -> F,
63    ) -> Self {
64        OperatorFuture {
65            acc: inner,
66            path,
67            args,
68            f,
69        }
70    }
71}
72
73impl<I, O, F> IntoFuture for OperatorFuture<I, O, F>
74where
75    F: Future<Output = Result<O>>,
76{
77    type Output = Result<O>;
78    type IntoFuture = F;
79
80    fn into_future(self) -> Self::IntoFuture {
81        (self.f)(self.acc, self.path, self.args)
82    }
83}
84
85/// Future that generated by [`Operator::stat_with`].
86///
87/// Users can add more options by public functions provided by this struct.
88pub type FutureStat<F> = OperatorFuture<options::StatOptions, Metadata, F>;
89
90impl<F: Future<Output = Result<Metadata>>> FutureStat<F> {
91    /// Set the If-Match for this operation.
92    ///
93    /// Refer to [`options::StatOptions::if_match`] for more details.
94    pub fn if_match(mut self, v: &str) -> Self {
95        self.args.if_match = Some(v.to_string());
96        self
97    }
98
99    /// Set the If-None-Match for this operation.
100    ///
101    /// Refer to [`options::StatOptions::if_none_match`] for more details.
102    pub fn if_none_match(mut self, v: &str) -> Self {
103        self.args.if_none_match = Some(v.to_string());
104        self
105    }
106
107    /// Set the If-Modified-Since for this operation.
108    ///
109    /// Refer to [`options::StatOptions::if_modified_since`] for more details.
110    pub fn if_modified_since(mut self, v: DateTime<Utc>) -> Self {
111        self.args.if_modified_since = Some(v);
112        self
113    }
114
115    /// Set the If-Unmodified-Since for this operation.
116    ///
117    /// Refer to [`options::StatOptions::if_unmodified_since`] for more details.
118    pub fn if_unmodified_since(mut self, v: DateTime<Utc>) -> Self {
119        self.args.if_unmodified_since = Some(v);
120        self
121    }
122
123    /// Set the version for this operation.
124    ///
125    /// Refer to [`options::StatOptions::version`] for more details.
126    pub fn version(mut self, v: &str) -> Self {
127        self.args.version = Some(v.to_string());
128        self
129    }
130}
131
132/// Future that generated by [`Operator::presign_stat_with`].
133///
134/// Users can add more options by public functions provided by this struct.
135pub type FuturePresignStat<F> =
136    OperatorFuture<(options::StatOptions, Duration), PresignedRequest, F>;
137
138impl<F: Future<Output = Result<PresignedRequest>>> FuturePresignStat<F> {
139    /// Refer to [`options::StatOptions::override_content_disposition`] for more details.
140    pub fn override_content_disposition(mut self, v: &str) -> Self {
141        self.args.0.override_content_disposition = Some(v.to_string());
142        self
143    }
144
145    /// Refer to [`options::StatOptions::override_cache_control`] for more details.
146    pub fn override_cache_control(mut self, v: &str) -> Self {
147        self.args.0.override_cache_control = Some(v.to_string());
148        self
149    }
150
151    /// Refer to [`options::StatOptions::override_content_type`] for more details.
152    pub fn override_content_type(mut self, v: &str) -> Self {
153        self.args.0.override_content_type = Some(v.to_string());
154        self
155    }
156
157    /// Refer to [`options::StatOptions::if_match`] for more details.
158    pub fn if_match(mut self, v: &str) -> Self {
159        self.args.0.if_match = Some(v.to_string());
160        self
161    }
162
163    /// Refer to [`options::StatOptions::if_none_match`] for more details.
164    pub fn if_none_match(mut self, v: &str) -> Self {
165        self.args.0.if_none_match = Some(v.to_string());
166        self
167    }
168}
169
170/// Future that generated by [`Operator::presign_delete_with`].
171///
172/// Users can add more options by public functions provided by this struct.
173pub type FuturePresignDelete<F> =
174    OperatorFuture<(options::DeleteOptions, Duration), PresignedRequest, F>;
175
176impl<F: Future<Output = Result<PresignedRequest>>> FuturePresignDelete<F> {}
177
178/// Future that generated by [`Operator::presign_read_with`].
179///
180/// Users can add more options by public functions provided by this struct.
181pub type FuturePresignRead<F> =
182    OperatorFuture<(options::ReadOptions, Duration), PresignedRequest, F>;
183
184impl<F: Future<Output = Result<PresignedRequest>>> FuturePresignRead<F> {
185    /// Refer to [`options::ReadOptions::override_content_disposition`] for more details.
186    pub fn override_content_disposition(mut self, v: &str) -> Self {
187        self.args.0.override_content_disposition = Some(v.to_string());
188        self
189    }
190
191    /// Refer to [`options::ReadOptions::override_cache_control`] for more details.
192    pub fn override_cache_control(mut self, v: &str) -> Self {
193        self.args.0.override_cache_control = Some(v.to_string());
194        self
195    }
196
197    /// Refer to [`options::ReadOptions::override_content_type`] for more details.
198    pub fn override_content_type(mut self, v: &str) -> Self {
199        self.args.0.override_content_type = Some(v.to_string());
200        self
201    }
202
203    /// Refer to [`options::ReadOptions::if_match`] for more details.
204    pub fn if_match(mut self, v: &str) -> Self {
205        self.args.0.if_match = Some(v.to_string());
206        self
207    }
208
209    /// Refer to [`options::ReadOptions::if_none_match`] for more details.
210    pub fn if_none_match(mut self, v: &str) -> Self {
211        self.args.0.if_none_match = Some(v.to_string());
212        self
213    }
214}
215
216/// Future that generated by [`Operator::presign_write_with`].
217///
218/// Users can add more options by public functions provided by this struct.
219pub type FuturePresignWrite<F> =
220    OperatorFuture<(options::WriteOptions, Duration), PresignedRequest, F>;
221
222impl<F: Future<Output = Result<PresignedRequest>>> FuturePresignWrite<F> {
223    /// Refer to [`options::WriteOptions::content_type`] for more details.
224    pub fn content_type(mut self, v: &str) -> Self {
225        self.args.0.content_type = Some(v.to_string());
226        self
227    }
228
229    /// Refer to [`options::WriteOptions::content_disposition`] for more details.
230    pub fn content_disposition(mut self, v: &str) -> Self {
231        self.args.0.content_disposition = Some(v.to_string());
232        self
233    }
234
235    /// Refer to [`options::WriteOptions::content_encoding`] for more details.
236    pub fn content_encoding(mut self, v: &str) -> Self {
237        self.args.0.content_encoding = Some(v.to_string());
238        self
239    }
240
241    /// Refer to [`options::WriteOptions::cache_control`] for more details.
242    pub fn cache_control(mut self, v: &str) -> Self {
243        self.args.0.cache_control = Some(v.to_string());
244        self
245    }
246}
247
248/// Future that generated by [`Operator::read_with`].
249///
250/// Users can add more options by public functions provided by this struct.
251pub type FutureRead<F> = OperatorFuture<options::ReadOptions, Buffer, F>;
252
253impl<F: Future<Output = Result<Buffer>>> FutureRead<F> {
254    /// Set `range` for this `read` request.
255    ///
256    /// If we have a file with size `n`.
257    ///
258    /// - `..` means read bytes in range `[0, n)` of file.
259    /// - `0..1024` and `..1024` means read bytes in range `[0, 1024)` of file
260    /// - `1024..` means read bytes in range `[1024, n)` of file
261    ///
262    /// ```
263    /// # use opendal::Result;
264    /// # use opendal::Operator;
265    /// # use futures::TryStreamExt;
266    /// # async fn test(op: Operator) -> Result<()> {
267    /// let bs = op.read_with("path/to/file").range(0..1024).await?;
268    /// # Ok(())
269    /// # }
270    /// ```
271    pub fn range(mut self, range: impl RangeBounds<u64>) -> Self {
272        self.args.range = range.into();
273        self
274    }
275
276    /// Set `concurrent` for the reader.
277    ///
278    /// OpenDAL by default to write file without concurrent. This is not efficient for cases when users
279    /// read large chunks of data. By setting `concurrent`, opendal will read files concurrently
280    /// on support storage services.
281    ///
282    /// By setting `concurrent`, opendal will fetch chunks concurrently with
283    /// the given chunk size.
284    ///
285    /// ```
286    /// # use opendal::Result;
287    /// # use opendal::Operator;
288    /// # use opendal::Scheme;
289    /// # async fn test(op: Operator) -> Result<()> {
290    /// let r = op.read_with("path/to/file").concurrent(8).await?;
291    /// # Ok(())
292    /// # }
293    /// ```
294    pub fn concurrent(mut self, concurrent: usize) -> Self {
295        self.args.concurrent = concurrent.max(1);
296        self
297    }
298
299    /// OpenDAL will use services' preferred chunk size by default. Users can set chunk based on their own needs.
300    ///
301    /// This following example will make opendal read data in 4MiB chunks:
302    ///
303    /// ```
304    /// # use opendal::Result;
305    /// # use opendal::Operator;
306    /// # use opendal::Scheme;
307    /// # async fn test(op: Operator) -> Result<()> {
308    /// let r = op.read_with("path/to/file").chunk(4 * 1024 * 1024).await?;
309    /// # Ok(())
310    /// # }
311    /// ```
312    pub fn chunk(mut self, chunk_size: usize) -> Self {
313        self.args.chunk = Some(chunk_size);
314        self
315    }
316
317    /// Set `version` for this `read` request.
318    ///
319    /// This feature can be used to retrieve the data of a specified version of the given path.
320    ///
321    /// If the version doesn't exist, an error with kind [`ErrorKind::NotFound`] will be returned.
322    ///
323    /// ```
324    /// # use opendal::Result;
325    /// # use opendal::Operator;
326    ///
327    /// # async fn test(op: Operator, version: &str) -> Result<()> {
328    /// let mut bs = op.read_with("path/to/file").version(version).await?;
329    /// # Ok(())
330    /// # }
331    /// ```
332    pub fn version(mut self, v: &str) -> Self {
333        self.args.version = Some(v.to_string());
334        self
335    }
336
337    /// Set `if_match` for this `read` request.
338    ///
339    /// This feature can be used to check if the file's `ETag` matches the given `ETag`.
340    ///
341    /// If file exists and it's etag doesn't match, an error with kind [`ErrorKind::ConditionNotMatch`]
342    /// will be returned.
343    ///
344    /// ```
345    /// # use opendal::Result;
346    /// use opendal::Operator;
347    /// # async fn test(op: Operator, etag: &str) -> Result<()> {
348    /// let mut metadata = op.read_with("path/to/file").if_match(etag).await?;
349    /// # Ok(())
350    /// # }
351    /// ```
352    pub fn if_match(mut self, v: &str) -> Self {
353        self.args.if_match = Some(v.to_string());
354        self
355    }
356
357    /// Set `if_none_match` for this `read` request.
358    ///
359    /// This feature can be used to check if the file's `ETag` doesn't match the given `ETag`.
360    ///
361    /// If file exists and it's etag match, an error with kind [`ErrorKind::ConditionNotMatch`]
362    /// will be returned.
363    ///
364    /// ```
365    /// # use opendal::Result;
366    /// use opendal::Operator;
367    /// # async fn test(op: Operator, etag: &str) -> Result<()> {
368    /// let mut metadata = op.read_with("path/to/file").if_none_match(etag).await?;
369    /// # Ok(())
370    /// # }
371    /// ```
372    pub fn if_none_match(mut self, v: &str) -> Self {
373        self.args.if_none_match = Some(v.to_string());
374        self
375    }
376
377    /// ## `if_modified_since`
378    ///
379    /// Set `if_modified_since` for this `read` request.
380    ///
381    /// This feature can be used to check if the file has been modified since the given timestamp.
382    ///
383    /// If file exists and it hasn't been modified since the specified time, an error with kind
384    /// [`ErrorKind::ConditionNotMatch`] will be returned.
385    ///
386    /// ```
387    /// # use opendal::Result;
388    /// use chrono::DateTime;
389    /// use chrono::Utc;
390    /// use opendal::Operator;
391    /// # async fn test(op: Operator, time: DateTime<Utc>) -> Result<()> {
392    /// let mut metadata = op.read_with("path/to/file").if_modified_since(time).await?;
393    /// # Ok(())
394    /// # }
395    /// ```
396    pub fn if_modified_since(mut self, v: DateTime<Utc>) -> Self {
397        self.args.if_modified_since = Some(v);
398        self
399    }
400
401    /// Set `if_unmodified_since` for this `read` request.
402    ///
403    /// This feature can be used to check if the file hasn't been modified since the given timestamp.
404    ///
405    /// If file exists and it has been modified since the specified time, an error with kind
406    /// [`ErrorKind::ConditionNotMatch`] will be returned.
407    ///
408    /// ```
409    /// # use opendal::Result;
410    /// use chrono::DateTime;
411    /// use chrono::Utc;
412    /// use opendal::Operator;
413    /// # async fn test(op: Operator, time: DateTime<Utc>) -> Result<()> {
414    /// let mut metadata = op
415    ///     .read_with("path/to/file")
416    ///     .if_unmodified_since(time)
417    ///     .await?;
418    /// # Ok(())
419    /// # }
420    /// ```
421    pub fn if_unmodified_since(mut self, v: DateTime<Utc>) -> Self {
422        self.args.if_unmodified_since = Some(v);
423        self
424    }
425}
426
427/// Future that generated by [`Operator::read_with`] or [`Operator::reader_with`].
428///
429/// Users can add more options by public functions provided by this struct.
430///
431/// # Notes
432///
433/// `(OpRead, ())` is a trick to make sure `FutureReader` is different from `FutureRead`
434pub type FutureReader<F> = OperatorFuture<options::ReaderOptions, Reader, F>;
435
436impl<F: Future<Output = Result<Reader>>> FutureReader<F> {
437    /// Set `version` for this `reader`.
438    ///
439    /// This feature can be used to retrieve the data of a specified version of the given path.
440    ///
441    /// If the version doesn't exist, an error with kind [`ErrorKind::NotFound`] will be returned.
442    ///
443    /// ```
444    /// # use opendal::Result;
445    /// # use opendal::Operator;
446    ///
447    /// # async fn test(op: Operator, version: &str) -> Result<()> {
448    /// let mut r = op.reader_with("path/to/file").version(version).await?;
449    /// # Ok(())
450    /// # }
451    /// ```
452    pub fn version(mut self, v: &str) -> Self {
453        self.args.version = Some(v.to_string());
454        self
455    }
456
457    /// Set `concurrent` for the reader.
458    ///
459    /// OpenDAL by default to write file without concurrent. This is not efficient for cases when users
460    /// read large chunks of data. By setting `concurrent`, opendal will reading files concurrently
461    /// on support storage services.
462    ///
463    /// By setting `concurrent`, opendal will fetch chunks concurrently with
464    /// the give chunk size.
465    ///
466    /// ```
467    /// # use opendal::Result;
468    /// # use opendal::Operator;
469    /// # use opendal::Scheme;
470    /// # async fn test(op: Operator) -> Result<()> {
471    /// let r = op.reader_with("path/to/file").concurrent(8).await?;
472    /// # Ok(())
473    /// # }
474    /// ```
475    pub fn concurrent(mut self, concurrent: usize) -> Self {
476        self.args.concurrent = concurrent.max(1);
477        self
478    }
479
480    /// OpenDAL will use services' preferred chunk size by default. Users can set chunk based on their own needs.
481    ///
482    /// This following example will make opendal read data in 4MiB chunks:
483    ///
484    /// ```
485    /// # use opendal::Result;
486    /// # use opendal::Operator;
487    /// # use opendal::Scheme;
488    /// # async fn test(op: Operator) -> Result<()> {
489    /// let r = op
490    ///     .reader_with("path/to/file")
491    ///     .chunk(4 * 1024 * 1024)
492    ///     .await?;
493    /// # Ok(())
494    /// # }
495    /// ```
496    pub fn chunk(mut self, chunk_size: usize) -> Self {
497        self.args.chunk = Some(chunk_size);
498        self
499    }
500
501    /// Controls the optimization strategy for range reads in [`Reader::fetch`].
502    ///
503    /// When performing range reads, if the gap between two requested ranges is smaller than
504    /// the configured `gap` size, OpenDAL will merge these ranges into a single read request
505    /// and discard the unrequested data in between. This helps reduce the number of API calls
506    /// to remote storage services.
507    ///
508    /// This optimization is particularly useful when performing multiple small range reads
509    /// that are close to each other, as it reduces the overhead of multiple network requests
510    /// at the cost of transferring some additional data.
511    ///
512    /// In this example, if two requested ranges are separated by less than 1MiB,
513    /// they will be merged into a single read request:
514    ///
515    /// ```
516    /// # use opendal::Result;
517    /// # use opendal::Operator;
518    /// # use opendal::Scheme;
519    /// # async fn test(op: Operator) -> Result<()> {
520    /// let r = op
521    ///     .reader_with("path/to/file")
522    ///     .chunk(4 * 1024 * 1024)
523    ///     .gap(1024 * 1024) // 1MiB gap
524    ///     .await?;
525    /// # Ok(())
526    /// # }
527    /// ```
528    pub fn gap(mut self, gap_size: usize) -> Self {
529        self.args.gap = Some(gap_size);
530        self
531    }
532
533    /// Set `if-match` for this `read` request.
534    ///
535    /// This feature can be used to check if the file's `ETag` matches the given `ETag`.
536    ///
537    /// If file exists and it's etag doesn't match, an error with kind [`ErrorKind::ConditionNotMatch`]
538    /// will be returned.
539    ///
540    /// ```
541    /// # use opendal::Result;
542    /// use opendal::Operator;
543    /// # async fn test(op: Operator, etag: &str) -> Result<()> {
544    /// let mut r = op.reader_with("path/to/file").if_match(etag).await?;
545    /// # Ok(())
546    /// # }
547    /// ```
548    pub fn if_match(mut self, etag: &str) -> Self {
549        self.args.if_match = Some(etag.to_string());
550        self
551    }
552
553    /// Set `if-none-match` for this `read` request.
554    ///
555    /// This feature can be used to check if the file's `ETag` doesn't match the given `ETag`.
556    ///
557    /// If file exists and it's etag match, an error with kind [`ErrorKind::ConditionNotMatch`]
558    /// will be returned.
559    ///
560    /// ```
561    /// # use opendal::Result;
562    /// use opendal::Operator;
563    /// # async fn test(op: Operator, etag: &str) -> Result<()> {
564    /// let mut r = op.reader_with("path/to/file").if_none_match(etag).await?;
565    /// # Ok(())
566    /// # }
567    /// ```
568    pub fn if_none_match(mut self, etag: &str) -> Self {
569        self.args.if_none_match = Some(etag.to_string());
570        self
571    }
572
573    /// Set `if-modified-since` for this `read` request.
574    ///
575    /// This feature can be used to check if the file has been modified since the given timestamp.
576    ///
577    /// If file exists and it hasn't been modified since the specified time, an error with kind
578    /// [`ErrorKind::ConditionNotMatch`] will be returned.
579    ///
580    /// ```
581    /// # use opendal::Result;
582    /// use chrono::DateTime;
583    /// use chrono::Utc;
584    /// use opendal::Operator;
585    /// # async fn test(op: Operator, time: DateTime<Utc>) -> Result<()> {
586    /// let mut r = op
587    ///     .reader_with("path/to/file")
588    ///     .if_modified_since(time)
589    ///     .await?;
590    /// # Ok(())
591    /// # }
592    /// ```
593    pub fn if_modified_since(mut self, v: DateTime<Utc>) -> Self {
594        self.args.if_modified_since = Some(v);
595        self
596    }
597
598    /// Set `if-unmodified-since` for this `read` request.
599    ///
600    /// This feature can be used to check if the file hasn't been modified since the given timestamp.
601    ///
602    /// If file exists and it has been modified since the specified time, an error with kind
603    /// [`ErrorKind::ConditionNotMatch`] will be returned.
604    ///
605    /// ```
606    /// # use opendal::Result;
607    /// use chrono::DateTime;
608    /// use chrono::Utc;
609    /// use opendal::Operator;
610    /// # async fn test(op: Operator, time: DateTime<Utc>) -> Result<()> {
611    /// let mut r = op
612    ///     .reader_with("path/to/file")
613    ///     .if_unmodified_since(time)
614    ///     .await?;
615    /// # Ok(())
616    /// # }
617    /// ```
618    pub fn if_unmodified_since(mut self, v: DateTime<Utc>) -> Self {
619        self.args.if_unmodified_since = Some(v);
620        self
621    }
622}
623
624/// Future that generated by [`Operator::write_with`].
625///
626/// Users can add more options by public functions provided by this struct.
627pub type FutureWrite<F> = OperatorFuture<(options::WriteOptions, Buffer), Metadata, F>;
628
629impl<F: Future<Output = Result<Metadata>>> FutureWrite<F> {
630    /// Sets append mode for this write request.
631    ///
632    /// Refer to [`options::WriteOptions::append`] for more details.
633    ///
634    /// ### Example
635    ///
636    /// ```
637    /// # use opendal::Result;
638    /// # use opendal::Operator;
639    /// # use futures::StreamExt;
640    /// # use futures::SinkExt;
641    /// use bytes::Bytes;
642    ///
643    /// # async fn test(op: Operator) -> Result<()> {
644    /// let _ = op
645    ///     .write_with("path/to/file", vec![0; 4096])
646    ///     .append(true)
647    ///     .await?;
648    /// # Ok(())
649    /// # }
650    /// ```
651    pub fn append(mut self, v: bool) -> Self {
652        self.args.0.append = v;
653        self
654    }
655
656    /// Sets chunk size for buffered writes.
657    ///
658    /// Refer to [`options::WriteOptions::chunk`] for more details.
659    ///
660    /// ### Example
661    ///
662    /// ```
663    /// # use opendal::Result;
664    /// # use opendal::Operator;
665    /// # use futures::StreamExt;
666    /// # use futures::SinkExt;
667    /// use bytes::Bytes;
668    ///
669    /// # async fn test(op: Operator) -> Result<()> {
670    /// // Set 8MiB chunk size - data will be sent in one API call at close
671    /// let _ = op
672    ///     .write_with("path/to/file", vec![0; 4096])
673    ///     .chunk(8 * 1024 * 1024)
674    ///     .await?;
675    /// # Ok(())
676    /// # }
677    /// ```
678    pub fn chunk(mut self, v: usize) -> Self {
679        self.args.0.chunk = Some(v);
680        self
681    }
682
683    /// Sets concurrent write operations for this writer.
684    ///
685    /// Refer to [`options::WriteOptions::concurrent`] for more details.
686    ///
687    /// ## Example
688    ///
689    /// ```
690    /// # use opendal::Result;
691    /// # use opendal::Operator;
692    /// # use futures::StreamExt;
693    /// # use futures::SinkExt;
694    /// use bytes::Bytes;
695    ///
696    /// # async fn test(op: Operator) -> Result<()> {
697    /// // Enable concurrent writes with 8 parallel operations at 128B chunk.
698    /// let _ = op
699    ///     .write_with("path/to/file", vec![0; 4096])
700    ///     .chunk(128)
701    ///     .concurrent(8)
702    ///     .await?;
703    /// # Ok(())
704    /// # }
705    /// ```
706    pub fn concurrent(mut self, v: usize) -> Self {
707        self.args.0.concurrent = v.max(1);
708        self
709    }
710
711    /// Sets Cache-Control header for this write operation.
712    ///
713    /// Refer to [`options::WriteOptions::cache_control`] for more details.
714    ///
715    /// ### Example
716    ///
717    /// ```
718    /// # use opendal::Result;
719    /// # use opendal::Operator;
720    /// # use futures::StreamExt;
721    /// # use futures::SinkExt;
722    /// use bytes::Bytes;
723    ///
724    /// # async fn test(op: Operator) -> Result<()> {
725    /// // Cache content for 7 days (604800 seconds)
726    /// let _ = op
727    ///     .write_with("path/to/file", vec![0; 4096])
728    ///     .cache_control("max-age=604800")
729    ///     .await?;
730    /// # Ok(())
731    /// # }
732    /// ```
733    pub fn cache_control(mut self, v: &str) -> Self {
734        self.args.0.cache_control = Some(v.to_string());
735        self
736    }
737
738    /// Sets `Content-Type` header for this write operation.
739    ///
740    /// Refer to [`options::WriteOptions::content_type`] for more details.
741    ///
742    /// ## Example
743    ///
744    /// ```
745    /// # use opendal::Result;
746    /// # use opendal::Operator;
747    /// use bytes::Bytes;
748    ///
749    /// # async fn test(op: Operator) -> Result<()> {
750    /// // Set content type for plain text file
751    /// let _ = op
752    ///     .write_with("path/to/file", vec![0; 4096])
753    ///     .content_type("text/plain")
754    ///     .await?;
755    /// # Ok(())
756    /// # }
757    /// ```
758    pub fn content_type(mut self, v: &str) -> Self {
759        self.args.0.content_type = Some(v.to_string());
760        self
761    }
762
763    /// Sets Content-Disposition header for this write request.
764    ///
765    /// Refer to [`options::WriteOptions::content_disposition`] for more details.
766    ///
767    /// ### Example
768    ///
769    /// ```
770    /// # use opendal::Result;
771    /// # use opendal::Operator;
772    /// # use futures::StreamExt;
773    /// # use futures::SinkExt;
774    /// use bytes::Bytes;
775    ///
776    /// # async fn test(op: Operator) -> Result<()> {
777    /// let _ = op
778    ///     .write_with("path/to/file", vec![0; 4096])
779    ///     .content_disposition("attachment; filename=\"filename.jpg\"")
780    ///     .await?;
781    /// # Ok(())
782    /// # }
783    /// ```
784    pub fn content_disposition(mut self, v: &str) -> Self {
785        self.args.0.content_disposition = Some(v.to_string());
786        self
787    }
788
789    /// Sets Content-Encoding header for this write request.
790    ///
791    /// Refer to [`options::WriteOptions::content_encoding`] for more details.
792    ///
793    /// ### Example
794    ///
795    /// ```
796    /// # use opendal::Result;
797    /// # use opendal::Operator;
798    /// # use futures::StreamExt;
799    /// # use futures::SinkExt;
800    /// use bytes::Bytes;
801    ///
802    /// # async fn test(op: Operator) -> Result<()> {
803    /// let _ = op
804    ///     .write_with("path/to/file", vec![0; 4096])
805    ///     .content_encoding("gzip")
806    ///     .await?;
807    /// # Ok(())
808    /// # }
809    /// ```
810    pub fn content_encoding(mut self, v: &str) -> Self {
811        self.args.0.content_encoding = Some(v.to_string());
812        self
813    }
814
815    /// Sets If-Match header for this write request.
816    ///
817    /// Refer to [`options::WriteOptions::if_match`] for more details.
818    ///
819    /// ### Example
820    ///
821    /// ```
822    /// # use opendal::Result;
823    /// # use opendal::Operator;
824    /// # use futures::StreamExt;
825    /// # use futures::SinkExt;
826    /// use bytes::Bytes;
827    ///
828    /// # async fn test(op: Operator) -> Result<()> {
829    /// let _ = op
830    ///     .write_with("path/to/file", vec![0; 4096])
831    ///     .if_match("\"686897696a7c876b7e\"")
832    ///     .await?;
833    /// # Ok(())
834    /// # }
835    /// ```
836    pub fn if_match(mut self, s: &str) -> Self {
837        self.args.0.if_match = Some(s.to_string());
838        self
839    }
840
841    /// Sets If-None-Match header for this write request.
842    ///
843    /// Refer to [`options::WriteOptions::if_none_match`] for more details.
844    ///
845    /// ### Example
846    ///
847    /// ```
848    /// # use opendal::Result;
849    /// # use opendal::Operator;
850    /// # use futures::StreamExt;
851    /// # use futures::SinkExt;
852    /// use bytes::Bytes;
853    ///
854    /// # async fn test(op: Operator) -> Result<()> {
855    /// let _ = op
856    ///     .write_with("path/to/file", vec![0; 4096])
857    ///     .if_none_match("\"686897696a7c876b7e\"")
858    ///     .await?;
859    /// # Ok(())
860    /// # }
861    /// ```
862    pub fn if_none_match(mut self, s: &str) -> Self {
863        self.args.0.if_none_match = Some(s.to_string());
864        self
865    }
866
867    /// Sets the condition that write operation will succeed only if target does not exist.
868    ///
869    /// Refer to [`options::WriteOptions::if_not_exists`] for more details.
870    ///
871    /// ### Example
872    ///
873    /// ```
874    /// # use opendal::Result;
875    /// # use opendal::Operator;
876    /// # use futures::StreamExt;
877    /// # use futures::SinkExt;
878    /// use bytes::Bytes;
879    ///
880    /// # async fn test(op: Operator) -> Result<()> {
881    /// let _ = op
882    ///     .write_with("path/to/file", vec![0; 4096])
883    ///     .if_not_exists(true)
884    ///     .await?;
885    /// # Ok(())
886    /// # }
887    /// ```
888    pub fn if_not_exists(mut self, b: bool) -> Self {
889        self.args.0.if_not_exists = b;
890        self
891    }
892
893    /// Sets user metadata for this write request.
894    ///
895    /// Refer to [`options::WriteOptions::user_metadata`] for more details.
896    ///
897    /// ### Example
898    ///
899    /// ```
900    /// # use opendal::Result;
901    /// # use opendal::Operator;
902    /// # use futures::StreamExt;
903    /// # use futures::SinkExt;
904    /// use bytes::Bytes;
905    ///
906    /// # async fn test(op: Operator) -> Result<()> {
907    /// let _ = op
908    ///     .write_with("path/to/file", vec![0; 4096])
909    ///     .user_metadata([
910    ///         ("language".to_string(), "rust".to_string()),
911    ///         ("author".to_string(), "OpenDAL".to_string()),
912    ///     ])
913    ///     .await?;
914    /// # Ok(())
915    /// # }
916    /// ```
917    pub fn user_metadata(mut self, data: impl IntoIterator<Item = (String, String)>) -> Self {
918        self.args.0.user_metadata = Some(HashMap::from_iter(data));
919        self
920    }
921}
922
923/// Future that generated by [`Operator::writer_with`].
924///
925/// Users can add more options by public functions provided by this struct.
926pub type FutureWriter<F> = OperatorFuture<options::WriteOptions, Writer, F>;
927
928impl<F: Future<Output = Result<Writer>>> FutureWriter<F> {
929    /// Sets append mode for this write request.
930    ///
931    /// Refer to [`options::WriteOptions::append`] for more details.
932    ///
933    /// ### Example
934    ///
935    /// ```
936    /// # use opendal::Result;
937    /// # use opendal::Operator;
938    /// # use futures::StreamExt;
939    /// # use futures::SinkExt;
940    /// use bytes::Bytes;
941    ///
942    /// # async fn test(op: Operator) -> Result<()> {
943    /// let mut w = op.writer_with("path/to/file").append(true).await?;
944    /// w.write(vec![0; 4096]).await?;
945    /// w.write(vec![1; 4096]).await?;
946    /// w.close().await?;
947    /// # Ok(())
948    /// # }
949    /// ```
950    pub fn append(mut self, v: bool) -> Self {
951        self.args.append = v;
952        self
953    }
954
955    /// Sets chunk size for buffered writes.
956    ///
957    /// Refer to [`options::WriteOptions::chunk`] for more details.
958    ///
959    /// ### Example
960    ///
961    /// ```
962    /// # use opendal::Result;
963    /// # use opendal::Operator;
964    /// # use futures::StreamExt;
965    /// # use futures::SinkExt;
966    /// use bytes::Bytes;
967    ///
968    /// # async fn test(op: Operator) -> Result<()> {
969    /// // Set 8MiB chunk size - data will be sent in one API call at close
970    /// let mut w = op
971    ///     .writer_with("path/to/file")
972    ///     .chunk(8 * 1024 * 1024)
973    ///     .await?;
974    /// w.write(vec![0; 4096]).await?;
975    /// w.write(vec![1; 4096]).await?;
976    /// w.close().await?;
977    /// # Ok(())
978    /// # }
979    /// ```
980    pub fn chunk(mut self, v: usize) -> Self {
981        self.args.chunk = Some(v);
982        self
983    }
984
985    /// Sets concurrent write operations for this writer.
986    ///
987    /// Refer to [`options::WriteOptions::concurrent`] for more details.
988    ///
989    /// ## Example
990    ///
991    /// ```
992    /// # use opendal::Result;
993    /// # use opendal::Operator;
994    /// # use futures::StreamExt;
995    /// # use futures::SinkExt;
996    /// use bytes::Bytes;
997    ///
998    /// # async fn test(op: Operator) -> Result<()> {
999    /// // Enable concurrent writes with 8 parallel operations
1000    /// let mut w = op.writer_with("path/to/file").concurrent(8).await?;
1001    ///
1002    /// // First write starts immediately
1003    /// w.write(vec![0; 4096]).await?;
1004    ///
1005    /// // Second write runs concurrently with first
1006    /// w.write(vec![1; 4096]).await?;
1007    ///
1008    /// // Ensures all writes complete successfully and in order
1009    /// w.close().await?;
1010    /// # Ok(())
1011    /// # }
1012    /// ```
1013    pub fn concurrent(mut self, v: usize) -> Self {
1014        self.args.concurrent = v.max(1);
1015        self
1016    }
1017
1018    /// Sets Cache-Control header for this write operation.
1019    ///
1020    /// Refer to [`options::WriteOptions::cache_control`] for more details.
1021    ///
1022    /// ### Example
1023    ///
1024    /// ```
1025    /// # use opendal::Result;
1026    /// # use opendal::Operator;
1027    /// # use futures::StreamExt;
1028    /// # use futures::SinkExt;
1029    /// use bytes::Bytes;
1030    ///
1031    /// # async fn test(op: Operator) -> Result<()> {
1032    /// // Cache content for 7 days (604800 seconds)
1033    /// let mut w = op
1034    ///     .writer_with("path/to/file")
1035    ///     .cache_control("max-age=604800")
1036    ///     .await?;
1037    /// w.write(vec![0; 4096]).await?;
1038    /// w.write(vec![1; 4096]).await?;
1039    /// w.close().await?;
1040    /// # Ok(())
1041    /// # }
1042    /// ```
1043    pub fn cache_control(mut self, v: &str) -> Self {
1044        self.args.cache_control = Some(v.to_string());
1045        self
1046    }
1047
1048    /// Sets `Content-Type` header for this write operation.
1049    ///
1050    /// Refer to [`options::WriteOptions::content_type`] for more details.
1051    ///
1052    /// ## Example
1053    ///
1054    /// ```
1055    /// # use opendal::Result;
1056    /// # use opendal::Operator;
1057    /// use bytes::Bytes;
1058    ///
1059    /// # async fn test(op: Operator) -> Result<()> {
1060    /// // Set content type for plain text file
1061    /// let mut w = op
1062    ///     .writer_with("path/to/file")
1063    ///     .content_type("text/plain")
1064    ///     .await?;
1065    /// w.write(vec![0; 4096]).await?;
1066    /// w.write(vec![1; 4096]).await?;
1067    /// w.close().await?;
1068    /// # Ok(())
1069    /// # }
1070    /// ```
1071    pub fn content_type(mut self, v: &str) -> Self {
1072        self.args.content_type = Some(v.to_string());
1073        self
1074    }
1075
1076    /// Sets Content-Disposition header for this write request.
1077    ///
1078    /// Refer to [`options::WriteOptions::content_disposition`] for more details.
1079    ///
1080    /// ### Example
1081    ///
1082    /// ```
1083    /// # use opendal::Result;
1084    /// # use opendal::Operator;
1085    /// # use futures::StreamExt;
1086    /// # use futures::SinkExt;
1087    /// use bytes::Bytes;
1088    ///
1089    /// # async fn test(op: Operator) -> Result<()> {
1090    /// let mut w = op
1091    ///     .writer_with("path/to/file")
1092    ///     .content_disposition("attachment; filename=\"filename.jpg\"")
1093    ///     .await?;
1094    /// w.write(vec![0; 4096]).await?;
1095    /// w.write(vec![1; 4096]).await?;
1096    /// w.close().await?;
1097    /// # Ok(())
1098    /// # }
1099    /// ```
1100    pub fn content_disposition(mut self, v: &str) -> Self {
1101        self.args.content_disposition = Some(v.to_string());
1102        self
1103    }
1104
1105    /// Sets Content-Encoding header for this write request.
1106    ///
1107    /// Refer to [`options::WriteOptions::content_encoding`] for more details.
1108    ///
1109    /// ### Example
1110    ///
1111    /// ```
1112    /// # use opendal::Result;
1113    /// # use opendal::Operator;
1114    /// # use futures::StreamExt;
1115    /// # use futures::SinkExt;
1116    /// use bytes::Bytes;
1117    ///
1118    /// # async fn test(op: Operator) -> Result<()> {
1119    /// let mut w = op
1120    ///     .writer_with("path/to/file")
1121    ///     .content_encoding("gzip")
1122    ///     .await?;
1123    /// w.write(vec![0; 4096]).await?;
1124    /// w.write(vec![1; 4096]).await?;
1125    /// w.close().await?;
1126    /// # Ok(())
1127    /// # }
1128    /// ```
1129    pub fn content_encoding(mut self, v: &str) -> Self {
1130        self.args.content_encoding = Some(v.to_string());
1131        self
1132    }
1133
1134    /// Sets If-Match header for this write request.
1135    ///
1136    /// Refer to [`options::WriteOptions::if_match`] for more details.
1137    ///
1138    /// ### Behavior
1139    ///
1140    /// - If supported, the write operation will only succeed if the target's ETag matches the specified value
1141    /// - The value should be a valid ETag string
1142    /// - Common values include:
1143    ///   - A specific ETag value like `"686897696a7c876b7e"`
1144    ///   - `*` - Matches any existing resource
1145    /// - If not supported, the value will be ignored
1146    ///
1147    /// This operation provides conditional write functionality based on ETag matching,
1148    /// helping prevent unintended overwrites in concurrent scenarios.
1149    ///
1150    /// ### Example
1151    ///
1152    /// ```
1153    /// # use opendal::Result;
1154    /// # use opendal::Operator;
1155    /// # use futures::StreamExt;
1156    /// # use futures::SinkExt;
1157    /// use bytes::Bytes;
1158    ///
1159    /// # async fn test(op: Operator) -> Result<()> {
1160    /// let mut w = op
1161    ///     .writer_with("path/to/file")
1162    ///     .if_match("\"686897696a7c876b7e\"")
1163    ///     .await?;
1164    /// w.write(vec![0; 4096]).await?;
1165    /// w.write(vec![1; 4096]).await?;
1166    /// w.close().await?;
1167    /// # Ok(())
1168    /// # }
1169    /// ```
1170    pub fn if_match(mut self, s: &str) -> Self {
1171        self.args.if_match = Some(s.to_string());
1172        self
1173    }
1174
1175    /// Sets If-None-Match header for this write request.
1176    ///
1177    /// Refer to [`options::WriteOptions::if_none_match`] for more details.
1178    ///
1179    /// ### Example
1180    ///
1181    /// ```
1182    /// # use opendal::Result;
1183    /// # use opendal::Operator;
1184    /// # use futures::StreamExt;
1185    /// # use futures::SinkExt;
1186    /// use bytes::Bytes;
1187    ///
1188    /// # async fn test(op: Operator) -> Result<()> {
1189    /// let mut w = op
1190    ///     .writer_with("path/to/file")
1191    ///     .if_none_match("\"686897696a7c876b7e\"")
1192    ///     .await?;
1193    /// w.write(vec![0; 4096]).await?;
1194    /// w.write(vec![1; 4096]).await?;
1195    /// w.close().await?;
1196    /// # Ok(())
1197    /// # }
1198    /// ```
1199    pub fn if_none_match(mut self, s: &str) -> Self {
1200        self.args.if_none_match = Some(s.to_string());
1201        self
1202    }
1203
1204    /// Sets the condition that write operation will succeed only if target does not exist.
1205    ///
1206    /// Refer to [`options::WriteOptions::if_not_exists`] for more details.
1207    ///
1208    /// ### Example
1209    ///
1210    /// ```
1211    /// # use opendal::Result;
1212    /// # use opendal::Operator;
1213    /// # use futures::StreamExt;
1214    /// # use futures::SinkExt;
1215    /// use bytes::Bytes;
1216    ///
1217    /// # async fn test(op: Operator) -> Result<()> {
1218    /// let mut w = op.writer_with("path/to/file").if_not_exists(true).await?;
1219    /// w.write(vec![0; 4096]).await?;
1220    /// w.write(vec![1; 4096]).await?;
1221    /// w.close().await?;
1222    /// # Ok(())
1223    /// # }
1224    /// ```
1225    pub fn if_not_exists(mut self, b: bool) -> Self {
1226        self.args.if_not_exists = b;
1227        self
1228    }
1229
1230    /// Sets user metadata for this write request.
1231    ///
1232    /// Refer to [`options::WriteOptions::user_metadata`] for more details.
1233    ///
1234    /// ### Example
1235    ///
1236    /// ```
1237    /// # use opendal::Result;
1238    /// # use opendal::Operator;
1239    /// # use futures::StreamExt;
1240    /// # use futures::SinkExt;
1241    /// use bytes::Bytes;
1242    ///
1243    /// # async fn test(op: Operator) -> Result<()> {
1244    /// let mut w = op
1245    ///     .writer_with("path/to/file")
1246    ///     .user_metadata([
1247    ///         ("content-type".to_string(), "text/plain".to_string()),
1248    ///         ("author".to_string(), "OpenDAL".to_string()),
1249    ///     ])
1250    ///     .await?;
1251    /// w.write(vec![0; 4096]).await?;
1252    /// w.close().await?;
1253    /// # Ok(())
1254    /// # }
1255    /// ```
1256    pub fn user_metadata(mut self, data: impl IntoIterator<Item = (String, String)>) -> Self {
1257        self.args.user_metadata = Some(HashMap::from_iter(data));
1258        self
1259    }
1260}
1261
1262/// Future that generated by [`Operator::delete_with`].
1263///
1264/// Users can add more options by public functions provided by this struct.
1265pub type FutureDelete<F> = OperatorFuture<options::DeleteOptions, (), F>;
1266
1267impl<F: Future<Output = Result<()>>> FutureDelete<F> {
1268    /// Change the version of this delete operation.
1269    pub fn version(mut self, v: &str) -> Self {
1270        self.args.version = Some(v.to_string());
1271        self
1272    }
1273}
1274
1275/// Future that generated by [`Operator::deleter_with`].
1276///
1277/// Users can add more options by public functions provided by this struct.
1278pub type FutureDeleter<F> = OperatorFuture<OpDeleter, (), F>;
1279
1280/// Future that generated by [`Operator::list_with`] or [`Operator::lister_with`].
1281///
1282/// Users can add more options by public functions provided by this struct.
1283pub type FutureList<F> = OperatorFuture<options::ListOptions, Vec<Entry>, F>;
1284
1285impl<F: Future<Output = Result<Vec<Entry>>>> FutureList<F> {
1286    /// The limit passed to underlying service to specify the max results
1287    /// that could return per-request.
1288    ///
1289    /// Users could use this to control the memory usage of list operation.
1290    pub fn limit(mut self, v: usize) -> Self {
1291        self.args.limit = Some(v);
1292        self
1293    }
1294
1295    /// The start_after passes to underlying service to specify the specified key
1296    /// to start listing from.
1297    pub fn start_after(mut self, v: &str) -> Self {
1298        self.args.start_after = Some(v.to_string());
1299        self
1300    }
1301
1302    /// The recursive is used to control whether the list operation is recursive.
1303    ///
1304    /// - If `false`, list operation will only list the entries under the given path.
1305    /// - If `true`, list operation will list all entries that starts with given path.
1306    ///
1307    /// Default to `false`.
1308    pub fn recursive(mut self, v: bool) -> Self {
1309        self.args.recursive = v;
1310        self
1311    }
1312
1313    /// Controls whether the `list` operation should return file versions.
1314    ///
1315    /// This function allows you to specify if the `list` operation, when executed, should include
1316    /// information about different versions of files, if versioning is supported and enabled.
1317    ///
1318    /// If `true`, subsequent `list` operations will include version information for each file.
1319    /// If `false`, version information will be omitted from the `list` results.
1320    ///
1321    /// Default to `false`
1322    pub fn versions(mut self, v: bool) -> Self {
1323        self.args.versions = v;
1324        self
1325    }
1326
1327    /// Controls whether the `list` operation should include deleted files (or versions).
1328    ///
1329    /// This function allows you to specify if the `list` operation, when executed, should include
1330    /// entries for files or versions that have been marked as deleted. This is particularly relevant
1331    /// in object storage systems that support soft deletion or versioning.
1332    ///
1333    /// If `true`, subsequent `list` operations will include deleted files or versions.
1334    /// If `false`, deleted files or versions will be excluded from the `list` results.
1335    pub fn deleted(mut self, v: bool) -> Self {
1336        self.args.deleted = v;
1337        self
1338    }
1339}
1340
1341/// Future that generated by [`Operator::list_with`] or [`Operator::lister_with`].
1342///
1343/// Users can add more options by public functions provided by this struct.
1344pub type FutureLister<F> = OperatorFuture<options::ListOptions, Lister, F>;
1345
1346impl<F: Future<Output = Result<Lister>>> FutureLister<F> {
1347    /// The limit passed to underlying service to specify the max results
1348    /// that could return per-request.
1349    ///
1350    /// Users could use this to control the memory usage of list operation.
1351    pub fn limit(mut self, v: usize) -> Self {
1352        self.args.limit = Some(v);
1353        self
1354    }
1355
1356    /// The start_after passes to underlying service to specify the specified key
1357    /// to start listing from.
1358    pub fn start_after(mut self, v: &str) -> Self {
1359        self.args.start_after = Some(v.to_string());
1360        self
1361    }
1362
1363    /// The recursive is used to control whether the list operation is recursive.
1364    ///
1365    /// - If `false`, list operation will only list the entries under the given path.
1366    /// - If `true`, list operation will list all entries that starts with given path.
1367    ///
1368    /// Default to `false`.
1369    pub fn recursive(mut self, v: bool) -> Self {
1370        self.args.recursive = v;
1371        self
1372    }
1373
1374    /// Controls whether the `list` operation should return file versions.
1375    ///
1376    /// This function allows you to specify if the `list` operation, when executed, should include
1377    /// information about different versions of files, if versioning is supported and enabled.
1378    ///
1379    /// If `true`, subsequent `list` operations will include version information for each file.
1380    /// If `false`, version information will be omitted from the `list` results.
1381    ///
1382    /// Default to `false`
1383    pub fn versions(mut self, v: bool) -> Self {
1384        self.args.versions = v;
1385        self
1386    }
1387
1388    /// Controls whether the `list` operation should include deleted files (or versions).
1389    ///
1390    /// This function allows you to specify if the `list` operation, when executed, should include
1391    /// entries for files or versions that have been marked as deleted. This is particularly relevant
1392    /// in object storage systems that support soft deletion or versioning.
1393    ///
1394    /// If `true`, subsequent `list` operations will include deleted files or versions.
1395    /// If `false`, deleted files or versions will be excluded from the `list` results.
1396    pub fn deleted(mut self, v: bool) -> Self {
1397        self.args.deleted = v;
1398        self
1399    }
1400}