opendal/types/operator/
registry.rs1use std::collections::HashMap;
19use std::sync::{LazyLock, Mutex};
20
21use http::Uri;
22use percent_encoding::percent_decode_str;
23
24use crate::services;
25use crate::types::builder::{Builder, Configurator};
26use crate::{Error, ErrorKind, Operator, Result};
27
28pub type OperatorFactory = fn(&str, Vec<(String, String)>) -> Result<Operator>;
30
31pub static DEFAULT_OPERATOR_REGISTRY: LazyLock<OperatorRegistry> = LazyLock::new(|| {
33 let registry = OperatorRegistry::new();
34 register_builtin_services(®istry);
35 registry
36});
37
38#[derive(Debug, Default)]
40pub struct OperatorRegistry {
41 factories: Mutex<HashMap<String, OperatorFactory>>,
42}
43
44impl OperatorRegistry {
45 pub fn new() -> Self {
47 Self {
48 factories: Mutex::new(HashMap::new()),
49 }
50 }
51
52 pub fn register<B: Builder>(&self, scheme: &str) {
54 let key = scheme.to_ascii_lowercase();
55 let mut guard = self
56 .factories
57 .lock()
58 .expect("operator registry mutex poisoned");
59 guard.insert(key, factory::<B::Config>);
60 }
61
62 pub fn load(
64 &self,
65 uri: &str,
66 options: impl IntoIterator<Item = (String, String)>,
67 ) -> Result<Operator> {
68 let parsed = uri.parse::<Uri>().map_err(|err| {
69 Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
70 })?;
71
72 let scheme = parsed
73 .scheme_str()
74 .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "uri scheme is required"))?;
75
76 let key = scheme.to_ascii_lowercase();
77 let factory = self
78 .factories
79 .lock()
80 .expect("operator registry mutex poisoned")
81 .get(key.as_str())
82 .copied()
83 .ok_or_else(|| {
84 Error::new(ErrorKind::Unsupported, "scheme is not registered")
85 .with_context("scheme", scheme)
86 })?;
87
88 let opts: Vec<(String, String)> = options.into_iter().collect();
89 factory(uri, opts)
90 }
91}
92
93fn register_builtin_services(registry: &OperatorRegistry) {
94 #[cfg(feature = "services-memory")]
95 registry.register::<services::Memory>(services::MEMORY_SCHEME);
96 #[cfg(feature = "services-fs")]
97 registry.register::<services::Fs>(services::FS_SCHEME);
98 #[cfg(feature = "services-s3")]
99 registry.register::<services::S3>(services::S3_SCHEME);
100}
101
102fn factory<C: Configurator>(uri: &str, options: Vec<(String, String)>) -> Result<Operator> {
104 let parsed = uri.parse::<Uri>().map_err(|err| {
105 Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
106 })?;
107
108 let mut params = HashMap::new();
109 if let Some(query) = parsed.query() {
110 for pair in query.split('&') {
111 if pair.is_empty() {
112 continue;
113 }
114 let mut parts = pair.splitn(2, '=');
115 let key = parts.next().unwrap_or("");
116 let value = parts.next().unwrap_or("");
117 let key = percent_decode_str(key).decode_utf8_lossy().to_string();
118 let value = percent_decode_str(value).decode_utf8_lossy().to_string();
119 params.insert(key.to_ascii_lowercase(), value);
120 }
121 }
122
123 for (key, value) in options {
124 params.insert(key.to_ascii_lowercase(), value);
125 }
126
127 let cfg = C::from_uri(&parsed, ¶ms)?;
128 Ok(Operator::from_config(cfg)?.finish())
129}