opendal/services/postgresql/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::str::FromStr;
21
22use sqlx::postgres::PgConnectOptions;
23use sqlx::PgPool;
24use tokio::sync::OnceCell;
25
26use crate::raw::adapters::kv;
27use crate::raw::*;
28use crate::services::PostgresqlConfig;
29use crate::*;
30
31impl Configurator for PostgresqlConfig {
32 type Builder = PostgresqlBuilder;
33 fn into_builder(self) -> Self::Builder {
34 PostgresqlBuilder { config: self }
35 }
36}
37
/// Builder for the PostgreSQL service.
///
/// Holds the [`PostgresqlConfig`] that is being assembled; the remaining
/// documentation is included from `docs.md`.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct PostgresqlBuilder {
    // Configuration carried by this builder.
    config: PostgresqlConfig,
}
44
45impl Debug for PostgresqlBuilder {
46 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47 let mut d = f.debug_struct("PostgresqlBuilder");
48
49 d.field("config", &self.config);
50 d.finish()
51 }
52}
53
54impl PostgresqlBuilder {
55 pub fn connection_string(mut self, v: &str) -> Self {
66 if !v.is_empty() {
67 self.config.connection_string = Some(v.to_string());
68 }
69 self
70 }
71
72 pub fn root(mut self, root: &str) -> Self {
76 self.config.root = if root.is_empty() {
77 None
78 } else {
79 Some(root.to_string())
80 };
81
82 self
83 }
84
85 pub fn table(mut self, table: &str) -> Self {
87 if !table.is_empty() {
88 self.config.table = Some(table.to_string());
89 }
90 self
91 }
92
93 pub fn key_field(mut self, key_field: &str) -> Self {
97 if !key_field.is_empty() {
98 self.config.key_field = Some(key_field.to_string());
99 }
100 self
101 }
102
103 pub fn value_field(mut self, value_field: &str) -> Self {
107 if !value_field.is_empty() {
108 self.config.value_field = Some(value_field.to_string());
109 }
110 self
111 }
112}
113
114impl Builder for PostgresqlBuilder {
115 const SCHEME: Scheme = Scheme::Postgresql;
116 type Config = PostgresqlConfig;
117
118 fn build(self) -> Result<impl Access> {
119 let conn = match self.config.connection_string {
120 Some(v) => v,
121 None => {
122 return Err(
123 Error::new(ErrorKind::ConfigInvalid, "connection_string is empty")
124 .with_context("service", Scheme::Postgresql),
125 )
126 }
127 };
128
129 let config = PgConnectOptions::from_str(&conn).map_err(|err| {
130 Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
131 .with_context("service", Scheme::Postgresql)
132 .set_source(err)
133 })?;
134
135 let table = match self.config.table {
136 Some(v) => v,
137 None => {
138 return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
139 .with_context("service", Scheme::Postgresql))
140 }
141 };
142
143 let key_field = self.config.key_field.unwrap_or_else(|| "key".to_string());
144
145 let value_field = self
146 .config
147 .value_field
148 .unwrap_or_else(|| "value".to_string());
149
150 let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
151
152 Ok(PostgresqlBackend::new(Adapter {
153 pool: OnceCell::new(),
154 config,
155 table,
156 key_field,
157 value_field,
158 })
159 .with_normalized_root(root))
160 }
161}
162
/// Backend for the PostgreSQL service: the generic key-value backend
/// specialised with this module's [`Adapter`].
pub type PostgresqlBackend = kv::Backend<Adapter>;
165
/// Key-value adapter state for the PostgreSQL service.
#[derive(Debug, Clone)]
pub struct Adapter {
    // Connection pool, created on first use by `get_client`.
    pool: OnceCell<PgPool>,
    // Connection options handed to `PgPool::connect_with` when the pool is created.
    config: PgConnectOptions,

    // Name of the table the queries run against.
    table: String,
    // Column compared against the path in every query.
    key_field: String,
    // Column holding the stored bytes.
    value_field: String,
}
175
176impl Adapter {
177 async fn get_client(&self) -> Result<&PgPool> {
178 self.pool
179 .get_or_try_init(|| async {
180 let pool = PgPool::connect_with(self.config.clone())
181 .await
182 .map_err(parse_postgres_error)?;
183 Ok(pool)
184 })
185 .await
186 }
187}
188
189impl kv::Adapter for Adapter {
190 type Scanner = ();
191
192 fn info(&self) -> kv::Info {
193 kv::Info::new(
194 Scheme::Postgresql,
195 &self.table,
196 Capability {
197 read: true,
198 write: true,
199 shared: true,
200 ..Default::default()
201 },
202 )
203 }
204
205 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
206 let pool = self.get_client().await?;
207
208 let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
209 r#"SELECT "{}" FROM "{}" WHERE "{}" = $1 LIMIT 1"#,
210 self.value_field, self.table, self.key_field
211 ))
212 .bind(path)
213 .fetch_optional(pool)
214 .await
215 .map_err(parse_postgres_error)?;
216
217 Ok(value.map(Buffer::from))
218 }
219
220 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
221 let pool = self.get_client().await?;
222
223 let table = &self.table;
224 let key_field = &self.key_field;
225 let value_field = &self.value_field;
226 sqlx::query(&format!(
227 r#"INSERT INTO "{table}" ("{key_field}", "{value_field}")
228 VALUES ($1, $2)
229 ON CONFLICT ("{key_field}")
230 DO UPDATE SET "{value_field}" = EXCLUDED."{value_field}""#,
231 ))
232 .bind(path)
233 .bind(value.to_vec())
234 .execute(pool)
235 .await
236 .map_err(parse_postgres_error)?;
237
238 Ok(())
239 }
240
241 async fn delete(&self, path: &str) -> Result<()> {
242 let pool = self.get_client().await?;
243
244 sqlx::query(&format!(
245 "DELETE FROM {} WHERE {} = $1",
246 self.table, self.key_field
247 ))
248 .bind(path)
249 .execute(pool)
250 .await
251 .map_err(parse_postgres_error)?;
252
253 Ok(())
254 }
255}
256
257fn parse_postgres_error(err: sqlx::Error) -> Error {
258 Error::new(ErrorKind::Unexpected, "unhandled error from postgresql").set_source(err)
259}