opendal/types/operator/
builder.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use crate::layers::*;
21use crate::raw::*;
22use crate::types::IntoOperatorUri;
23use crate::*;
24
25/// # Operator build API
26///
27/// Operator should be built via [`OperatorBuilder`]. We recommend to use [`Operator::new`] to get started:
28///
29/// ```
30/// # use anyhow::Result;
31/// use opendal::services::Fs;
32/// use opendal::Operator;
33/// async fn test() -> Result<()> {
34///     // Create fs backend builder.
35///     let builder = Fs::default().root("/tmp");
36///
37///     // Build an `Operator` to start operating the storage.
38///     let op: Operator = Operator::new(builder)?.finish();
39///
40///     Ok(())
41/// }
42/// ```
43impl Operator {
44    /// Create a new operator with input builder.
45    ///
46    /// OpenDAL will call `builder.build()` internally, so we don't need
47    /// to import `opendal::Builder` trait.
48    ///
49    /// # Examples
50    ///
51    /// Read more backend init examples in [examples](https://github.com/apache/opendal/tree/main/examples).
52    ///
53    /// ```
54    /// # use anyhow::Result;
55    /// use opendal::services::Fs;
56    /// use opendal::Operator;
57    /// async fn test() -> Result<()> {
58    ///     // Create fs backend builder.
59    ///     let builder = Fs::default().root("/tmp");
60    ///
61    ///     // Build an `Operator` to start operating the storage.
62    ///     let op: Operator = Operator::new(builder)?.finish();
63    ///
64    ///     Ok(())
65    /// }
66    /// ```
67    #[allow(clippy::new_ret_no_self)]
68    pub fn new<B: Builder>(ab: B) -> Result<OperatorBuilder<impl Access>> {
69        let acc = ab.build()?;
70        Ok(OperatorBuilder::new(acc))
71    }
72
73    /// Create a new operator from given config.
74    ///
75    /// # Examples
76    ///
77    /// ```
78    /// # use anyhow::Result;
79    /// use std::collections::HashMap;
80    ///
81    /// use opendal::services::MemoryConfig;
82    /// use opendal::Operator;
83    /// async fn test() -> Result<()> {
84    ///     let cfg = MemoryConfig::default();
85    ///
86    ///     // Build an `Operator` to start operating the storage.
87    ///     let op: Operator = Operator::from_config(cfg)?.finish();
88    ///
89    ///     Ok(())
90    /// }
91    /// ```
92    pub fn from_config<C: Configurator>(cfg: C) -> Result<OperatorBuilder<impl Access>> {
93        let builder = cfg.into_builder();
94        let acc = builder.build()?;
95        Ok(OperatorBuilder::new(acc))
96    }
97
98    /// Create a new operator from given iterator in static dispatch.
99    ///
100    /// # Notes
101    ///
102    /// `from_iter` generates a `OperatorBuilder` which allows adding layer in zero-cost way.
103    ///
104    /// # Examples
105    ///
106    /// ```
107    /// # use anyhow::Result;
108    /// use std::collections::HashMap;
109    ///
110    /// use opendal::services::Fs;
111    /// use opendal::Operator;
112    /// async fn test() -> Result<()> {
113    ///     let map = HashMap::from([
114    ///         // Set the root for fs, all operations will happen under this root.
115    ///         //
116    ///         // NOTE: the root must be absolute path.
117    ///         ("root".to_string(), "/tmp".to_string()),
118    ///     ]);
119    ///
120    ///     // Build an `Operator` to start operating the storage.
121    ///     let op: Operator = Operator::from_iter::<Fs>(map)?.finish();
122    ///
123    ///     Ok(())
124    /// }
125    /// ```
126    #[allow(clippy::should_implement_trait)]
127    pub fn from_iter<B: Builder>(
128        iter: impl IntoIterator<Item = (String, String)>,
129    ) -> Result<OperatorBuilder<impl Access>> {
130        let builder = B::Config::from_iter(iter)?.into_builder();
131        let acc = builder.build()?;
132        Ok(OperatorBuilder::new(acc))
133    }
134
135    /// Create a new operator by parsing configuration from a URI.
136    ///
137    /// # Examples
138    ///
139    /// ```
140    /// # use anyhow::Result;
141    /// use opendal::Operator;
142    ///
143    /// # fn example() -> Result<()> {
144    /// let op = Operator::from_uri("memory://localhost/")?;
145    /// # let _ = op;
146    /// # Ok(())
147    /// # }
148    /// ```
149    pub fn from_uri(uri: impl IntoOperatorUri) -> Result<Operator> {
150        crate::DEFAULT_OPERATOR_REGISTRY.load(uri)
151    }
152
153    /// Create a new operator via given scheme and iterator of config value in dynamic dispatch.
154    ///
155    /// # Notes
156    ///
157    /// `via_iter` generates a `Operator` which allows building operator without generic type.
158    ///
159    /// # Examples
160    ///
161    /// ```
162    /// # use anyhow::Result;
163    /// use std::collections::HashMap;
164    ///
165    /// use opendal::Operator;
166    /// use opendal::services;
167    ///
168    /// async fn test() -> Result<()> {
169    ///     let map = [
170    ///         // Set the root for fs, all operations will happen under this root.
171    ///         //
172    ///         // NOTE: the root must be absolute path.
173    ///         ("root".to_string(), "/tmp".to_string()),
174    ///     ];
175    ///
176    ///     // Build an `Operator` to start operating the storage.
177    ///     let op: Operator = Operator::via_iter(services::FS_SCHEME, map)?;
178    ///
179    ///     Ok(())
180    /// }
181    /// ```
182    #[allow(unused_variables, unreachable_code)]
183    pub fn via_iter(
184        scheme: impl AsRef<str>,
185        iter: impl IntoIterator<Item = (String, String)>,
186    ) -> Result<Operator> {
187        let op = match scheme.as_ref() {
188            #[cfg(feature = "services-aliyun-drive")]
189            services::ALIYUN_DRIVE_SCHEME => {
190                Self::from_iter::<services::AliyunDrive>(iter)?.finish()
191            }
192            #[cfg(feature = "services-alluxio")]
193            services::ALLUXIO_SCHEME => Self::from_iter::<services::Alluxio>(iter)?.finish(),
194            #[cfg(feature = "services-azblob")]
195            services::AZBLOB_SCHEME => Self::from_iter::<services::Azblob>(iter)?.finish(),
196            #[cfg(feature = "services-azdls")]
197            services::AZDLS_SCHEME => Self::from_iter::<services::Azdls>(iter)?.finish(),
198            #[cfg(feature = "services-azfile")]
199            services::AZFILE_SCHEME => Self::from_iter::<services::Azfile>(iter)?.finish(),
200            #[cfg(feature = "services-b2")]
201            services::B2_SCHEME => Self::from_iter::<services::B2>(iter)?.finish(),
202            #[cfg(feature = "services-cacache")]
203            services::CACACHE_SCHEME => Self::from_iter::<services::Cacache>(iter)?.finish(),
204            #[cfg(feature = "services-cloudflare-kv")]
205            services::CLOUDFLARE_KV_SCHEME => {
206                Self::from_iter::<services::CloudflareKv>(iter)?.finish()
207            }
208            #[cfg(feature = "services-compfs")]
209            services::COMPFS_SCHEME => Self::from_iter::<services::Compfs>(iter)?.finish(),
210            #[cfg(feature = "services-cos")]
211            services::COS_SCHEME => Self::from_iter::<services::Cos>(iter)?.finish(),
212            #[cfg(feature = "services-d1")]
213            services::D1_SCHEME => Self::from_iter::<services::D1>(iter)?.finish(),
214            #[cfg(feature = "services-dashmap")]
215            services::DASHMAP_SCHEME => Self::from_iter::<services::Dashmap>(iter)?.finish(),
216            #[cfg(feature = "services-dbfs")]
217            services::DBFS_SCHEME => Self::from_iter::<services::Dbfs>(iter)?.finish(),
218            #[cfg(feature = "services-dropbox")]
219            services::DROPBOX_SCHEME => Self::from_iter::<services::Dropbox>(iter)?.finish(),
220            #[cfg(feature = "services-etcd")]
221            services::ETCD_SCHEME => Self::from_iter::<services::Etcd>(iter)?.finish(),
222            #[cfg(feature = "services-foundationdb")]
223            services::FOUNDATIONDB_SCHEME => {
224                Self::from_iter::<services::Foundationdb>(iter)?.finish()
225            }
226            #[cfg(feature = "services-fs")]
227            services::FS_SCHEME => Self::from_iter::<services::Fs>(iter)?.finish(),
228            #[cfg(feature = "services-ftp")]
229            services::FTP_SCHEME => Self::from_iter::<services::Ftp>(iter)?.finish(),
230            #[cfg(feature = "services-gcs")]
231            services::GCS_SCHEME => Self::from_iter::<services::Gcs>(iter)?.finish(),
232            #[cfg(feature = "services-gdrive")]
233            services::GDRIVE_SCHEME => Self::from_iter::<services::Gdrive>(iter)?.finish(),
234            #[cfg(feature = "services-ghac")]
235            services::GHAC_SCHEME => Self::from_iter::<services::Ghac>(iter)?.finish(),
236            #[cfg(feature = "services-github")]
237            services::GITHUB_SCHEME => Self::from_iter::<services::Github>(iter)?.finish(),
238            #[cfg(feature = "services-gridfs")]
239            services::GRIDFS_SCHEME => Self::from_iter::<services::Gridfs>(iter)?.finish(),
240            #[cfg(feature = "services-hdfs")]
241            services::HDFS_SCHEME => Self::from_iter::<services::Hdfs>(iter)?.finish(),
242            #[cfg(feature = "services-hdfs-native")]
243            services::HDFS_NATIVE_SCHEME => Self::from_iter::<services::HdfsNative>(iter)?.finish(),
244            #[cfg(feature = "services-http")]
245            services::HTTP_SCHEME => Self::from_iter::<services::Http>(iter)?.finish(),
246            #[cfg(feature = "services-huggingface")]
247            services::HUGGINGFACE_SCHEME => {
248                Self::from_iter::<services::Huggingface>(iter)?.finish()
249            }
250            #[cfg(feature = "services-ipfs")]
251            services::IPFS_SCHEME => Self::from_iter::<services::Ipfs>(iter)?.finish(),
252            #[cfg(feature = "services-ipmfs")]
253            services::IPMFS_SCHEME => Self::from_iter::<services::Ipmfs>(iter)?.finish(),
254            #[cfg(feature = "services-koofr")]
255            services::KOOFR_SCHEME => Self::from_iter::<services::Koofr>(iter)?.finish(),
256            #[cfg(feature = "services-lakefs")]
257            services::LAKEFS_SCHEME => Self::from_iter::<services::Lakefs>(iter)?.finish(),
258            #[cfg(feature = "services-memcached")]
259            services::MEMCACHED_SCHEME => Self::from_iter::<services::Memcached>(iter)?.finish(),
260            #[cfg(feature = "services-memory")]
261            services::MEMORY_SCHEME => Self::from_iter::<services::Memory>(iter)?.finish(),
262            #[cfg(feature = "services-mini-moka")]
263            services::MINI_MOKA_SCHEME => Self::from_iter::<services::MiniMoka>(iter)?.finish(),
264            #[cfg(feature = "services-moka")]
265            services::MOKA_SCHEME => Self::from_iter::<services::Moka>(iter)?.finish(),
266            #[cfg(feature = "services-mongodb")]
267            services::MONGODB_SCHEME => Self::from_iter::<services::Mongodb>(iter)?.finish(),
268            #[cfg(feature = "services-monoiofs")]
269            services::MONOIOFS_SCHEME => Self::from_iter::<services::Monoiofs>(iter)?.finish(),
270            #[cfg(feature = "services-mysql")]
271            services::MYSQL_SCHEME => Self::from_iter::<services::Mysql>(iter)?.finish(),
272            #[cfg(feature = "services-obs")]
273            services::OBS_SCHEME => Self::from_iter::<services::Obs>(iter)?.finish(),
274            #[cfg(feature = "services-onedrive")]
275            services::ONEDRIVE_SCHEME => Self::from_iter::<services::Onedrive>(iter)?.finish(),
276            #[cfg(feature = "services-oss")]
277            services::OSS_SCHEME => Self::from_iter::<services::Oss>(iter)?.finish(),
278            #[cfg(feature = "services-pcloud")]
279            services::PCLOUD_SCHEME => Self::from_iter::<services::Pcloud>(iter)?.finish(),
280            #[cfg(feature = "services-persy")]
281            services::PERSY_SCHEME => Self::from_iter::<services::Persy>(iter)?.finish(),
282            #[cfg(feature = "services-postgresql")]
283            services::POSTGRESQL_SCHEME => Self::from_iter::<services::Postgresql>(iter)?.finish(),
284            #[cfg(feature = "services-redb")]
285            services::REDB_SCHEME => Self::from_iter::<services::Redb>(iter)?.finish(),
286            #[cfg(feature = "services-redis")]
287            services::REDIS_SCHEME => Self::from_iter::<services::Redis>(iter)?.finish(),
288            #[cfg(feature = "services-rocksdb")]
289            services::ROCKSDB_SCHEME => Self::from_iter::<services::Rocksdb>(iter)?.finish(),
290            #[cfg(feature = "services-s3")]
291            services::S3_SCHEME => Self::from_iter::<services::S3>(iter)?.finish(),
292            #[cfg(feature = "services-seafile")]
293            services::SEAFILE_SCHEME => Self::from_iter::<services::Seafile>(iter)?.finish(),
294            #[cfg(feature = "services-sftp")]
295            services::SFTP_SCHEME => Self::from_iter::<services::Sftp>(iter)?.finish(),
296            #[cfg(feature = "services-sled")]
297            services::SLED_SCHEME => Self::from_iter::<services::Sled>(iter)?.finish(),
298            #[cfg(feature = "services-sqlite")]
299            services::SQLITE_SCHEME => Self::from_iter::<services::Sqlite>(iter)?.finish(),
300            #[cfg(feature = "services-surrealdb")]
301            services::SURREALDB_SCHEME => Self::from_iter::<services::Surrealdb>(iter)?.finish(),
302            #[cfg(feature = "services-swift")]
303            services::SWIFT_SCHEME => Self::from_iter::<services::Swift>(iter)?.finish(),
304            #[cfg(feature = "services-tikv")]
305            services::TIKV_SCHEME => Self::from_iter::<services::Tikv>(iter)?.finish(),
306            #[cfg(feature = "services-upyun")]
307            services::UPYUN_SCHEME => Self::from_iter::<services::Upyun>(iter)?.finish(),
308            #[cfg(feature = "services-vercel-artifacts")]
309            services::VERCEL_ARTIFACTS_SCHEME => {
310                Self::from_iter::<services::VercelArtifacts>(iter)?.finish()
311            }
312            #[cfg(feature = "services-vercel-blob")]
313            services::VERCEL_BLOB_SCHEME => Self::from_iter::<services::VercelBlob>(iter)?.finish(),
314            #[cfg(feature = "services-webdav")]
315            services::WEBDAV_SCHEME => Self::from_iter::<services::Webdav>(iter)?.finish(),
316            #[cfg(feature = "services-webhdfs")]
317            services::WEBHDFS_SCHEME => Self::from_iter::<services::Webhdfs>(iter)?.finish(),
318            #[cfg(feature = "services-yandex-disk")]
319            services::YANDEX_DISK_SCHEME => Self::from_iter::<services::YandexDisk>(iter)?.finish(),
320            v => {
321                return Err(Error::new(
322                    ErrorKind::Unsupported,
323                    "scheme is not enabled or supported",
324                )
325                .with_context("scheme", v));
326            }
327        };
328
329        Ok(op)
330    }
331
332    /// Create a new layer with dynamic dispatch.
333    ///
334    /// Please note that `Layer` can modify internal contexts such as `HttpClient`
335    /// and `Runtime` for the operator. Therefore, it is recommended to add layers
336    /// before interacting with the storage. Adding or duplicating layers after
337    /// accessing the storage may result in unexpected behavior.
338    ///
339    /// # Notes
340    ///
341    /// `OperatorBuilder::layer()` is using static dispatch which is zero
342    /// cost. `Operator::layer()` is using dynamic dispatch which has a
343    /// bit runtime overhead with an extra vtable lookup and unable to
344    /// inline.
345    ///
346    /// It's always recommended to use `OperatorBuilder::layer()` instead.
347    ///
348    /// # Examples
349    ///
350    /// ```no_run
351    /// # use std::sync::Arc;
352    /// # use anyhow::Result;
353    /// use opendal::layers::LoggingLayer;
354    /// use opendal::services::Memory;
355    /// use opendal::Operator;
356    ///
357    /// # async fn test() -> Result<()> {
358    /// let op = Operator::new(Memory::default())?.finish();
359    /// let op = op.layer(LoggingLayer::default());
360    /// // All operations will go through the new_layer
361    /// let _ = op.read("test_file").await?;
362    /// # Ok(())
363    /// # }
364    /// ```
365    #[must_use]
366    pub fn layer<L: Layer<Accessor>>(self, layer: L) -> Self {
367        Self::from_inner(Arc::new(
368            TypeEraseLayer.layer(layer.layer(self.into_inner())),
369        ))
370    }
371}
372
373/// OperatorBuilder is a typed builder to build an Operator.
374///
375/// # Notes
376///
377/// OpenDAL uses static dispatch internally and only performs dynamic
378/// dispatch at the outmost type erase layer. OperatorBuilder is the only
379/// public API provided by OpenDAL come with generic parameters.
380///
381/// It's required to call `finish` after the operator built.
382///
383/// # Examples
384///
385/// For users who want to support many services, we can build a helper function like the following:
386///
387/// ```
388/// use std::collections::HashMap;
389///
390/// use opendal::layers::LoggingLayer;
391/// use opendal::layers::RetryLayer;
392/// use opendal::services;
393/// use opendal::Builder;
394/// use opendal::Operator;
395/// use opendal::Result;
396///
397/// fn init_service<B: Builder>(cfg: HashMap<String, String>) -> Result<Operator> {
398///     let op = Operator::from_iter::<B>(cfg)?
399///         .layer(LoggingLayer::default())
400///         .layer(RetryLayer::new())
401///         .finish();
402///
403///     Ok(op)
404/// }
405///
406/// async fn init(scheme: &str, cfg: HashMap<String, String>) -> Result<()> {
407///     let _ = match scheme {
408///         services::MEMORY_SCHEME => init_service::<services::Memory>(cfg)?,
409///         _ => todo!(),
410///     };
411///
412///     Ok(())
413/// }
414/// ```
415pub struct OperatorBuilder<A: Access> {
416    accessor: A,
417}
418
419impl<A: Access> OperatorBuilder<A> {
420    /// Create a new operator builder.
421    #[allow(clippy::new_ret_no_self)]
422    pub fn new(accessor: A) -> OperatorBuilder<impl Access> {
423        // Make sure error context layer has been attached.
424        OperatorBuilder { accessor }
425            .layer(ErrorContextLayer)
426            .layer(CompleteLayer)
427            .layer(CorrectnessCheckLayer)
428    }
429
430    /// Create a new layer with static dispatch.
431    ///
432    /// # Notes
433    ///
434    /// `OperatorBuilder::layer()` is using static dispatch which is zero
435    /// cost. `Operator::layer()` is using dynamic dispatch which has a
436    /// bit runtime overhead with an extra vtable lookup and unable to
437    /// inline.
438    ///
439    /// It's always recommended to use `OperatorBuilder::layer()` instead.
440    ///
441    /// # Examples
442    ///
443    /// ```no_run
444    /// # use std::sync::Arc;
445    /// # use anyhow::Result;
446    /// use opendal::layers::LoggingLayer;
447    /// use opendal::services::Memory;
448    /// use opendal::Operator;
449    ///
450    /// # async fn test() -> Result<()> {
451    /// let op = Operator::new(Memory::default())?
452    ///     .layer(LoggingLayer::default())
453    ///     .finish();
454    /// // All operations will go through the new_layer
455    /// let _ = op.read("test_file").await?;
456    /// # Ok(())
457    /// # }
458    /// ```
459    #[must_use]
460    pub fn layer<L: Layer<A>>(self, layer: L) -> OperatorBuilder<L::LayeredAccess> {
461        OperatorBuilder {
462            accessor: layer.layer(self.accessor),
463        }
464    }
465
466    /// Finish the building to construct an Operator.
467    pub fn finish(self) -> Operator {
468        let ob = self.layer(TypeEraseLayer);
469        Operator::from_inner(Arc::new(ob.accessor) as Accessor)
470    }
471}