opendal/services/mysql/
backend.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use sqlx::mysql::MySqlConnectOptions;
22use tokio::sync::OnceCell;
23
24use super::config::MysqlConfig;
25use super::core::*;
26use super::deleter::MysqlDeleter;
27use super::writer::MysqlWriter;
28use crate::raw::oio;
29use crate::raw::*;
30use crate::*;
31
32#[doc = include_str!("docs.md")]
33#[derive(Default)]
34pub struct MysqlBuilder {
35 pub(super) config: MysqlConfig,
36}
37
38impl Debug for MysqlBuilder {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 let mut d = f.debug_struct("MysqlBuilder");
41
42 d.field("config", &self.config).finish()
43 }
44}
45
46impl MysqlBuilder {
47 pub fn connection_string(mut self, v: &str) -> Self {
62 if !v.is_empty() {
63 self.config.connection_string = Some(v.to_string());
64 }
65 self
66 }
67
68 pub fn root(mut self, root: &str) -> Self {
72 self.config.root = if root.is_empty() {
73 None
74 } else {
75 Some(root.to_string())
76 };
77
78 self
79 }
80
81 pub fn table(mut self, table: &str) -> Self {
83 if !table.is_empty() {
84 self.config.table = Some(table.to_string());
85 }
86 self
87 }
88
89 pub fn key_field(mut self, key_field: &str) -> Self {
93 if !key_field.is_empty() {
94 self.config.key_field = Some(key_field.to_string());
95 }
96 self
97 }
98
99 pub fn value_field(mut self, value_field: &str) -> Self {
103 if !value_field.is_empty() {
104 self.config.value_field = Some(value_field.to_string());
105 }
106 self
107 }
108}
109
110impl Builder for MysqlBuilder {
111 type Config = MysqlConfig;
112
113 fn build(self) -> Result<impl Access> {
114 let conn = match self.config.connection_string {
115 Some(v) => v,
116 None => {
117 return Err(
118 Error::new(ErrorKind::ConfigInvalid, "connection_string is empty")
119 .with_context("service", Scheme::Mysql),
120 );
121 }
122 };
123
124 let config = conn.parse::<MySqlConnectOptions>().map_err(|err| {
125 Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
126 .with_context("service", Scheme::Mysql)
127 .set_source(err)
128 })?;
129
130 let table = match self.config.table {
131 Some(v) => v,
132 None => {
133 return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
134 .with_context("service", Scheme::Mysql));
135 }
136 };
137
138 let key_field = self.config.key_field.unwrap_or_else(|| "key".to_string());
139
140 let value_field = self
141 .config
142 .value_field
143 .unwrap_or_else(|| "value".to_string());
144
145 let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
146
147 Ok(MysqlBackend::new(MysqlCore {
148 pool: OnceCell::new(),
149 config,
150 table,
151 key_field,
152 value_field,
153 })
154 .with_normalized_root(root))
155 }
156}
157
158#[derive(Clone, Debug)]
160pub struct MysqlBackend {
161 core: Arc<MysqlCore>,
162 root: String,
163 info: Arc<AccessorInfo>,
164}
165
166impl MysqlBackend {
167 pub fn new(core: MysqlCore) -> Self {
168 let info = AccessorInfo::default();
169 info.set_scheme(Scheme::Mysql.into_static());
170 info.set_name(&core.table);
171 info.set_root("/");
172 info.set_native_capability(Capability {
173 read: true,
174 stat: true,
175 write: true,
176 write_can_empty: true,
177 delete: true,
178 shared: true,
179 ..Default::default()
180 });
181
182 Self {
183 core: Arc::new(core),
184 root: "/".to_string(),
185 info: Arc::new(info),
186 }
187 }
188
189 fn with_normalized_root(mut self, root: String) -> Self {
190 self.info.set_root(&root);
191 self.root = root;
192 self
193 }
194}
195
196impl Access for MysqlBackend {
197 type Reader = Buffer;
198 type Writer = MysqlWriter;
199 type Lister = ();
200 type Deleter = oio::OneShotDeleter<MysqlDeleter>;
201
202 fn info(&self) -> Arc<AccessorInfo> {
203 self.info.clone()
204 }
205
206 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
207 let p = build_abs_path(&self.root, path);
208
209 if p == build_abs_path(&self.root, "") {
210 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
211 } else {
212 let bs = self.core.get(&p).await?;
213 match bs {
214 Some(bs) => Ok(RpStat::new(
215 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
216 )),
217 None => Err(Error::new(ErrorKind::NotFound, "kv not found in mysql")),
218 }
219 }
220 }
221
222 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
223 let p = build_abs_path(&self.root, path);
224 let bs = match self.core.get(&p).await? {
225 Some(bs) => bs,
226 None => return Err(Error::new(ErrorKind::NotFound, "kv not found in mysql")),
227 };
228 Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
229 }
230
231 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
232 let p = build_abs_path(&self.root, path);
233 Ok((RpWrite::new(), MysqlWriter::new(self.core.clone(), p)))
234 }
235
236 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
237 Ok((
238 RpDelete::default(),
239 oio::OneShotDeleter::new(MysqlDeleter::new(self.core.clone(), self.root.clone())),
240 ))
241 }
242
243 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
244 let _ = build_abs_path(&self.root, path);
245 Ok((RpList::default(), ()))
246 }
247}