use std::collections::HashMap;
use std::fmt::Debug;
use mysql_async::prelude::*;
use mysql_async::Opts;
use mysql_async::Pool;
use serde::Deserialize;
use crate::raw::adapters::kv;
use crate::raw::*;
use crate::*;
/// Configuration for the MySQL-backed key-value service.
///
/// Every field is optional while deserializing (`#[serde(default)]`);
/// required values are only checked when the builder's `build` runs.
#[derive(Default, Deserialize)]
#[serde(default)]
#[non_exhaustive]
pub struct MysqlConfig {
/// Connection URL handed to `Opts::from_url`. `build` fails when it is unset
/// or cannot be parsed.
connection_string: Option<String>,
/// Name of the table that stores the entries. `build` fails when it is unset.
table: Option<String>,
/// Column used as the lookup key; `build` falls back to `key` when unset.
key_field: Option<String>,
/// Column holding the stored bytes; `build` falls back to `value` when unset.
value_field: Option<String>,
/// Root path applied to the backend; `build` falls back to `/` when unset.
root: Option<String>,
}
impl Debug for MysqlConfig {
    /// Formats the configuration for diagnostics.
    ///
    /// The connection string is never printed: when one is set, a
    /// `<redacted>` placeholder is shown instead. All other fields are
    /// printed as they are.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("MysqlConfig");

        // Only mention the connection string when it exists, and never its content.
        if self.connection_string.is_some() {
            out.field("connection_string", &"<redacted>");
        }

        // The remaining settings are emitted in a fixed order.
        let plain_fields = [
            ("root", &self.root),
            ("table", &self.table),
            ("key_field", &self.key_field),
            ("value_field", &self.value_field),
        ];
        for (name, value) in plain_fields {
            out.field(name, value);
        }

        out.finish()
    }
}
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct MysqlBuilder {
// Settings collected through the setter methods or `from_map`, and read by `build`.
config: MysqlConfig,
}
impl Debug for MysqlBuilder {
    /// Formats the builder by delegating to the `Debug` output of its config.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MysqlBuilder")
            .field("config", &self.config)
            .finish()
    }
}
impl MysqlBuilder {
    /// Sets the connection string used to reach the MySQL server.
    ///
    /// The value is later parsed by `Opts::from_url` in `build`.
    /// An empty string is ignored and leaves any earlier value in place.
    pub fn connection_string(&mut self, v: &str) -> &mut Self {
        if v.is_empty() {
            return self;
        }
        self.config.connection_string = Some(v.to_owned());
        self
    }

    /// Sets the root path of the backend.
    ///
    /// An empty string is ignored; `build` uses `/` when no root was set.
    pub fn root(&mut self, root: &str) -> &mut Self {
        if root.is_empty() {
            return self;
        }
        self.config.root = Some(root.to_owned());
        self
    }

    /// Sets the name of the table that stores the entries.
    ///
    /// An empty string is ignored; `build` fails when no table was set.
    pub fn table(&mut self, table: &str) -> &mut Self {
        if table.is_empty() {
            return self;
        }
        self.config.table = Some(table.to_owned());
        self
    }

    /// Sets the column used as the lookup key.
    ///
    /// An empty string is ignored; `build` uses `key` when none was set.
    pub fn key_field(&mut self, key_field: &str) -> &mut Self {
        if key_field.is_empty() {
            return self;
        }
        self.config.key_field = Some(key_field.to_owned());
        self
    }

