opendal/services/mysql/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::str::FromStr;
20
21use sqlx::mysql::MySqlConnectOptions;
22use sqlx::MySqlPool;
23use tokio::sync::OnceCell;
24
25use crate::raw::adapters::kv;
26use crate::raw::*;
27use crate::services::MysqlConfig;
28use crate::*;
29
30impl Configurator for MysqlConfig {
31    type Builder = MysqlBuilder;
32    fn into_builder(self) -> Self::Builder {
33        MysqlBuilder { config: self }
34    }
35}
36
37#[doc = include_str!("docs.md")]
38#[derive(Default)]
39pub struct MysqlBuilder {
40    config: MysqlConfig,
41}
42
43impl Debug for MysqlBuilder {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        let mut d = f.debug_struct("MysqlBuilder");
46
47        d.field("config", &self.config).finish()
48    }
49}
50
51impl MysqlBuilder {
52    /// Set the connection_string of the mysql service.
53    ///
54    /// This connection string is used to connect to the mysql service. There are url based formats:
55    ///
56    /// ## Url
57    ///
58    /// This format resembles the url format of the mysql client. The format is: `[scheme://][user[:[password]]@]host[:port][/schema][?attribute1=value1&attribute2=value2...`
59    ///
60    /// - `mysql://user@localhost`
61    /// - `mysql://user:password@localhost`
62    /// - `mysql://user:password@localhost:3306`
63    /// - `mysql://user:password@localhost:3306/db`
64    ///
65    /// For more information, please refer to <https://docs.rs/sqlx/latest/sqlx/mysql/struct.MySqlConnectOptions.html>.
66    pub fn connection_string(mut self, v: &str) -> Self {
67        if !v.is_empty() {
68            self.config.connection_string = Some(v.to_string());
69        }
70        self
71    }
72
73    /// set the working directory, all operations will be performed under it.
74    ///
75    /// default: "/"
76    pub fn root(mut self, root: &str) -> Self {
77        self.config.root = if root.is_empty() {
78            None
79        } else {
80            Some(root.to_string())
81        };
82
83        self
84    }
85
86    /// Set the table name of the mysql service to read/write.
87    pub fn table(mut self, table: &str) -> Self {
88        if !table.is_empty() {
89            self.config.table = Some(table.to_string());
90        }
91        self
92    }
93
94    /// Set the key field name of the mysql service to read/write.
95    ///
96    /// Default to `key` if not specified.
97    pub fn key_field(mut self, key_field: &str) -> Self {
98        if !key_field.is_empty() {
99            self.config.key_field = Some(key_field.to_string());
100        }
101        self
102    }
103
104    /// Set the value field name of the mysql service to read/write.
105    ///
106    /// Default to `value` if not specified.
107    pub fn value_field(mut self, value_field: &str) -> Self {
108        if !value_field.is_empty() {
109            self.config.value_field = Some(value_field.to_string());
110        }
111        self
112    }
113}
114
115impl Builder for MysqlBuilder {
116    type Config = MysqlConfig;
117
118    fn build(self) -> Result<impl Access> {
119        let conn = match self.config.connection_string {
120            Some(v) => v,
121            None => {
122                return Err(
123                    Error::new(ErrorKind::ConfigInvalid, "connection_string is empty")
124                        .with_context("service", Scheme::Mysql),
125                )
126            }
127        };
128
129        let config = MySqlConnectOptions::from_str(&conn).map_err(|err| {
130            Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
131                .with_context("service", Scheme::Mysql)
132                .set_source(err)
133        })?;
134
135        let table = match self.config.table {
136            Some(v) => v,
137            None => {
138                return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
139                    .with_context("service", Scheme::Mysql))
140            }
141        };
142
143        let key_field = self.config.key_field.unwrap_or_else(|| "key".to_string());
144
145        let value_field = self
146            .config
147            .value_field
148            .unwrap_or_else(|| "value".to_string());
149
150        let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
151
152        Ok(MySqlBackend::new(Adapter {
153            pool: OnceCell::new(),
154            config,
155            table,
156            key_field,
157            value_field,
158        })
159        .with_normalized_root(root))
160    }
161}
162
163/// Backend for mysql service
164pub type MySqlBackend = kv::Backend<Adapter>;
165
166#[derive(Debug, Clone)]
167pub struct Adapter {
168    pool: OnceCell<MySqlPool>,
169    config: MySqlConnectOptions,
170
171    table: String,
172    key_field: String,
173    value_field: String,
174}
175
176impl Adapter {
177    async fn get_client(&self) -> Result<&MySqlPool> {
178        self.pool
179            .get_or_try_init(|| async {
180                let pool = MySqlPool::connect_with(self.config.clone())
181                    .await
182                    .map_err(parse_mysql_error)?;
183                Ok(pool)
184            })
185            .await
186    }
187}
188
189impl kv::Adapter for Adapter {
190    type Scanner = ();
191
192    fn info(&self) -> kv::Info {
193        kv::Info::new(
194            Scheme::Mysql,
195            &self.table,
196            Capability {
197                read: true,
198                write: true,
199                delete: true,
200                shared: true,
201                ..Default::default()
202            },
203        )
204    }
205
206    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
207        let pool = self.get_client().await?;
208
209        let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
210            "SELECT `{}` FROM `{}` WHERE `{}` = ? LIMIT 1",
211            self.value_field, self.table, self.key_field
212        ))
213        .bind(path)
214        .fetch_optional(pool)
215        .await
216        .map_err(parse_mysql_error)?;
217
218        Ok(value.map(Buffer::from))
219    }
220
221    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
222        let pool = self.get_client().await?;
223
224        sqlx::query(&format!(
225            r#"INSERT INTO `{}` (`{}`, `{}`) VALUES (?, ?)
226            ON DUPLICATE KEY UPDATE `{}` = VALUES({})"#,
227            self.table, self.key_field, self.value_field, self.value_field, self.value_field
228        ))
229        .bind(path)
230        .bind(value.to_vec())
231        .execute(pool)
232        .await
233        .map_err(parse_mysql_error)?;
234
235        Ok(())
236    }
237
238    async fn delete(&self, path: &str) -> Result<()> {
239        let pool = self.get_client().await?;
240
241        sqlx::query(&format!(
242            "DELETE FROM `{}` WHERE `{}` = ?",
243            self.table, self.key_field
244        ))
245        .bind(path)
246        .execute(pool)
247        .await
248        .map_err(parse_mysql_error)?;
249
250        Ok(())
251    }
252}
253
254fn parse_mysql_error(err: sqlx::Error) -> Error {
255    Error::new(ErrorKind::Unexpected, "unhandled error from mysql").set_source(err)
256}