opendal_core/services/postgresql/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use mea::once::OnceCell;
19use sqlx::PgPool;
20use sqlx::postgres::PgConnectOptions;
21
22use crate::*;
23
24#[derive(Clone, Debug)]
25pub struct PostgresqlCore {
26    pub pool: OnceCell<PgPool>,
27    pub config: PgConnectOptions,
28
29    pub table: String,
30    pub key_field: String,
31    pub value_field: String,
32}
33
34impl PostgresqlCore {
35    async fn get_client(&self) -> Result<&PgPool> {
36        self.pool
37            .get_or_try_init(|| async {
38                PgPool::connect_with(self.config.clone())
39                    .await
40                    .map_err(parse_postgres_error)
41            })
42            .await
43    }
44
45    pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
46        let pool = self.get_client().await?;
47
48        let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
49            r#"SELECT "{}" FROM "{}" WHERE "{}" = $1 LIMIT 1"#,
50            self.value_field, self.table, self.key_field
51        ))
52        .bind(path)
53        .fetch_optional(pool)
54        .await
55        .map_err(parse_postgres_error)?;
56
57        Ok(value.map(Buffer::from))
58    }
59
60    pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
61        let pool = self.get_client().await?;
62
63        let table = &self.table;
64        let key_field = &self.key_field;
65        let value_field = &self.value_field;
66        sqlx::query(&format!(
67            r#"INSERT INTO "{table}" ("{key_field}", "{value_field}")
68                VALUES ($1, $2)
69                ON CONFLICT ("{key_field}")
70                    DO UPDATE SET "{value_field}" = EXCLUDED."{value_field}""#,
71        ))
72        .bind(path)
73        .bind(value.to_vec())
74        .execute(pool)
75        .await
76        .map_err(parse_postgres_error)?;
77
78        Ok(())
79    }
80
81    pub async fn delete(&self, path: &str) -> Result<()> {
82        let pool = self.get_client().await?;
83
84        sqlx::query(&format!(
85            "DELETE FROM {} WHERE {} = $1",
86            self.table, self.key_field
87        ))
88        .bind(path)
89        .execute(pool)
90        .await
91        .map_err(parse_postgres_error)?;
92
93        Ok(())
94    }
95}
96
97fn parse_postgres_error(err: sqlx::Error) -> Error {
98    Error::new(ErrorKind::Unexpected, "unhandled error from postgresql").set_source(err)
99}