opendal/services/mysql/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use sqlx::mysql::MySqlConnectOptions;
22use tokio::sync::OnceCell;
23
24use super::MYSQL_SCHEME;
25use super::config::MysqlConfig;
26use super::core::*;
27use super::deleter::MysqlDeleter;
28use super::writer::MysqlWriter;
29use crate::raw::oio;
30use crate::raw::*;
31use crate::*;
32
33#[doc = include_str!("docs.md")]
34#[derive(Debug, Default)]
35pub struct MysqlBuilder {
36    pub(super) config: MysqlConfig,
37}
38
39impl MysqlBuilder {
40    /// Set the connection_string of the mysql service.
41    ///
42    /// This connection string is used to connect to the mysql service. There are url based formats:
43    ///
44    /// ## Url
45    ///
46    /// This format resembles the url format of the mysql client. The format is: `[scheme://][user[:[password]]@]host[:port][/schema][?attribute1=value1&attribute2=value2...`
47    ///
48    /// - `mysql://user@localhost`
49    /// - `mysql://user:password@localhost`
50    /// - `mysql://user:password@localhost:3306`
51    /// - `mysql://user:password@localhost:3306/db`
52    ///
53    /// For more information, please refer to <https://docs.rs/sqlx/latest/sqlx/mysql/struct.MySqlConnectOptions.html>.
54    pub fn connection_string(mut self, v: &str) -> Self {
55        if !v.is_empty() {
56            self.config.connection_string = Some(v.to_string());
57        }
58        self
59    }
60
61    /// set the working directory, all operations will be performed under it.
62    ///
63    /// default: "/"
64    pub fn root(mut self, root: &str) -> Self {
65        self.config.root = if root.is_empty() {
66            None
67        } else {
68            Some(root.to_string())
69        };
70
71        self
72    }
73
74    /// Set the table name of the mysql service to read/write.
75    pub fn table(mut self, table: &str) -> Self {
76        if !table.is_empty() {
77            self.config.table = Some(table.to_string());
78        }
79        self
80    }
81
82    /// Set the key field name of the mysql service to read/write.
83    ///
84    /// Default to `key` if not specified.
85    pub fn key_field(mut self, key_field: &str) -> Self {
86        if !key_field.is_empty() {
87            self.config.key_field = Some(key_field.to_string());
88        }
89        self
90    }
91
92    /// Set the value field name of the mysql service to read/write.
93    ///
94    /// Default to `value` if not specified.
95    pub fn value_field(mut self, value_field: &str) -> Self {
96        if !value_field.is_empty() {
97            self.config.value_field = Some(value_field.to_string());
98        }
99        self
100    }
101}
102
103impl Builder for MysqlBuilder {
104    type Config = MysqlConfig;
105
106    fn build(self) -> Result<impl Access> {
107        let conn = match self.config.connection_string {
108            Some(v) => v,
109            None => {
110                return Err(
111                    Error::new(ErrorKind::ConfigInvalid, "connection_string is empty")
112                        .with_context("service", MYSQL_SCHEME),
113                );
114            }
115        };
116
117        let config = conn.parse::<MySqlConnectOptions>().map_err(|err| {
118            Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
119                .with_context("service", MYSQL_SCHEME)
120                .set_source(err)
121        })?;
122
123        let table = match self.config.table {
124            Some(v) => v,
125            None => {
126                return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
127                    .with_context("service", MYSQL_SCHEME));
128            }
129        };
130
131        let key_field = self.config.key_field.unwrap_or_else(|| "key".to_string());
132
133        let value_field = self
134            .config
135            .value_field
136            .unwrap_or_else(|| "value".to_string());
137
138        let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
139
140        Ok(MysqlBackend::new(MysqlCore {
141            pool: OnceCell::new(),
142            config,
143            table,
144            key_field,
145            value_field,
146        })
147        .with_normalized_root(root))
148    }
149}
150
151/// Backend for mysql service
152#[derive(Clone, Debug)]
153pub struct MysqlBackend {
154    core: Arc<MysqlCore>,
155    root: String,
156    info: Arc<AccessorInfo>,
157}
158
159impl MysqlBackend {
160    pub fn new(core: MysqlCore) -> Self {
161        let info = AccessorInfo::default();
162        info.set_scheme(MYSQL_SCHEME);
163        info.set_name(&core.table);
164        info.set_root("/");
165        info.set_native_capability(Capability {
166            read: true,
167            stat: true,
168            write: true,
169            write_can_empty: true,
170            delete: true,
171            shared: true,
172            ..Default::default()
173        });
174
175        Self {
176            core: Arc::new(core),
177            root: "/".to_string(),
178            info: Arc::new(info),
179        }
180    }
181
182    fn with_normalized_root(mut self, root: String) -> Self {
183        self.info.set_root(&root);
184        self.root = root;
185        self
186    }
187}
188
189impl Access for MysqlBackend {
190    type Reader = Buffer;
191    type Writer = MysqlWriter;
192    type Lister = ();
193    type Deleter = oio::OneShotDeleter<MysqlDeleter>;
194
195    fn info(&self) -> Arc<AccessorInfo> {
196        self.info.clone()
197    }
198
199    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
200        let p = build_abs_path(&self.root, path);
201
202        if p == build_abs_path(&self.root, "") {
203            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
204        } else {
205            let bs = self.core.get(&p).await?;
206            match bs {
207                Some(bs) => Ok(RpStat::new(
208                    Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
209                )),
210                None => Err(Error::new(ErrorKind::NotFound, "kv not found in mysql")),
211            }
212        }
213    }
214
215    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
216        let p = build_abs_path(&self.root, path);
217        let bs = match self.core.get(&p).await? {
218            Some(bs) => bs,
219            None => return Err(Error::new(ErrorKind::NotFound, "kv not found in mysql")),
220        };
221        Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
222    }
223
224    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
225        let p = build_abs_path(&self.root, path);
226        Ok((RpWrite::new(), MysqlWriter::new(self.core.clone(), p)))
227    }
228
229    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
230        Ok((
231            RpDelete::default(),
232            oio::OneShotDeleter::new(MysqlDeleter::new(self.core.clone(), self.root.clone())),
233        ))
234    }
235}