opendal_core/services/mysql/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use mea::once::OnceCell;
19use sqlx::MySqlPool;
20use sqlx::mysql::MySqlConnectOptions;
21
22use crate::*;
23
24#[derive(Clone, Debug)]
25pub struct MysqlCore {
26    pub pool: OnceCell<MySqlPool>,
27    pub config: MySqlConnectOptions,
28
29    pub table: String,
30    pub key_field: String,
31    pub value_field: String,
32}
33
34impl MysqlCore {
35    async fn get_client(&self) -> Result<&MySqlPool> {
36        self.pool
37            .get_or_try_init(|| async {
38                MySqlPool::connect_with(self.config.clone())
39                    .await
40                    .map_err(parse_mysql_error)
41            })
42            .await
43    }
44
45    pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
46        let pool = self.get_client().await?;
47
48        let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
49            "SELECT `{}` FROM `{}` WHERE `{}` = ? LIMIT 1",
50            self.value_field, self.table, self.key_field
51        ))
52        .bind(path)
53        .fetch_optional(pool)
54        .await
55        .map_err(parse_mysql_error)?;
56
57        Ok(value.map(Buffer::from))
58    }
59
60    pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
61        let pool = self.get_client().await?;
62
63        sqlx::query(&format!(
64            r#"INSERT INTO `{}` (`{}`, `{}`) VALUES (?, ?)
65            ON DUPLICATE KEY UPDATE `{}` = VALUES({})"#,
66            self.table, self.key_field, self.value_field, self.value_field, self.value_field
67        ))
68        .bind(path)
69        .bind(value.to_vec())
70        .execute(pool)
71        .await
72        .map_err(parse_mysql_error)?;
73
74        Ok(())
75    }
76
77    pub async fn delete(&self, path: &str) -> Result<()> {
78        let pool = self.get_client().await?;
79
80        sqlx::query(&format!(
81            "DELETE FROM `{}` WHERE `{}` = ?",
82            self.table, self.key_field
83        ))
84        .bind(path)
85        .execute(pool)
86        .await
87        .map_err(parse_mysql_error)?;
88
89        Ok(())
90    }
91}
92
93fn parse_mysql_error(err: sqlx::Error) -> Error {
94    Error::new(ErrorKind::Unexpected, "unhandled error from mysql").set_source(err)
95}