opendal_core/services/mysql/
core.rs1use mea::once::OnceCell;
19use sqlx::MySqlPool;
20use sqlx::mysql::MySqlConnectOptions;
21
22use crate::*;
23
24#[derive(Clone, Debug)]
25pub struct MysqlCore {
26 pub pool: OnceCell<MySqlPool>,
27 pub config: MySqlConnectOptions,
28
29 pub table: String,
30 pub key_field: String,
31 pub value_field: String,
32}
33
34impl MysqlCore {
35 async fn get_client(&self) -> Result<&MySqlPool> {
36 self.pool
37 .get_or_try_init(|| async {
38 MySqlPool::connect_with(self.config.clone())
39 .await
40 .map_err(parse_mysql_error)
41 })
42 .await
43 }
44
45 pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
46 let pool = self.get_client().await?;
47
48 let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
49 "SELECT `{}` FROM `{}` WHERE `{}` = ? LIMIT 1",
50 self.value_field, self.table, self.key_field
51 ))
52 .bind(path)
53 .fetch_optional(pool)
54 .await
55 .map_err(parse_mysql_error)?;
56
57 Ok(value.map(Buffer::from))
58 }
59
60 pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
61 let pool = self.get_client().await?;
62
63 sqlx::query(&format!(
64 r#"INSERT INTO `{}` (`{}`, `{}`) VALUES (?, ?)
65 ON DUPLICATE KEY UPDATE `{}` = VALUES({})"#,
66 self.table, self.key_field, self.value_field, self.value_field, self.value_field
67 ))
68 .bind(path)
69 .bind(value.to_vec())
70 .execute(pool)
71 .await
72 .map_err(parse_mysql_error)?;
73
74 Ok(())
75 }
76
77 pub async fn delete(&self, path: &str) -> Result<()> {
78 let pool = self.get_client().await?;
79
80 sqlx::query(&format!(
81 "DELETE FROM `{}` WHERE `{}` = ?",
82 self.table, self.key_field
83 ))
84 .bind(path)
85 .execute(pool)
86 .await
87 .map_err(parse_mysql_error)?;
88
89 Ok(())
90 }
91}
92
93fn parse_mysql_error(err: sqlx::Error) -> Error {
94 Error::new(ErrorKind::Unexpected, "unhandled error from mysql").set_source(err)
95}