opendal_core/services/postgresql/
core.rs1use mea::once::OnceCell;
19use sqlx::PgPool;
20use sqlx::postgres::PgConnectOptions;
21
22use crate::*;
23
24#[derive(Clone, Debug)]
25pub struct PostgresqlCore {
26 pub pool: OnceCell<PgPool>,
27 pub config: PgConnectOptions,
28
29 pub table: String,
30 pub key_field: String,
31 pub value_field: String,
32}
33
34impl PostgresqlCore {
35 async fn get_client(&self) -> Result<&PgPool> {
36 self.pool
37 .get_or_try_init(|| async {
38 PgPool::connect_with(self.config.clone())
39 .await
40 .map_err(parse_postgres_error)
41 })
42 .await
43 }
44
45 pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
46 let pool = self.get_client().await?;
47
48 let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
49 r#"SELECT "{}" FROM "{}" WHERE "{}" = $1 LIMIT 1"#,
50 self.value_field, self.table, self.key_field
51 ))
52 .bind(path)
53 .fetch_optional(pool)
54 .await
55 .map_err(parse_postgres_error)?;
56
57 Ok(value.map(Buffer::from))
58 }
59
60 pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
61 let pool = self.get_client().await?;
62
63 let table = &self.table;
64 let key_field = &self.key_field;
65 let value_field = &self.value_field;
66 sqlx::query(&format!(
67 r#"INSERT INTO "{table}" ("{key_field}", "{value_field}")
68 VALUES ($1, $2)
69 ON CONFLICT ("{key_field}")
70 DO UPDATE SET "{value_field}" = EXCLUDED."{value_field}""#,
71 ))
72 .bind(path)
73 .bind(value.to_vec())
74 .execute(pool)
75 .await
76 .map_err(parse_postgres_error)?;
77
78 Ok(())
79 }
80
81 pub async fn delete(&self, path: &str) -> Result<()> {
82 let pool = self.get_client().await?;
83
84 sqlx::query(&format!(
85 "DELETE FROM {} WHERE {} = $1",
86 self.table, self.key_field
87 ))
88 .bind(path)
89 .execute(pool)
90 .await
91 .map_err(parse_postgres_error)?;
92
93 Ok(())
94 }
95}
96
97fn parse_postgres_error(err: sqlx::Error) -> Error {
98 Error::new(ErrorKind::Unexpected, "unhandled error from postgresql").set_source(err)
99}