opendal/blocking/operator.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use tokio::runtime::Handle;
19
20use crate::Operator as AsyncOperator;
21use crate::*;
22
23/// Use OpenDAL in blocking context.
24///
25/// # Notes
26///
/// `blocking::Operator` is a wrapper around [`AsyncOperator`]. It uses the async runtime's `block_on` API to
/// drive each async call to completion, blocking the current thread until the call finishes.
/// Please avoid using `blocking::Operator` in async context.
29///
30/// # Examples
31///
32/// ## Init in async context
33///
34/// blocking::Operator will use current async context's runtime to handle the async calls.
35///
36/// This is just for initialization. You must use `blocking::Operator` in blocking context.
37///
38/// ```rust,no_run
39/// # use opendal::services;
40/// # use opendal::blocking;
41/// # use opendal::Operator;
42/// # use opendal::Result;
43///
44/// #[tokio::main]
45/// async fn main() -> Result<()> {
/// // Create S3 backend builder.
47/// let mut builder = services::S3::default().bucket("test").region("us-east-1");
48/// let op = Operator::new(builder)?.finish();
49///
/// // Build a `blocking::Operator` to start operating the storage.
51/// let _: blocking::Operator = blocking::Operator::new(op)?;
52///
53/// Ok(())
54/// }
55/// ```
56///
57/// ## In async context with blocking functions
58///
/// If `blocking::Operator` is used in a blocking function, please obtain a [`tokio::runtime::EnterGuard`]
/// first: use [`Handle::try_current`] to get the handle and then call [`Handle::enter`].
/// This often happens when an async function calls a blocking function.
62///
63/// ```rust,no_run
64/// # use opendal::services;
65/// # use opendal::blocking;
66/// # use opendal::Operator;
67/// # use opendal::Result;
68///
69/// #[tokio::main]
70/// async fn main() -> Result<()> {
71/// let _ = blocking_fn()?;
72/// Ok(())
73/// }
74///
75/// fn blocking_fn() -> Result<blocking::Operator> {
/// // Create S3 backend builder.
77/// let mut builder = services::S3::default().bucket("test").region("us-east-1");
78/// let op = Operator::new(builder)?.finish();
79///
80/// let handle = tokio::runtime::Handle::try_current().unwrap();
81/// let _guard = handle.enter();
/// // Build a `blocking::Operator` to start operating the storage.
83/// let op: blocking::Operator = blocking::Operator::new(op)?;
84/// Ok(op)
85/// }
86/// ```
87///
88/// ## In blocking context
89///
90/// In a pure blocking context, we can create a runtime and use it to create the `blocking::Operator`.
91///
92/// > The following code uses a global statically created runtime as an example, please manage the
93/// > runtime on demand.
94///
95/// ```rust,no_run
96/// # use std::sync::LazyLock;
97/// # use opendal::services;
98/// # use opendal::blocking;
99/// # use opendal::Operator;
100/// # use opendal::Result;
101///
102/// static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
103/// tokio::runtime::Builder::new_multi_thread()
104/// .enable_all()
105/// .build()
106/// .unwrap()
107/// });
108///
109/// fn main() -> Result<()> {
/// // Create S3 backend builder.
111/// let mut builder = services::S3::default().bucket("test").region("us-east-1");
112/// let op = Operator::new(builder)?.finish();
113///
114/// // Fetch the `EnterGuard` from global runtime.
115/// let _guard = RUNTIME.enter();
/// // Build a `blocking::Operator` to start operating the storage.
117/// let _: blocking::Operator = blocking::Operator::new(op)?;
118///
119/// Ok(())
120/// }
121/// ```
122#[derive(Clone, Debug)]
123pub struct Operator {
124 handle: tokio::runtime::Handle,
125 op: AsyncOperator,
126}
127
128impl Operator {
129 /// Create a new `BlockingLayer` with the current runtime's handle
130 pub fn new(op: AsyncOperator) -> Result<Self> {
131 Ok(Self {
132 handle: Handle::try_current()
133 .map_err(|_| Error::new(ErrorKind::Unexpected, "failed to get current handle"))?,
134 op,
135 })
136 }
137
138 /// Get information of underlying accessor.
139 ///
140 /// # Examples
141 ///
142 /// ```
143 /// # use std::sync::Arc;
144 /// use opendal::blocking;
145 /// # use anyhow::Result;
146 /// use opendal::blocking::Operator;
147 ///
148 /// # fn test(op: blocking::Operator) -> Result<()> {
149 /// let info = op.info();
150 /// # Ok(())
151 /// # }
152 /// ```
153 pub fn info(&self) -> OperatorInfo {
154 self.op.info()
155 }
156}
157
158/// # Operator blocking API.
159impl Operator {
160 /// Get given path's metadata.
161 ///
162 /// # Behavior
163 ///
164 /// ## Services that support `create_dir`
165 ///
166 /// `test` and `test/` may vary in some services such as S3. However, on a local file system,
167 /// they're identical. Therefore, the behavior of `stat("test")` and `stat("test/")` might differ
168 /// in certain edge cases. Always use `stat("test/")` when you need to access a directory if possible.
169 ///
170 /// Here are the behavior list:
171 ///
172 /// | Case | Path | Result |
173 /// |------------------------|-----------------|--------------------------------------------|
174 /// | stat existing dir | `abc/` | Metadata with dir mode |
175 /// | stat existing file | `abc/def_file` | Metadata with file mode |
176 /// | stat dir without `/` | `abc/def_dir` | Error `NotFound` or metadata with dir mode |
177 /// | stat file with `/` | `abc/def_file/` | Error `NotFound` |
178 /// | stat not existing path | `xyz` | Error `NotFound` |
179 ///
180 /// Refer to [RFC: List Prefix][crate::docs::rfcs::rfc_3243_list_prefix] for more details.
181 ///
182 /// ## Services that not support `create_dir`
183 ///
184 /// For services that not support `create_dir`, `stat("test/")` will return `NotFound` even
185 /// when `test/abc` exists since the service won't have the concept of dir. There is nothing
186 /// we can do about this.
187 ///
188 /// # Examples
189 ///
190 /// ## Check if file exists
191 ///
192 /// ```
193 /// # use anyhow::Result;
194 /// # use futures::io;
195 /// use opendal::blocking;
196 /// # use opendal::blocking::Operator;
197 /// use opendal::ErrorKind;
198 /// #
199 /// # fn test(op: blocking::Operator) -> Result<()> {
200 /// if let Err(e) = op.stat("test") {
201 /// if e.kind() == ErrorKind::NotFound {
202 /// println!("file not exist")
203 /// }
204 /// }
205 /// # Ok(())
206 /// # }
207 /// ```
208 pub fn stat(&self, path: &str) -> Result<Metadata> {
209 self.stat_options(path, options::StatOptions::default())
210 }
211
212 /// Get given path's metadata with extra options.
213 ///
214 /// # Behavior
215 ///
216 /// ## Services that support `create_dir`
217 ///
218 /// `test` and `test/` may vary in some services such as S3. However, on a local file system,
219 /// they're identical. Therefore, the behavior of `stat("test")` and `stat("test/")` might differ
220 /// in certain edge cases. Always use `stat("test/")` when you need to access a directory if possible.
221 ///
222 /// Here are the behavior list:
223 ///
224 /// | Case | Path | Result |
225 /// |------------------------|-----------------|--------------------------------------------|
226 /// | stat existing dir | `abc/` | Metadata with dir mode |
227 /// | stat existing file | `abc/def_file` | Metadata with file mode |
228 /// | stat dir without `/` | `abc/def_dir` | Error `NotFound` or metadata with dir mode |
229 /// | stat file with `/` | `abc/def_file/` | Error `NotFound` |
230 /// | stat not existing path | `xyz` | Error `NotFound` |
231 ///
232 /// Refer to [RFC: List Prefix][crate::docs::rfcs::rfc_3243_list_prefix] for more details.
233 ///
234 /// ## Services that not support `create_dir`
235 ///
236 /// For services that not support `create_dir`, `stat("test/")` will return `NotFound` even
237 /// when `test/abc` exists since the service won't have the concept of dir. There is nothing
238 /// we can do about this.
239 pub fn stat_options(&self, path: &str, opts: options::StatOptions) -> Result<Metadata> {
240 self.handle.block_on(self.op.stat_options(path, opts))
241 }
242
243 /// Check if this path exists or not.
244 ///
245 /// # Example
246 ///
247 /// ```no_run
248 /// use anyhow::Result;
249 /// use opendal::blocking;
250 /// use opendal::blocking::Operator;
251 /// fn test(op: blocking::Operator) -> Result<()> {
252 /// let _ = op.exists("test")?;
253 ///
254 /// Ok(())
255 /// }
256 /// ```
257 pub fn exists(&self, path: &str) -> Result<bool> {
258 let r = self.stat(path);
259 match r {
260 Ok(_) => Ok(true),
261 Err(err) => match err.kind() {
262 ErrorKind::NotFound => Ok(false),
263 _ => Err(err),
264 },
265 }
266 }
267
268 /// Create a dir at given path.
269 ///
270 /// # Notes
271 ///
272 /// To indicate that a path is a directory, it is compulsory to include
273 /// a trailing / in the path. Failure to do so may result in
274 /// `NotADirectory` error being returned by OpenDAL.
275 ///
276 /// # Behavior
277 ///
278 /// - Create on existing dir will succeed.
279 /// - Create dir is always recursive, works like `mkdir -p`
280 ///
281 /// # Examples
282 ///
283 /// ```no_run
284 /// # use opendal::Result;
285 /// use opendal::blocking;
286 /// # use opendal::blocking::Operator;
287 /// # use futures::TryStreamExt;
288 /// # fn test(op: blocking::Operator) -> Result<()> {
289 /// op.create_dir("path/to/dir/")?;
290 /// # Ok(())
291 /// # }
292 /// ```
293 pub fn create_dir(&self, path: &str) -> Result<()> {
294 self.handle.block_on(self.op.create_dir(path))
295 }
296
297 /// Read the whole path into a bytes.
298 ///
299 /// This function will allocate a new bytes internally. For more precise memory control or
300 /// reading data lazily, please use [`blocking::Operator::reader`]
301 ///
302 /// # Examples
303 ///
304 /// ```no_run
305 /// # use opendal::Result;
306 /// use opendal::blocking;
307 /// # use opendal::blocking::Operator;
308 /// #
309 /// # fn test(op: blocking::Operator) -> Result<()> {
310 /// let bs = op.read("path/to/file")?;
311 /// # Ok(())
312 /// # }
313 /// ```
314 pub fn read(&self, path: &str) -> Result<Buffer> {
315 self.read_options(path, options::ReadOptions::default())
316 }
317
318 /// Read the whole path into a bytes with extra options.
319 ///
320 /// This function will allocate a new bytes internally. For more precise memory control or
321 /// reading data lazily, please use [`blocking::Operator::reader`]
322 pub fn read_options(&self, path: &str, opts: options::ReadOptions) -> Result<Buffer> {
323 self.handle.block_on(self.op.read_options(path, opts))
324 }
325
326 /// Create a new reader which can read the whole path.
327 ///
328 /// # Examples
329 ///
330 /// ```no_run
331 /// # use opendal::Result;
332 /// use opendal::blocking;
333 /// # use opendal::blocking::Operator;
334 /// # use futures::TryStreamExt;
335 /// # fn test(op: blocking::Operator) -> Result<()> {
336 /// let r = op.reader("path/to/file")?;
337 /// # Ok(())
338 /// # }
339 /// ```
340 pub fn reader(&self, path: &str) -> Result<blocking::Reader> {
341 self.reader_options(path, options::ReaderOptions::default())
342 }
343
344 /// Create a new reader with extra options
345 pub fn reader_options(
346 &self,
347 path: &str,
348 opts: options::ReaderOptions,
349 ) -> Result<blocking::Reader> {
350 let r = self.handle.block_on(self.op.reader_options(path, opts))?;
351 Ok(blocking::Reader::new(self.handle.clone(), r))
352 }
353
354 /// Write bytes into given path.
355 ///
356 /// # Notes
357 ///
358 /// - Write will make sure all bytes has been written, or an error will be returned.
359 ///
360 /// # Examples
361 ///
362 /// ```no_run
363 /// # use opendal::Result;
364 /// use opendal::blocking;
365 /// # use opendal::blocking::Operator;
366 /// # use futures::StreamExt;
367 /// # use futures::SinkExt;
368 /// use bytes::Bytes;
369 ///
370 /// # fn test(op: blocking::Operator) -> Result<()> {
371 /// op.write("path/to/file", vec![0; 4096])?;
372 /// # Ok(())
373 /// # }
374 /// ```
375 pub fn write(&self, path: &str, bs: impl Into<Buffer>) -> Result<Metadata> {
376 self.write_options(path, bs, options::WriteOptions::default())
377 }
378
379 /// Write data with options.
380 ///
381 /// # Notes
382 ///
383 /// - Write will make sure all bytes has been written, or an error will be returned.
384 pub fn write_options(
385 &self,
386 path: &str,
387 bs: impl Into<Buffer>,
388 opts: options::WriteOptions,
389 ) -> Result<Metadata> {
390 self.handle.block_on(self.op.write_options(path, bs, opts))
391 }
392
393 /// Write multiple bytes into given path.
394 ///
395 /// # Notes
396 ///
397 /// - Write will make sure all bytes has been written, or an error will be returned.
398 ///
399 /// # Examples
400 ///
401 /// ```no_run
402 /// # use opendal::Result;
403 /// # use opendal::blocking;
404 /// # use opendal::blocking::Operator;
405 /// # use futures::StreamExt;
406 /// # use futures::SinkExt;
407 /// use bytes::Bytes;
408 ///
409 /// # fn test(op: blocking::Operator) -> Result<()> {
410 /// let mut w = op.writer("path/to/file")?;
411 /// w.write(vec![0; 4096])?;
412 /// w.write(vec![1; 4096])?;
413 /// w.close()?;
414 /// # Ok(())
415 /// # }
416 /// ```
417 pub fn writer(&self, path: &str) -> Result<blocking::Writer> {
418 self.writer_options(path, options::WriteOptions::default())
419 }
420
421 /// Create a new writer with extra options
422 pub fn writer_options(
423 &self,
424 path: &str,
425 opts: options::WriteOptions,
426 ) -> Result<blocking::Writer> {
427 let w = self.handle.block_on(self.op.writer_options(path, opts))?;
428 Ok(blocking::Writer::new(self.handle.clone(), w))
429 }
430
431 /// Copy a file from `from` to `to`.
432 ///
433 /// # Notes
434 ///
435 /// - `from` and `to` must be a file.
436 /// - `to` will be overwritten if it exists.
437 /// - If `from` and `to` are the same, nothing will happen.
438 /// - `copy` is idempotent. For same `from` and `to` input, the result will be the same.
439 ///
440 /// # Examples
441 ///
442 /// ```
443 /// # use opendal::Result;
444 /// use opendal::blocking;
445 /// # use opendal::blocking::Operator;
446 ///
447 /// # fn test(op: blocking::Operator) -> Result<()> {
448 /// op.copy("path/to/file", "path/to/file2")?;
449 /// # Ok(())
450 /// # }
451 /// ```
452 pub fn copy(&self, from: &str, to: &str) -> Result<()> {
453 self.handle.block_on(self.op.copy(from, to))
454 }
455
456 /// Rename a file from `from` to `to`.
457 ///
458 /// # Notes
459 ///
460 /// - `from` and `to` must be a file.
461 /// - `to` will be overwritten if it exists.
462 /// - If `from` and `to` are the same, a `IsSameFile` error will occur.
463 ///
464 /// # Examples
465 ///
466 /// ```
467 /// # use opendal::Result;
468 /// use opendal::blocking;
469 /// # use opendal::blocking::Operator;
470 ///
471 /// # fn test(op: blocking::Operator) -> Result<()> {
472 /// op.rename("path/to/file", "path/to/file2")?;
473 /// # Ok(())
474 /// # }
475 /// ```
476 pub fn rename(&self, from: &str, to: &str) -> Result<()> {
477 self.handle.block_on(self.op.rename(from, to))
478 }
479
480 /// Delete given path.
481 ///
482 /// # Notes
483 ///
484 /// - Delete not existing error won't return errors.
485 ///
486 /// # Examples
487 ///
488 /// ```no_run
489 /// # use anyhow::Result;
490 /// # use futures::io;
491 /// use opendal::blocking;
492 /// # use opendal::blocking::Operator;
493 /// # fn test(op: blocking::Operator) -> Result<()> {
494 /// op.delete("path/to/file")?;
495 /// # Ok(())
496 /// # }
497 /// ```
498 pub fn delete(&self, path: &str) -> Result<()> {
499 self.delete_options(path, options::DeleteOptions::default())
500 }
501
502 /// Delete given path with options.
503 ///
504 /// # Notes
505 ///
506 /// - Delete not existing error won't return errors.
507 pub fn delete_options(&self, path: &str, opts: options::DeleteOptions) -> Result<()> {
508 self.handle.block_on(self.op.delete_options(path, opts))
509 }
510
511 /// Delete an infallible iterator of paths.
512 ///
513 /// Also see:
514 ///
515 /// - [`blocking::Operator::delete_try_iter`]: delete an fallible iterator of paths.
516 pub fn delete_iter<I, D>(&self, iter: I) -> Result<()>
517 where
518 I: IntoIterator<Item = D>,
519 D: IntoDeleteInput,
520 {
521 self.handle.block_on(self.op.delete_iter(iter))
522 }
523
524 /// Delete a fallible iterator of paths.
525 ///
526 /// Also see:
527 ///
528 /// - [`blocking::Operator::delete_iter`]: delete an infallible iterator of paths.
529 pub fn delete_try_iter<I, D>(&self, try_iter: I) -> Result<()>
530 where
531 I: IntoIterator<Item = Result<D>>,
532 D: IntoDeleteInput,
533 {
534 self.handle.block_on(self.op.delete_try_iter(try_iter))
535 }
536
537 /// Create a [`BlockingDeleter`] to continuously remove content from storage.
538 ///
539 /// It leverages batch deletion capabilities provided by storage services for efficient removal.
540 ///
541 /// Users can have more control over the deletion process by using [`BlockingDeleter`] directly.
542 pub fn deleter(&self) -> Result<blocking::Deleter> {
543 blocking::Deleter::create(
544 self.handle.clone(),
545 self.handle.block_on(self.op.deleter())?,
546 )
547 }
548
549 /// Remove the path and all nested dirs and files recursively.
550 ///
551 /// # Notes
552 ///
553 /// We don't support batch delete now.
554 ///
555 /// # Examples
556 ///
557 /// ```
558 /// # use anyhow::Result;
559 /// # use futures::io;
560 /// use opendal::blocking;
561 /// # use opendal::blocking::Operator;
562 /// # fn test(op: blocking::Operator) -> Result<()> {
563 /// op.remove_all("path/to/dir")?;
564 /// # Ok(())
565 /// # }
566 /// ```
567 pub fn remove_all(&self, path: &str) -> Result<()> {
568 self.handle.block_on(self.op.remove_all(path))
569 }
570
571 /// List entries that starts with given `path` in parent dir.
572 ///
573 /// # Notes
574 ///
575 /// ## Recursively List
576 ///
577 /// This function only read the children of the given directory. To read
578 /// all entries recursively, use `blocking::Operator::list_options("path", opts)`
579 /// instead.
580 ///
581 /// ## Streaming List
582 ///
583 /// This function will read all entries in the given directory. It could
584 /// take very long time and consume a lot of memory if the directory
585 /// contains a lot of entries.
586 ///
587 /// In order to avoid this, you can use [`blocking::Operator::lister`] to list entries in
588 /// a streaming way.
589 ///
590 /// # Examples
591 ///
592 /// ```no_run
593 /// # use anyhow::Result;
594 /// use opendal::blocking;
595 /// use opendal::blocking::Operator;
596 /// use opendal::EntryMode;
597 /// # fn test(op: blocking::Operator) -> Result<()> {
598 /// let mut entries = op.list("path/to/dir/")?;
599 /// for entry in entries {
600 /// match entry.metadata().mode() {
601 /// EntryMode::FILE => {
602 /// println!("Handling file")
603 /// }
604 /// EntryMode::DIR => {
605 /// println!("Handling dir {}", entry.path())
606 /// }
607 /// EntryMode::Unknown => continue,
608 /// }
609 /// }
610 /// # Ok(())
611 /// # }
612 /// ```
613 pub fn list(&self, path: &str) -> Result<Vec<Entry>> {
614 self.list_options(path, options::ListOptions::default())
615 }
616
617 /// List entries that starts with given `path` in parent dir. with options.
618 ///
619 /// # Notes
620 ///
621 /// ## Streaming List
622 ///
623 /// This function will read all entries in the given directory. It could
624 /// take very long time and consume a lot of memory if the directory
625 /// contains a lot of entries.
626 ///
627 /// In order to avoid this, you can use [`blocking::Operator::lister`] to list entries in
628 /// a streaming way.
629 pub fn list_options(&self, path: &str, opts: options::ListOptions) -> Result<Vec<Entry>> {
630 self.handle.block_on(self.op.list_options(path, opts))
631 }
632
633 /// List entries that starts with given `path` in parent dir.
634 ///
635 /// This function will create a new [`BlockingLister`] to list entries. Users can stop listing
636 /// via dropping this [`Lister`].
637 ///
638 /// # Notes
639 ///
640 /// ## Recursively List
641 ///
642 /// This function only read the children of the given directory. To read
643 /// all entries recursively, use [`blocking::Operator::lister_with`] and `delimiter("")`
644 /// instead.
645 ///
646 /// # Examples
647 ///
648 /// ```no_run
649 /// # use anyhow::Result;
650 /// # use futures::io;
651 /// use futures::TryStreamExt;
652 /// use opendal::blocking;
653 /// use opendal::blocking::Operator;
654 /// use opendal::EntryMode;
655 /// # fn test(op: blocking::Operator) -> Result<()> {
656 /// let mut ds = op.lister("path/to/dir/")?;
657 /// for de in ds {
658 /// let de = de?;
659 /// match de.metadata().mode() {
660 /// EntryMode::FILE => {
661 /// println!("Handling file")
662 /// }
663 /// EntryMode::DIR => {
664 /// println!("Handling dir like start a new list via meta.path()")
665 /// }
666 /// EntryMode::Unknown => continue,
667 /// }
668 /// }
669 /// # Ok(())
670 /// # }
671 /// ```
672 pub fn lister(&self, path: &str) -> Result<blocking::Lister> {
673 self.lister_options(path, options::ListOptions::default())
674 }
675
676 /// List entries within a given directory as an iterator with options.
677 ///
678 /// This function will create a new handle to list entries.
679 ///
680 /// An error will be returned if given path doesn't end with `/`.
681 pub fn lister_options(
682 &self,
683 path: &str,
684 opts: options::ListOptions,
685 ) -> Result<blocking::Lister> {
686 let l = self.handle.block_on(self.op.lister_options(path, opts))?;
687 Ok(blocking::Lister::new(self.handle.clone(), l))
688 }
689
690 /// Check if this operator can work correctly.
691 ///
692 /// We will send a `list` request to path and return any errors we met.
693 ///
694 /// ```
695 /// # use std::sync::Arc;
696 /// # use anyhow::Result;
697 /// use opendal::blocking;
698 /// use opendal::blocking::Operator;
699 /// use opendal::ErrorKind;
700 ///
701 /// # fn test(op: blocking::Operator) -> Result<()> {
702 /// op.check()?;
703 /// # Ok(())
704 /// # }
705 /// ```
706 pub fn check(&self) -> Result<()> {
707 let mut ds = self.lister("/")?;
708
709 match ds.next() {
710 Some(Err(e)) if e.kind() != ErrorKind::NotFound => Err(e),
711 _ => Ok(()),
712 }
713 }
714}
715
716impl From<Operator> for AsyncOperator {
717 fn from(val: Operator) -> Self {
718 val.op
719 }
720}