opendal/blocking/
operator.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use tokio::runtime::Handle;
19
20use crate::Operator as AsyncOperator;
21use crate::*;
22
23/// Use OpenDAL in blocking context.
24///
25/// # Notes
26///
27/// blocking::Operator is a wrapper around [`AsyncOperator`]. It calls async runtimes' `block_on` API to spawn blocking tasks.
28/// Please avoid using blocking::Operator in async context.
29///
30/// # Examples
31///
32/// ## Init in async context
33///
34/// blocking::Operator will use current async context's runtime to handle the async calls.
35///
36/// This is just for initialization. You must use `blocking::Operator` in blocking context.
37///
38/// ```rust,no_run
39/// # use opendal::services;
40/// # use opendal::blocking;
41/// # use opendal::Operator;
42/// # use opendal::Result;
43///
44/// #[tokio::main]
45/// async fn main() -> Result<()> {
///     // Create S3 backend builder.
47///     let mut builder = services::S3::default().bucket("test").region("us-east-1");
48///     let op = Operator::new(builder)?.finish();
49///
///     // Build a `blocking::Operator` to start operating the storage.
51///     let _: blocking::Operator = blocking::Operator::new(op)?;
52///
53///     Ok(())
54/// }
55/// ```
56///
57/// ## In async context with blocking functions
58///
/// If `blocking::Operator` is used in a blocking function, please fetch a [`tokio::runtime::EnterGuard`]
/// first: use [`Handle::try_current`] to get the handle and then call [`Handle::enter`].
/// This often happens when an async function calls a blocking function.
62///
63/// ```rust,no_run
64/// # use opendal::services;
65/// # use opendal::blocking;
66/// # use opendal::Operator;
67/// # use opendal::Result;
68///
69/// #[tokio::main]
70/// async fn main() -> Result<()> {
71///     let _ = blocking_fn()?;
72///     Ok(())
73/// }
74///
75/// fn blocking_fn() -> Result<blocking::Operator> {
///     // Create S3 backend builder.
77///     let mut builder = services::S3::default().bucket("test").region("us-east-1");
78///     let op = Operator::new(builder)?.finish();
79///
80///     let handle = tokio::runtime::Handle::try_current().unwrap();
81///     let _guard = handle.enter();
///     // Build a `blocking::Operator` to start operating the storage.
83///     let op: blocking::Operator = blocking::Operator::new(op)?;
84///     Ok(op)
85/// }
86/// ```
87///
88/// ## In blocking context
89///
90/// In a pure blocking context, we can create a runtime and use it to create the `blocking::Operator`.
91///
92/// > The following code uses a global statically created runtime as an example, please manage the
93/// > runtime on demand.
94///
95/// ```rust,no_run
96/// # use std::sync::LazyLock;
97/// # use opendal::services;
98/// # use opendal::blocking;
99/// # use opendal::Operator;
100/// # use opendal::Result;
101///
102/// static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
103///     tokio::runtime::Builder::new_multi_thread()
104///         .enable_all()
105///         .build()
106///         .unwrap()
107/// });
108///
109/// fn main() -> Result<()> {
///     // Create S3 backend builder.
111///     let mut builder = services::S3::default().bucket("test").region("us-east-1");
112///     let op = Operator::new(builder)?.finish();
113///
114///     // Fetch the `EnterGuard` from global runtime.
115///     let _guard = RUNTIME.enter();
///     // Build a `blocking::Operator` to start operating the storage.
117///     let _: blocking::Operator = blocking::Operator::new(op)?;
118///
119///     Ok(())
120/// }
121/// ```
122#[derive(Clone, Debug)]
123pub struct Operator {
124    handle: tokio::runtime::Handle,
125    op: AsyncOperator,
126}
127
128impl Operator {
129    /// Create a new `BlockingLayer` with the current runtime's handle
130    pub fn new(op: AsyncOperator) -> Result<Self> {
131        Ok(Self {
132            handle: Handle::try_current()
133                .map_err(|_| Error::new(ErrorKind::Unexpected, "failed to get current handle"))?,
134            op,
135        })
136    }
137
138    /// Get information of underlying accessor.
139    ///
140    /// # Examples
141    ///
142    /// ```
143    /// # use std::sync::Arc;
144    /// use opendal::blocking;
145    /// # use anyhow::Result;
146    /// use opendal::blocking::Operator;
147    ///
148    /// # fn test(op: blocking::Operator) -> Result<()> {
149    /// let info = op.info();
150    /// # Ok(())
151    /// # }
152    /// ```
153    pub fn info(&self) -> OperatorInfo {
154        self.op.info()
155    }
156}
157
158/// # Operator blocking API.
159impl Operator {
160    /// Get given path's metadata.
161    ///
162    /// # Behavior
163    ///
164    /// ## Services that support `create_dir`
165    ///
166    /// `test` and `test/` may vary in some services such as S3. However, on a local file system,
167    /// they're identical. Therefore, the behavior of `stat("test")` and `stat("test/")` might differ
168    /// in certain edge cases. Always use `stat("test/")` when you need to access a directory if possible.
169    ///
170    /// Here are the behavior list:
171    ///
172    /// | Case                   | Path            | Result                                     |
173    /// |------------------------|-----------------|--------------------------------------------|
174    /// | stat existing dir      | `abc/`          | Metadata with dir mode                     |
175    /// | stat existing file     | `abc/def_file`  | Metadata with file mode                    |
176    /// | stat dir without `/`   | `abc/def_dir`   | Error `NotFound` or metadata with dir mode |
177    /// | stat file with `/`     | `abc/def_file/` | Error `NotFound`                           |
178    /// | stat not existing path | `xyz`           | Error `NotFound`                           |
179    ///
180    /// Refer to [RFC: List Prefix][crate::docs::rfcs::rfc_3243_list_prefix] for more details.
181    ///
182    /// ## Services that not support `create_dir`
183    ///
184    /// For services that not support `create_dir`, `stat("test/")` will return `NotFound` even
185    /// when `test/abc` exists since the service won't have the concept of dir. There is nothing
186    /// we can do about this.
187    ///
188    /// # Examples
189    ///
190    /// ## Check if file exists
191    ///
192    /// ```
193    /// # use anyhow::Result;
194    /// # use futures::io;
195    /// use opendal::blocking;
196    /// # use opendal::blocking::Operator;
197    /// use opendal::ErrorKind;
198    /// #
199    /// # fn test(op: blocking::Operator) -> Result<()> {
200    /// if let Err(e) = op.stat("test") {
201    ///     if e.kind() == ErrorKind::NotFound {
202    ///         println!("file not exist")
203    ///     }
204    /// }
205    /// # Ok(())
206    /// # }
207    /// ```
208    pub fn stat(&self, path: &str) -> Result<Metadata> {
209        self.stat_options(path, options::StatOptions::default())
210    }
211
212    /// Get given path's metadata with extra options.
213    ///
214    /// # Behavior
215    ///
216    /// ## Services that support `create_dir`
217    ///
218    /// `test` and `test/` may vary in some services such as S3. However, on a local file system,
219    /// they're identical. Therefore, the behavior of `stat("test")` and `stat("test/")` might differ
220    /// in certain edge cases. Always use `stat("test/")` when you need to access a directory if possible.
221    ///
222    /// Here are the behavior list:
223    ///
224    /// | Case                   | Path            | Result                                     |
225    /// |------------------------|-----------------|--------------------------------------------|
226    /// | stat existing dir      | `abc/`          | Metadata with dir mode                     |
227    /// | stat existing file     | `abc/def_file`  | Metadata with file mode                    |
228    /// | stat dir without `/`   | `abc/def_dir`   | Error `NotFound` or metadata with dir mode |
229    /// | stat file with `/`     | `abc/def_file/` | Error `NotFound`                           |
230    /// | stat not existing path | `xyz`           | Error `NotFound`                           |
231    ///
232    /// Refer to [RFC: List Prefix][crate::docs::rfcs::rfc_3243_list_prefix] for more details.
233    ///
234    /// ## Services that not support `create_dir`
235    ///
236    /// For services that not support `create_dir`, `stat("test/")` will return `NotFound` even
237    /// when `test/abc` exists since the service won't have the concept of dir. There is nothing
238    /// we can do about this.
239    pub fn stat_options(&self, path: &str, opts: options::StatOptions) -> Result<Metadata> {
240        self.handle.block_on(self.op.stat_options(path, opts))
241    }
242
243    /// Check if this path exists or not.
244    ///
245    /// # Example
246    ///
247    /// ```no_run
248    /// use anyhow::Result;
249    /// use opendal::blocking;
250    /// use opendal::blocking::Operator;
251    /// fn test(op: blocking::Operator) -> Result<()> {
252    ///     let _ = op.exists("test")?;
253    ///
254    ///     Ok(())
255    /// }
256    /// ```
257    pub fn exists(&self, path: &str) -> Result<bool> {
258        let r = self.stat(path);
259        match r {
260            Ok(_) => Ok(true),
261            Err(err) => match err.kind() {
262                ErrorKind::NotFound => Ok(false),
263                _ => Err(err),
264            },
265        }
266    }
267
268    /// Create a dir at given path.
269    ///
270    /// # Notes
271    ///
272    /// To indicate that a path is a directory, it is compulsory to include
273    /// a trailing / in the path. Failure to do so may result in
274    /// `NotADirectory` error being returned by OpenDAL.
275    ///
276    /// # Behavior
277    ///
278    /// - Create on existing dir will succeed.
279    /// - Create dir is always recursive, works like `mkdir -p`
280    ///
281    /// # Examples
282    ///
283    /// ```no_run
284    /// # use opendal::Result;
285    /// use opendal::blocking;
286    /// # use opendal::blocking::Operator;
287    /// # use futures::TryStreamExt;
288    /// # fn test(op: blocking::Operator) -> Result<()> {
289    /// op.create_dir("path/to/dir/")?;
290    /// # Ok(())
291    /// # }
292    /// ```
293    pub fn create_dir(&self, path: &str) -> Result<()> {
294        self.handle.block_on(self.op.create_dir(path))
295    }
296
297    /// Read the whole path into a bytes.
298    ///
299    /// This function will allocate a new bytes internally. For more precise memory control or
300    /// reading data lazily, please use [`blocking::Operator::reader`]
301    ///
302    /// # Examples
303    ///
304    /// ```no_run
305    /// # use opendal::Result;
306    /// use opendal::blocking;
307    /// # use opendal::blocking::Operator;
308    /// #
309    /// # fn test(op: blocking::Operator) -> Result<()> {
310    /// let bs = op.read("path/to/file")?;
311    /// # Ok(())
312    /// # }
313    /// ```
314    pub fn read(&self, path: &str) -> Result<Buffer> {
315        self.read_options(path, options::ReadOptions::default())
316    }
317
318    /// Read the whole path into a bytes with extra options.
319    ///
320    /// This function will allocate a new bytes internally. For more precise memory control or
321    /// reading data lazily, please use [`blocking::Operator::reader`]
322    pub fn read_options(&self, path: &str, opts: options::ReadOptions) -> Result<Buffer> {
323        self.handle.block_on(self.op.read_options(path, opts))
324    }
325
326    /// Create a new reader which can read the whole path.
327    ///
328    /// # Examples
329    ///
330    /// ```no_run
331    /// # use opendal::Result;
332    /// use opendal::blocking;
333    /// # use opendal::blocking::Operator;
334    /// # use futures::TryStreamExt;
335    /// # fn test(op: blocking::Operator) -> Result<()> {
336    /// let r = op.reader("path/to/file")?;
337    /// # Ok(())
338    /// # }
339    /// ```
340    pub fn reader(&self, path: &str) -> Result<blocking::Reader> {
341        self.reader_options(path, options::ReaderOptions::default())
342    }
343
344    /// Create a new reader with extra options
345    pub fn reader_options(
346        &self,
347        path: &str,
348        opts: options::ReaderOptions,
349    ) -> Result<blocking::Reader> {
350        let r = self.handle.block_on(self.op.reader_options(path, opts))?;
351        Ok(blocking::Reader::new(self.handle.clone(), r))
352    }
353
354    /// Write bytes into given path.
355    ///
356    /// # Notes
357    ///
358    /// - Write will make sure all bytes has been written, or an error will be returned.
359    ///
360    /// # Examples
361    ///
362    /// ```no_run
363    /// # use opendal::Result;
364    /// use opendal::blocking;
365    /// # use opendal::blocking::Operator;
366    /// # use futures::StreamExt;
367    /// # use futures::SinkExt;
368    /// use bytes::Bytes;
369    ///
370    /// # fn test(op: blocking::Operator) -> Result<()> {
371    /// op.write("path/to/file", vec![0; 4096])?;
372    /// # Ok(())
373    /// # }
374    /// ```
375    pub fn write(&self, path: &str, bs: impl Into<Buffer>) -> Result<Metadata> {
376        self.write_options(path, bs, options::WriteOptions::default())
377    }
378
379    /// Write data with options.
380    ///
381    /// # Notes
382    ///
383    /// - Write will make sure all bytes has been written, or an error will be returned.
384    pub fn write_options(
385        &self,
386        path: &str,
387        bs: impl Into<Buffer>,
388        opts: options::WriteOptions,
389    ) -> Result<Metadata> {
390        self.handle.block_on(self.op.write_options(path, bs, opts))
391    }
392
393    /// Write multiple bytes into given path.
394    ///
395    /// # Notes
396    ///
397    /// - Write will make sure all bytes has been written, or an error will be returned.
398    ///
399    /// # Examples
400    ///
401    /// ```no_run
402    /// # use opendal::Result;
403    /// # use opendal::blocking;
404    /// # use opendal::blocking::Operator;
405    /// # use futures::StreamExt;
406    /// # use futures::SinkExt;
407    /// use bytes::Bytes;
408    ///
409    /// # fn test(op: blocking::Operator) -> Result<()> {
410    /// let mut w = op.writer("path/to/file")?;
411    /// w.write(vec![0; 4096])?;
412    /// w.write(vec![1; 4096])?;
413    /// w.close()?;
414    /// # Ok(())
415    /// # }
416    /// ```
417    pub fn writer(&self, path: &str) -> Result<blocking::Writer> {
418        self.writer_options(path, options::WriteOptions::default())
419    }
420
421    /// Create a new writer with extra options
422    pub fn writer_options(
423        &self,
424        path: &str,
425        opts: options::WriteOptions,
426    ) -> Result<blocking::Writer> {
427        let w = self.handle.block_on(self.op.writer_options(path, opts))?;
428        Ok(blocking::Writer::new(self.handle.clone(), w))
429    }
430
431    /// Copy a file from `from` to `to`.
432    ///
433    /// # Notes
434    ///
435    /// - `from` and `to` must be a file.
436    /// - `to` will be overwritten if it exists.
437    /// - If `from` and `to` are the same, nothing will happen.
438    /// - `copy` is idempotent. For same `from` and `to` input, the result will be the same.
439    ///
440    /// # Examples
441    ///
442    /// ```
443    /// # use opendal::Result;
444    /// use opendal::blocking;
445    /// # use opendal::blocking::Operator;
446    ///
447    /// # fn test(op: blocking::Operator) -> Result<()> {
448    /// op.copy("path/to/file", "path/to/file2")?;
449    /// # Ok(())
450    /// # }
451    /// ```
452    pub fn copy(&self, from: &str, to: &str) -> Result<()> {
453        self.handle.block_on(self.op.copy(from, to))
454    }
455
456    /// Rename a file from `from` to `to`.
457    ///
458    /// # Notes
459    ///
460    /// - `from` and `to` must be a file.
461    /// - `to` will be overwritten if it exists.
462    /// - If `from` and `to` are the same, a `IsSameFile` error will occur.
463    ///
464    /// # Examples
465    ///
466    /// ```
467    /// # use opendal::Result;
468    /// use opendal::blocking;
469    /// # use opendal::blocking::Operator;
470    ///
471    /// # fn test(op: blocking::Operator) -> Result<()> {
472    /// op.rename("path/to/file", "path/to/file2")?;
473    /// # Ok(())
474    /// # }
475    /// ```
476    pub fn rename(&self, from: &str, to: &str) -> Result<()> {
477        self.handle.block_on(self.op.rename(from, to))
478    }
479
480    /// Delete given path.
481    ///
482    /// # Notes
483    ///
484    /// - Delete not existing error won't return errors.
485    ///
486    /// # Examples
487    ///
488    /// ```no_run
489    /// # use anyhow::Result;
490    /// # use futures::io;
491    /// use opendal::blocking;
492    /// # use opendal::blocking::Operator;
493    /// # fn test(op: blocking::Operator) -> Result<()> {
494    /// op.delete("path/to/file")?;
495    /// # Ok(())
496    /// # }
497    /// ```
498    pub fn delete(&self, path: &str) -> Result<()> {
499        self.delete_options(path, options::DeleteOptions::default())
500    }
501
502    /// Delete given path with options.
503    ///
504    /// # Notes
505    ///
506    /// - Delete not existing error won't return errors.
507    pub fn delete_options(&self, path: &str, opts: options::DeleteOptions) -> Result<()> {
508        self.handle.block_on(self.op.delete_options(path, opts))
509    }
510
511    /// Delete an infallible iterator of paths.
512    ///
513    /// Also see:
514    ///
515    /// - [`blocking::Operator::delete_try_iter`]: delete an fallible iterator of paths.
516    pub fn delete_iter<I, D>(&self, iter: I) -> Result<()>
517    where
518        I: IntoIterator<Item = D>,
519        D: IntoDeleteInput,
520    {
521        self.handle.block_on(self.op.delete_iter(iter))
522    }
523
524    /// Delete a fallible iterator of paths.
525    ///
526    /// Also see:
527    ///
528    /// - [`blocking::Operator::delete_iter`]: delete an infallible iterator of paths.
529    pub fn delete_try_iter<I, D>(&self, try_iter: I) -> Result<()>
530    where
531        I: IntoIterator<Item = Result<D>>,
532        D: IntoDeleteInput,
533    {
534        self.handle.block_on(self.op.delete_try_iter(try_iter))
535    }
536
537    /// Create a [`BlockingDeleter`] to continuously remove content from storage.
538    ///
539    /// It leverages batch deletion capabilities provided by storage services for efficient removal.
540    ///
541    /// Users can have more control over the deletion process by using [`BlockingDeleter`] directly.
542    pub fn deleter(&self) -> Result<blocking::Deleter> {
543        blocking::Deleter::create(
544            self.handle.clone(),
545            self.handle.block_on(self.op.deleter())?,
546        )
547    }
548
549    /// Remove the path and all nested dirs and files recursively.
550    ///
551    /// # Notes
552    ///
553    /// We don't support batch delete now.
554    ///
555    /// # Examples
556    ///
557    /// ```
558    /// # use anyhow::Result;
559    /// # use futures::io;
560    /// use opendal::blocking;
561    /// # use opendal::blocking::Operator;
562    /// # fn test(op: blocking::Operator) -> Result<()> {
563    /// op.remove_all("path/to/dir")?;
564    /// # Ok(())
565    /// # }
566    /// ```
567    pub fn remove_all(&self, path: &str) -> Result<()> {
568        self.handle.block_on(self.op.remove_all(path))
569    }
570
571    /// List entries that starts with given `path` in parent dir.
572    ///
573    /// # Notes
574    ///
575    /// ## Recursively List
576    ///
577    /// This function only read the children of the given directory. To read
578    /// all entries recursively, use `blocking::Operator::list_options("path", opts)`
579    /// instead.
580    ///
581    /// ## Streaming List
582    ///
583    /// This function will read all entries in the given directory. It could
584    /// take very long time and consume a lot of memory if the directory
585    /// contains a lot of entries.
586    ///
587    /// In order to avoid this, you can use [`blocking::Operator::lister`] to list entries in
588    /// a streaming way.
589    ///
590    /// # Examples
591    ///
592    /// ```no_run
593    /// # use anyhow::Result;
594    /// use opendal::blocking;
595    /// use opendal::blocking::Operator;
596    /// use opendal::EntryMode;
597    /// #  fn test(op: blocking::Operator) -> Result<()> {
598    /// let mut entries = op.list("path/to/dir/")?;
599    /// for entry in entries {
600    ///     match entry.metadata().mode() {
601    ///         EntryMode::FILE => {
602    ///             println!("Handling file")
603    ///         }
604    ///         EntryMode::DIR => {
605    ///             println!("Handling dir {}", entry.path())
606    ///         }
607    ///         EntryMode::Unknown => continue,
608    ///     }
609    /// }
610    /// # Ok(())
611    /// # }
612    /// ```
613    pub fn list(&self, path: &str) -> Result<Vec<Entry>> {
614        self.list_options(path, options::ListOptions::default())
615    }
616
617    /// List entries that starts with given `path` in parent dir. with options.
618    ///
619    /// # Notes
620    ///
621    /// ## Streaming List
622    ///
623    /// This function will read all entries in the given directory. It could
624    /// take very long time and consume a lot of memory if the directory
625    /// contains a lot of entries.
626    ///
627    /// In order to avoid this, you can use [`blocking::Operator::lister`] to list entries in
628    /// a streaming way.
629    pub fn list_options(&self, path: &str, opts: options::ListOptions) -> Result<Vec<Entry>> {
630        self.handle.block_on(self.op.list_options(path, opts))
631    }
632
633    /// List entries that starts with given `path` in parent dir.
634    ///
635    /// This function will create a new [`BlockingLister`] to list entries. Users can stop listing
636    /// via dropping this [`Lister`].
637    ///
638    /// # Notes
639    ///
640    /// ## Recursively List
641    ///
642    /// This function only read the children of the given directory. To read
643    /// all entries recursively, use [`blocking::Operator::lister_with`] and `delimiter("")`
644    /// instead.
645    ///
646    /// # Examples
647    ///
648    /// ```no_run
649    /// # use anyhow::Result;
650    /// # use futures::io;
651    /// use futures::TryStreamExt;
652    /// use opendal::blocking;
653    /// use opendal::blocking::Operator;
654    /// use opendal::EntryMode;
655    /// # fn test(op: blocking::Operator) -> Result<()> {
656    /// let mut ds = op.lister("path/to/dir/")?;
657    /// for de in ds {
658    ///     let de = de?;
659    ///     match de.metadata().mode() {
660    ///         EntryMode::FILE => {
661    ///             println!("Handling file")
662    ///         }
663    ///         EntryMode::DIR => {
664    ///             println!("Handling dir like start a new list via meta.path()")
665    ///         }
666    ///         EntryMode::Unknown => continue,
667    ///     }
668    /// }
669    /// # Ok(())
670    /// # }
671    /// ```
672    pub fn lister(&self, path: &str) -> Result<blocking::Lister> {
673        self.lister_options(path, options::ListOptions::default())
674    }
675
676    /// List entries within a given directory as an iterator with options.
677    ///
678    /// This function will create a new handle to list entries.
679    ///
680    /// An error will be returned if given path doesn't end with `/`.
681    pub fn lister_options(
682        &self,
683        path: &str,
684        opts: options::ListOptions,
685    ) -> Result<blocking::Lister> {
686        let l = self.handle.block_on(self.op.lister_options(path, opts))?;
687        Ok(blocking::Lister::new(self.handle.clone(), l))
688    }
689
690    /// Check if this operator can work correctly.
691    ///
692    /// We will send a `list` request to path and return any errors we met.
693    ///
694    /// ```
695    /// # use std::sync::Arc;
696    /// # use anyhow::Result;
697    /// use opendal::blocking;
698    /// use opendal::blocking::Operator;
699    /// use opendal::ErrorKind;
700    ///
701    /// # fn test(op: blocking::Operator) -> Result<()> {
702    /// op.check()?;
703    /// # Ok(())
704    /// # }
705    /// ```
706    pub fn check(&self) -> Result<()> {
707        let mut ds = self.lister("/")?;
708
709        match ds.next() {
710            Some(Err(e)) if e.kind() != ErrorKind::NotFound => Err(e),
711            _ => Ok(()),
712        }
713    }
714}
715
716impl From<Operator> for AsyncOperator {
717    fn from(val: Operator) -> Self {
718        val.op
719    }
720}