opendal/types/operator/
registry.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::{LazyLock, Mutex};
20
21use http::Uri;
22use percent_encoding::percent_decode_str;
23
24use crate::services;
25use crate::types::builder::{Builder, Configurator};
26use crate::{Error, ErrorKind, Operator, Result};
27
28/// Factory signature used to construct [`Operator`] from a URI and extra options.
29pub type OperatorFactory = fn(&str, Vec<(String, String)>) -> Result<Operator>;
30
31/// Default registry initialized with builtin services.
32pub static DEFAULT_OPERATOR_REGISTRY: LazyLock<OperatorRegistry> = LazyLock::new(|| {
33    let registry = OperatorRegistry::new();
34    register_builtin_services(&registry);
35    registry
36});
37
38/// Global registry that maps schemes to [`OperatorFactory`] functions.
39#[derive(Debug, Default)]
40pub struct OperatorRegistry {
41    factories: Mutex<HashMap<String, OperatorFactory>>,
42}
43
44impl OperatorRegistry {
45    /// Create a new, empty registry.
46    pub fn new() -> Self {
47        Self {
48            factories: Mutex::new(HashMap::new()),
49        }
50    }
51
52    /// Register a builder for the given scheme.
53    pub fn register<B: Builder>(&self, scheme: &str) {
54        let key = scheme.to_ascii_lowercase();
55        let mut guard = self
56            .factories
57            .lock()
58            .expect("operator registry mutex poisoned");
59        guard.insert(key, factory::<B::Config>);
60    }
61
62    /// Load an [`Operator`] via the factory registered for the URI's scheme.
63    pub fn load(
64        &self,
65        uri: &str,
66        options: impl IntoIterator<Item = (String, String)>,
67    ) -> Result<Operator> {
68        let parsed = uri.parse::<Uri>().map_err(|err| {
69            Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
70        })?;
71
72        let scheme = parsed
73            .scheme_str()
74            .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "uri scheme is required"))?;
75
76        let key = scheme.to_ascii_lowercase();
77        let factory = self
78            .factories
79            .lock()
80            .expect("operator registry mutex poisoned")
81            .get(key.as_str())
82            .copied()
83            .ok_or_else(|| {
84                Error::new(ErrorKind::Unsupported, "scheme is not registered")
85                    .with_context("scheme", scheme)
86            })?;
87
88        let opts: Vec<(String, String)> = options.into_iter().collect();
89        factory(uri, opts)
90    }
91}
92
93fn register_builtin_services(registry: &OperatorRegistry) {
94    #[cfg(feature = "services-memory")]
95    registry.register::<services::Memory>(services::MEMORY_SCHEME);
96    #[cfg(feature = "services-fs")]
97    registry.register::<services::Fs>(services::FS_SCHEME);
98    #[cfg(feature = "services-s3")]
99    registry.register::<services::S3>(services::S3_SCHEME);
100}
101
102/// Factory adapter that builds an operator from a configurator type.
103fn factory<C: Configurator>(uri: &str, options: Vec<(String, String)>) -> Result<Operator> {
104    let parsed = uri.parse::<Uri>().map_err(|err| {
105        Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
106    })?;
107
108    let mut params = HashMap::new();
109    if let Some(query) = parsed.query() {
110        for pair in query.split('&') {
111            if pair.is_empty() {
112                continue;
113            }
114            let mut parts = pair.splitn(2, '=');
115            let key = parts.next().unwrap_or("");
116            let value = parts.next().unwrap_or("");
117            let key = percent_decode_str(key).decode_utf8_lossy().to_string();
118            let value = percent_decode_str(value).decode_utf8_lossy().to_string();
119            params.insert(key.to_ascii_lowercase(), value);
120        }
121    }
122
123    for (key, value) in options {
124        params.insert(key.to_ascii_lowercase(), value);
125    }
126
127    let cfg = C::from_uri(&parsed, &params)?;
128    Ok(Operator::from_config(cfg)?.finish())
129}