opendal/services/mysql/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::str::FromStr;
20
21use sqlx::mysql::MySqlConnectOptions;
22use sqlx::MySqlPool;
23use tokio::sync::OnceCell;
24
25use crate::raw::adapters::kv;
26use crate::raw::*;
27use crate::services::MysqlConfig;
28use crate::*;
29
30impl Configurator for MysqlConfig {
31    type Builder = MysqlBuilder;
32    fn into_builder(self) -> Self::Builder {
33        MysqlBuilder { config: self }
34    }
35}
36
37#[doc = include_str!("docs.md")]
38#[derive(Default)]
39pub struct MysqlBuilder {
40    config: MysqlConfig,
41}
42
43impl Debug for MysqlBuilder {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        let mut d = f.debug_struct("MysqlBuilder");
46
47        d.field("config", &self.config).finish()
48    }
49}
50
51impl MysqlBuilder {
52    /// Set the connection_string of the mysql service.
53    ///
54    /// This connection string is used to connect to the mysql service. There are url based formats:
55    ///
56    /// ## Url
57    ///
58    /// This format resembles the url format of the mysql client. The format is: `[scheme://][user[:[password]]@]host[:port][/schema][?attribute1=value1&attribute2=value2...`
59    ///
60    /// - `mysql://user@localhost`
61    /// - `mysql://user:password@localhost`
62    /// - `mysql://user:password@localhost:3306`
63    /// - `mysql://user:password@localhost:3306/db`
64    ///
65    /// For more information, please refer to <https://docs.rs/sqlx/latest/sqlx/mysql/struct.MySqlConnectOptions.html>.
66    pub fn connection_string(mut self, v: &str) -> Self {
67        if !v.is_empty() {
68            self.config.connection_string = Some(v.to_string());
69        }
70        self
71    }
72
73    /// set the working directory, all operations will be performed under it.
74    ///
75    /// default: "/"
76    pub fn root(mut self, root: &str) -> Self {
77        self.config.root = if root.is_empty() {
78            None
79        } else {
80            Some(root.to_string())
81        };
82
83        self
84    }
85
86    /// Set the table name of the mysql service to read/write.
87    pub fn table(mut self, table: &str) -> Self {
88        if !table.is_empty() {
89            self.config.table = Some(table.to_string());
90        }
91        self
92    }
93
94    /// Set the key field name of the mysql service to read/write.
95    ///
96    /// Default to `key` if not specified.
97    pub fn key_field(mut self, key_field: &str) -> Self {
98        if !key_field.is_empty() {
99            self.config.key_field = Some(key_field.to_string());
100        }
101        self
102    }
103
104    /// Set the value field name of the mysql service to read/write.
105    ///
106    /// Default to `value` if not specified.
107    pub fn value_field(mut self, value_field: &str) -> Self {
108        if !value_field.is_empty() {
109            self.config.value_field = Some(value_field.to_string());
110        }
111        self
112    }
113}
114
115impl Builder for MysqlBuilder {
116    const SCHEME: Scheme = Scheme::Mysql;
117    type Config = MysqlConfig;
118
119    fn build(self) -> Result<impl Access> {
120        let conn = match self.config.connection_string {
121            Some(v) => v,
122            None => {
123                return Err(
124                    Error::new(ErrorKind::ConfigInvalid, "connection_string is empty")
125                        .with_context("service", Scheme::Mysql),
126                )
127            }
128        };
129
130        let config = MySqlConnectOptions::from_str(&conn).map_err(|err| {
131            Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
132                .with_context("service", Scheme::Mysql)
133                .set_source(err)
134        })?;
135
136        let table = match self.config.table {
137            Some(v) => v,
138            None => {
139                return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
140                    .with_context("service", Scheme::Mysql))
141            }
142        };
143
144        let key_field = self.config.key_field.unwrap_or_else(|| "key".to_string());
145
146        let value_field = self
147            .config
148            .value_field
149            .unwrap_or_else(|| "value".to_string());
150
151        let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
152
153        Ok(MySqlBackend::new(Adapter {
154            pool: OnceCell::new(),
155            config,
156            table,
157            key_field,
158            value_field,
159        })
160        .with_normalized_root(root))
161    }
162}
163
164/// Backend for mysql service
165pub type MySqlBackend = kv::Backend<Adapter>;
166
167#[derive(Debug, Clone)]
168pub struct Adapter {
169    pool: OnceCell<MySqlPool>,
170    config: MySqlConnectOptions,
171
172    table: String,
173    key_field: String,
174    value_field: String,
175}
176
177impl Adapter {
178    async fn get_client(&self) -> Result<&MySqlPool> {
179        self.pool
180            .get_or_try_init(|| async {
181                let pool = MySqlPool::connect_with(self.config.clone())
182                    .await
183                    .map_err(parse_mysql_error)?;
184                Ok(pool)
185            })
186            .await
187    }
188}
189
190impl kv::Adapter for Adapter {
191    type Scanner = ();
192
193    fn info(&self) -> kv::Info {
194        kv::Info::new(
195            Scheme::Mysql,
196            &self.table,
197            Capability {
198                read: true,
199                write: true,
200                delete: true,
201                shared: true,
202                ..Default::default()
203            },
204        )
205    }
206
207    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
208        let pool = self.get_client().await?;
209
210        let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
211            "SELECT `{}` FROM `{}` WHERE `{}` = ? LIMIT 1",
212            self.value_field, self.table, self.key_field
213        ))
214        .bind(path)
215        .fetch_optional(pool)
216        .await
217        .map_err(parse_mysql_error)?;
218
219        Ok(value.map(Buffer::from))
220    }
221
222    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
223        let pool = self.get_client().await?;
224
225        sqlx::query(&format!(
226            r#"INSERT INTO `{}` (`{}`, `{}`) VALUES (?, ?)
227            ON DUPLICATE KEY UPDATE `{}` = VALUES({})"#,
228            self.table, self.key_field, self.value_field, self.value_field, self.value_field
229        ))
230        .bind(path)
231        .bind(value.to_vec())
232        .execute(pool)
233        .await
234        .map_err(parse_mysql_error)?;
235
236        Ok(())
237    }
238
239    async fn delete(&self, path: &str) -> Result<()> {
240        let pool = self.get_client().await?;
241
242        sqlx::query(&format!(
243            "DELETE FROM `{}` WHERE `{}` = ?",
244            self.table, self.key_field
245        ))
246        .bind(path)
247        .execute(pool)
248        .await
249        .map_err(parse_mysql_error)?;
250
251        Ok(())
252    }
253}
254
255fn parse_mysql_error(err: sqlx::Error) -> Error {
256    Error::new(ErrorKind::Unexpected, "unhandled error from mysql").set_source(err)
257}