opendal/services/mysql/
backend.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use sqlx::mysql::MySqlConnectOptions;
22use tokio::sync::OnceCell;
23
24use super::MYSQL_SCHEME;
25use super::config::MysqlConfig;
26use super::core::*;
27use super::deleter::MysqlDeleter;
28use super::writer::MysqlWriter;
29use crate::raw::oio;
30use crate::raw::*;
31use crate::*;
32
33#[doc = include_str!("docs.md")]
34#[derive(Debug, Default)]
35pub struct MysqlBuilder {
36 pub(super) config: MysqlConfig,
37}
38
39impl MysqlBuilder {
40 pub fn connection_string(mut self, v: &str) -> Self {
55 if !v.is_empty() {
56 self.config.connection_string = Some(v.to_string());
57 }
58 self
59 }
60
61 pub fn root(mut self, root: &str) -> Self {
65 self.config.root = if root.is_empty() {
66 None
67 } else {
68 Some(root.to_string())
69 };
70
71 self
72 }
73
74 pub fn table(mut self, table: &str) -> Self {
76 if !table.is_empty() {
77 self.config.table = Some(table.to_string());
78 }
79 self
80 }
81
82 pub fn key_field(mut self, key_field: &str) -> Self {
86 if !key_field.is_empty() {
87 self.config.key_field = Some(key_field.to_string());
88 }
89 self
90 }
91
92 pub fn value_field(mut self, value_field: &str) -> Self {
96 if !value_field.is_empty() {
97 self.config.value_field = Some(value_field.to_string());
98 }
99 self
100 }
101}
102
103impl Builder for MysqlBuilder {
104 type Config = MysqlConfig;
105
106 fn build(self) -> Result<impl Access> {
107 let conn = match self.config.connection_string {
108 Some(v) => v,
109 None => {
110 return Err(
111 Error::new(ErrorKind::ConfigInvalid, "connection_string is empty")
112 .with_context("service", MYSQL_SCHEME),
113 );
114 }
115 };
116
117 let config = conn.parse::<MySqlConnectOptions>().map_err(|err| {
118 Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
119 .with_context("service", MYSQL_SCHEME)
120 .set_source(err)
121 })?;
122
123 let table = match self.config.table {
124 Some(v) => v,
125 None => {
126 return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
127 .with_context("service", MYSQL_SCHEME));
128 }
129 };
130
131 let key_field = self.config.key_field.unwrap_or_else(|| "key".to_string());
132
133 let value_field = self
134 .config
135 .value_field
136 .unwrap_or_else(|| "value".to_string());
137
138 let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
139
140 Ok(MysqlBackend::new(MysqlCore {
141 pool: OnceCell::new(),
142 config,
143 table,
144 key_field,
145 value_field,
146 })
147 .with_normalized_root(root))
148 }
149}
150
151#[derive(Clone, Debug)]
153pub struct MysqlBackend {
154 core: Arc<MysqlCore>,
155 root: String,
156 info: Arc<AccessorInfo>,
157}
158
159impl MysqlBackend {
160 pub fn new(core: MysqlCore) -> Self {
161 let info = AccessorInfo::default();
162 info.set_scheme(MYSQL_SCHEME);
163 info.set_name(&core.table);
164 info.set_root("/");
165 info.set_native_capability(Capability {
166 read: true,
167 stat: true,
168 write: true,
169 write_can_empty: true,
170 delete: true,
171 shared: true,
172 ..Default::default()
173 });
174
175 Self {
176 core: Arc::new(core),
177 root: "/".to_string(),
178 info: Arc::new(info),
179 }
180 }
181
182 fn with_normalized_root(mut self, root: String) -> Self {
183 self.info.set_root(&root);
184 self.root = root;
185 self
186 }
187}
188
189impl Access for MysqlBackend {
190 type Reader = Buffer;
191 type Writer = MysqlWriter;
192 type Lister = ();
193 type Deleter = oio::OneShotDeleter<MysqlDeleter>;
194
195 fn info(&self) -> Arc<AccessorInfo> {
196 self.info.clone()
197 }
198
199 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
200 let p = build_abs_path(&self.root, path);
201
202 if p == build_abs_path(&self.root, "") {
203 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
204 } else {
205 let bs = self.core.get(&p).await?;
206 match bs {
207 Some(bs) => Ok(RpStat::new(
208 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
209 )),
210 None => Err(Error::new(ErrorKind::NotFound, "kv not found in mysql")),
211 }
212 }
213 }
214
215 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
216 let p = build_abs_path(&self.root, path);
217 let bs = match self.core.get(&p).await? {
218 Some(bs) => bs,
219 None => return Err(Error::new(ErrorKind::NotFound, "kv not found in mysql")),
220 };
221 Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
222 }
223
224 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
225 let p = build_abs_path(&self.root, path);
226 Ok((RpWrite::new(), MysqlWriter::new(self.core.clone(), p)))
227 }
228
229 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
230 Ok((
231 RpDelete::default(),
232 oio::OneShotDeleter::new(MysqlDeleter::new(self.core.clone(), self.root.clone())),
233 ))
234 }
235}