    /// Sets the column that holds the stored bytes.
    ///
    /// An empty string is ignored; `build` uses `value` when none was set.
    pub fn value_field(&mut self, value_field: &str) -> &mut Self {
        if value_field.is_empty() {
            return self;
        }
        self.config.value_field = Some(value_field.to_owned());
        self
    }
}
impl Builder for MysqlBuilder {
const SCHEME: Scheme = Scheme::Mysql;
type Accessor = MySqlBackend;
fn from_map(map: HashMap<String, String>) -> Self {
let config = MysqlConfig::deserialize(ConfigDeserializer::new(map))
.expect("config deserialize must succeed");
MysqlBuilder { config }
}
fn build(&mut self) -> Result<Self::Accessor> {
let conn = match self.config.connection_string.clone() {
Some(v) => v,
None => {
return Err(
Error::new(ErrorKind::ConfigInvalid, "connection_string is empty")
.with_context("service", Scheme::Mysql),
)
}
};
let config = Opts::from_url(&conn).map_err(|err| {
Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
.with_context("service", Scheme::Mysql)
.set_source(err)
})?;
let table = match self.config.table.clone() {
Some(v) => v,
None => {
return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
.with_context("service", Scheme::Mysql))
}
};
let key_field = match self.config.key_field.clone() {
Some(v) => v,
None => "key".to_string(),
};
let value_field = match self.config.value_field.clone() {
Some(v) => v,
None => "value".to_string(),
};
let root = normalize_root(
self.config
.root
.clone()
.unwrap_or_else(|| "/".to_string())
.as_str(),
);
let pool = Pool::new(config.clone());
Ok(MySqlBackend::new(Adapter {
connection_pool: pool,
config,
table,
key_field,
value_field,
})
.with_root(&root))
}
}
/// Backend for the MySQL service: the generic key-value backend driven by [`Adapter`].
pub type MySqlBackend = kv::Backend<Adapter>;
/// Key-value adapter that stores each entry as a row of a MySQL table.
#[derive(Clone)]
pub struct Adapter {
/// Pool from which every operation obtains its connection.
connection_pool: Pool,
/// Parsed connection options the pool was created from.
config: Opts,
/// Name of the table holding the entries.
table: String,
/// Column matched against the entry path in `WHERE` clauses.
key_field: String,
/// Column that stores the entry bytes.
value_field: String,
}
impl Debug for Adapter {
    /// Formats the adapter for diagnostics without exposing credentials.
    ///
    /// `config` is built from the connection URL, which `MysqlConfig`'s
    /// `Debug` output deliberately redacts; printing the parsed options here
    /// would undo that redaction. The options are therefore shown as a
    /// `<redacted>` placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Adapter")
            .field("config", &"<redacted>")
            .field("table", &self.table)
            .field("key_field", &self.key_field)
            .field("value_field", &self.value_field)
            // NOTE(review): `connection_pool` is omitted because the pool is
            // created from the same options and its `Debug` output presumably
            // includes them — confirm against mysql_async before re-adding it.
            .finish_non_exhaustive()
    }
}
impl kv::Adapter for Adapter {
    /// Describes this adapter: MySQL scheme, named after the table, with
    /// read and write capabilities enabled.
    fn metadata(&self) -> kv::Metadata {
        kv::Metadata::new(
            Scheme::Mysql,
            &self.table,
            Capability {
                read: true,
                write: true,
                ..Default::default()
            },
        )
    }

    /// Fetches the value stored under `path`.
    ///
    /// Returns `Ok(None)` when no row matches the key.
    ///
    /// # Errors
    ///
    /// Any `mysql_async` failure is converted by `parse_mysql_error`.
    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
        // Identifiers come from configuration and are backtick-quoted; the
        // key itself is always passed as a bound parameter.
        let query = format!(
            "SELECT `{}` FROM `{}` WHERE `{}` = :path LIMIT 1",
            self.value_field, self.table, self.key_field
        );
        let mut conn = self
            .connection_pool
            .get_conn()
            .await
            .map_err(parse_mysql_error)?;
        let statement = conn.prep(query).await.map_err(parse_mysql_error)?;
        let result: Option<Vec<u8>> = conn
            .exec_first(
                statement,
                params! {
                    "path" => path,
                },
            )
            .await
            .map_err(parse_mysql_error)?;
        Ok(result.map(Buffer::from))
    }

    /// Stores `value` under `path`, overwriting any existing row with that key.
    ///
    /// # Errors
    ///
    /// Any `mysql_async` failure is converted by `parse_mysql_error`.
    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
        // Every identifier is backtick-quoted, including the column inside
        // `VALUES(...)`, so that reserved words or unusual column names work
        // in the upsert just as they do in the other statements.
        let query = format!(
            "INSERT INTO `{}` (`{}`, `{}`) \
             VALUES (:path, :value) \
             ON DUPLICATE KEY UPDATE `{}` = VALUES(`{}`)",
            self.table, self.key_field, self.value_field, self.value_field, self.value_field
        );
        let mut conn = self
            .connection_pool
            .get_conn()
            .await
            .map_err(parse_mysql_error)?;
        let statement = conn.prep(query).await.map_err(parse_mysql_error)?;
        conn.exec_drop(
            statement,
            params! {
                "path" => path,
                "value" => value.to_vec(),
            },
        )
        .await
        .map_err(parse_mysql_error)?;
        Ok(())
    }

    /// Removes the row stored under `path`.
    ///
    /// # Errors
    ///
    /// Any `mysql_async` failure is converted by `parse_mysql_error`.
    async fn delete(&self, path: &str) -> Result<()> {
        let query = format!(
            "DELETE FROM `{}` WHERE `{}` = :path",
            self.table, self.key_field
        );
        let mut conn = self
            .connection_pool
            .get_conn()
            .await
            .map_err(parse_mysql_error)?;
        let statement = conn.prep(query).await.map_err(parse_mysql_error)?;
        conn.exec_drop(
            statement,
            params! {
                "path" => path,
            },
        )
        .await
        .map_err(parse_mysql_error)?;
        Ok(())
    }
}
fn parse_mysql_error(err: mysql_async::Error) -> Error {
Error::new(ErrorKind::Unexpected, "unhandled error from mysql").set_source(err)
}