opendal/services/mysql/
backend.rs1use std::fmt::Debug;
19use std::str::FromStr;
20
21use sqlx::mysql::MySqlConnectOptions;
22use sqlx::MySqlPool;
23use tokio::sync::OnceCell;
24
25use crate::raw::adapters::kv;
26use crate::raw::*;
27use crate::services::MysqlConfig;
28use crate::*;
29
30impl Configurator for MysqlConfig {
31 type Builder = MysqlBuilder;
32 fn into_builder(self) -> Self::Builder {
33 MysqlBuilder { config: self }
34 }
35}
36
37#[doc = include_str!("docs.md")]
38#[derive(Default)]
39pub struct MysqlBuilder {
40 config: MysqlConfig,
41}
42
43impl Debug for MysqlBuilder {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 let mut d = f.debug_struct("MysqlBuilder");
46
47 d.field("config", &self.config).finish()
48 }
49}
50
51impl MysqlBuilder {
52 pub fn connection_string(mut self, v: &str) -> Self {
67 if !v.is_empty() {
68 self.config.connection_string = Some(v.to_string());
69 }
70 self
71 }
72
73 pub fn root(mut self, root: &str) -> Self {
77 self.config.root = if root.is_empty() {
78 None
79 } else {
80 Some(root.to_string())
81 };
82
83 self
84 }
85
86 pub fn table(mut self, table: &str) -> Self {
88 if !table.is_empty() {
89 self.config.table = Some(table.to_string());
90 }
91 self
92 }
93
94 pub fn key_field(mut self, key_field: &str) -> Self {
98 if !key_field.is_empty() {
99 self.config.key_field = Some(key_field.to_string());
100 }
101 self
102 }
103
104 pub fn value_field(mut self, value_field: &str) -> Self {
108 if !value_field.is_empty() {
109 self.config.value_field = Some(value_field.to_string());
110 }
111 self
112 }
113}
114
115impl Builder for MysqlBuilder {
116 const SCHEME: Scheme = Scheme::Mysql;
117 type Config = MysqlConfig;
118
119 fn build(self) -> Result<impl Access> {
120 let conn = match self.config.connection_string {
121 Some(v) => v,
122 None => {
123 return Err(
124 Error::new(ErrorKind::ConfigInvalid, "connection_string is empty")
125 .with_context("service", Scheme::Mysql),
126 )
127 }
128 };
129
130 let config = MySqlConnectOptions::from_str(&conn).map_err(|err| {
131 Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
132 .with_context("service", Scheme::Mysql)
133 .set_source(err)
134 })?;
135
136 let table = match self.config.table {
137 Some(v) => v,
138 None => {
139 return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
140 .with_context("service", Scheme::Mysql))
141 }
142 };
143
144 let key_field = self.config.key_field.unwrap_or_else(|| "key".to_string());
145
146 let value_field = self
147 .config
148 .value_field
149 .unwrap_or_else(|| "value".to_string());
150
151 let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
152
153 Ok(MySqlBackend::new(Adapter {
154 pool: OnceCell::new(),
155 config,
156 table,
157 key_field,
158 value_field,
159 })
160 .with_normalized_root(root))
161 }
162}
163
164pub type MySqlBackend = kv::Backend<Adapter>;
166
167#[derive(Debug, Clone)]
168pub struct Adapter {
169 pool: OnceCell<MySqlPool>,
170 config: MySqlConnectOptions,
171
172 table: String,
173 key_field: String,
174 value_field: String,
175}
176
177impl Adapter {
178 async fn get_client(&self) -> Result<&MySqlPool> {
179 self.pool
180 .get_or_try_init(|| async {
181 let pool = MySqlPool::connect_with(self.config.clone())
182 .await
183 .map_err(parse_mysql_error)?;
184 Ok(pool)
185 })
186 .await
187 }
188}
189
190impl kv::Adapter for Adapter {
191 type Scanner = ();
192
193 fn info(&self) -> kv::Info {
194 kv::Info::new(
195 Scheme::Mysql,
196 &self.table,
197 Capability {
198 read: true,
199 write: true,
200 delete: true,
201 shared: true,
202 ..Default::default()
203 },
204 )
205 }
206
207 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
208 let pool = self.get_client().await?;
209
210 let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
211 "SELECT `{}` FROM `{}` WHERE `{}` = ? LIMIT 1",
212 self.value_field, self.table, self.key_field
213 ))
214 .bind(path)
215 .fetch_optional(pool)
216 .await
217 .map_err(parse_mysql_error)?;
218
219 Ok(value.map(Buffer::from))
220 }
221
222 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
223 let pool = self.get_client().await?;
224
225 sqlx::query(&format!(
226 r#"INSERT INTO `{}` (`{}`, `{}`) VALUES (?, ?)
227 ON DUPLICATE KEY UPDATE `{}` = VALUES({})"#,
228 self.table, self.key_field, self.value_field, self.value_field, self.value_field
229 ))
230 .bind(path)
231 .bind(value.to_vec())
232 .execute(pool)
233 .await
234 .map_err(parse_mysql_error)?;
235
236 Ok(())
237 }
238
239 async fn delete(&self, path: &str) -> Result<()> {
240 let pool = self.get_client().await?;
241
242 sqlx::query(&format!(
243 "DELETE FROM `{}` WHERE `{}` = ?",
244 self.table, self.key_field
245 ))
246 .bind(path)
247 .execute(pool)
248 .await
249 .map_err(parse_mysql_error)?;
250
251 Ok(())
252 }
253}
254
255fn parse_mysql_error(err: sqlx::Error) -> Error {
256 Error::new(ErrorKind::Unexpected, "unhandled error from mysql").set_source(err)
257}