opendal/services/postgresql/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::str::FromStr;
21
22use sqlx::postgres::PgConnectOptions;
23use sqlx::PgPool;
24use tokio::sync::OnceCell;
25
26use crate::raw::adapters::kv;
27use crate::raw::*;
28use crate::services::PostgresqlConfig;
29use crate::*;
30
31impl Configurator for PostgresqlConfig {
32    type Builder = PostgresqlBuilder;
33    fn into_builder(self) -> Self::Builder {
34        PostgresqlBuilder { config: self }
35    }
36}
37
38/// [PostgreSQL](https://www.postgresql.org/) services support.
39#[doc = include_str!("docs.md")]
40#[derive(Default)]
41pub struct PostgresqlBuilder {
42    config: PostgresqlConfig,
43}
44
45impl Debug for PostgresqlBuilder {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        let mut d = f.debug_struct("PostgresqlBuilder");
48
49        d.field("config", &self.config);
50        d.finish()
51    }
52}
53
54impl PostgresqlBuilder {
55    /// Set the connection url string of the postgresql service.
56    ///
57    /// The URL should be with a scheme of either `postgres://` or `postgresql://`.
58    ///
59    /// - `postgresql://user@localhost`
60    /// - `postgresql://user:password@%2Fvar%2Flib%2Fpostgresql/mydb?connect_timeout=10`
61    /// - `postgresql://user@host1:1234,host2,host3:5678?target_session_attrs=read-write`
62    /// - `postgresql:///mydb?user=user&host=/var/lib/postgresql`
63    ///
64    /// For more information, please visit <https://docs.rs/sqlx/latest/sqlx/postgres/struct.PgConnectOptions.html>.
65    pub fn connection_string(mut self, v: &str) -> Self {
66        if !v.is_empty() {
67            self.config.connection_string = Some(v.to_string());
68        }
69        self
70    }
71
72    /// Set the working directory, all operations will be performed under it.
73    ///
74    /// default: "/"
75    pub fn root(mut self, root: &str) -> Self {
76        self.config.root = if root.is_empty() {
77            None
78        } else {
79            Some(root.to_string())
80        };
81
82        self
83    }
84
85    /// Set the table name of the postgresql service to read/write.
86    pub fn table(mut self, table: &str) -> Self {
87        if !table.is_empty() {
88            self.config.table = Some(table.to_string());
89        }
90        self
91    }
92
93    /// Set the key field name of the postgresql service to read/write.
94    ///
95    /// Default to `key` if not specified.
96    pub fn key_field(mut self, key_field: &str) -> Self {
97        if !key_field.is_empty() {
98            self.config.key_field = Some(key_field.to_string());
99        }
100        self
101    }
102
103    /// Set the value field name of the postgresql service to read/write.
104    ///
105    /// Default to `value` if not specified.
106    pub fn value_field(mut self, value_field: &str) -> Self {
107        if !value_field.is_empty() {
108            self.config.value_field = Some(value_field.to_string());
109        }
110        self
111    }
112}
113
114impl Builder for PostgresqlBuilder {
115    const SCHEME: Scheme = Scheme::Postgresql;
116    type Config = PostgresqlConfig;
117
118    fn build(self) -> Result<impl Access> {
119        let conn = match self.config.connection_string {
120            Some(v) => v,
121            None => {
122                return Err(
123                    Error::new(ErrorKind::ConfigInvalid, "connection_string is empty")
124                        .with_context("service", Scheme::Postgresql),
125                )
126            }
127        };
128
129        let config = PgConnectOptions::from_str(&conn).map_err(|err| {
130            Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
131                .with_context("service", Scheme::Postgresql)
132                .set_source(err)
133        })?;
134
135        let table = match self.config.table {
136            Some(v) => v,
137            None => {
138                return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
139                    .with_context("service", Scheme::Postgresql))
140            }
141        };
142
143        let key_field = self.config.key_field.unwrap_or_else(|| "key".to_string());
144
145        let value_field = self
146            .config
147            .value_field
148            .unwrap_or_else(|| "value".to_string());
149
150        let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
151
152        Ok(PostgresqlBackend::new(Adapter {
153            pool: OnceCell::new(),
154            config,
155            table,
156            key_field,
157            value_field,
158        })
159        .with_normalized_root(root))
160    }
161}
162
163/// Backend for Postgresql service
164pub type PostgresqlBackend = kv::Backend<Adapter>;
165
166#[derive(Debug, Clone)]
167pub struct Adapter {
168    pool: OnceCell<PgPool>,
169    config: PgConnectOptions,
170
171    table: String,
172    key_field: String,
173    value_field: String,
174}
175
176impl Adapter {
177    async fn get_client(&self) -> Result<&PgPool> {
178        self.pool
179            .get_or_try_init(|| async {
180                let pool = PgPool::connect_with(self.config.clone())
181                    .await
182                    .map_err(parse_postgres_error)?;
183                Ok(pool)
184            })
185            .await
186    }
187}
188
189impl kv::Adapter for Adapter {
190    type Scanner = ();
191
192    fn info(&self) -> kv::Info {
193        kv::Info::new(
194            Scheme::Postgresql,
195            &self.table,
196            Capability {
197                read: true,
198                write: true,
199                shared: true,
200                ..Default::default()
201            },
202        )
203    }
204
205    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
206        let pool = self.get_client().await?;
207
208        let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
209            r#"SELECT "{}" FROM "{}" WHERE "{}" = $1 LIMIT 1"#,
210            self.value_field, self.table, self.key_field
211        ))
212        .bind(path)
213        .fetch_optional(pool)
214        .await
215        .map_err(parse_postgres_error)?;
216
217        Ok(value.map(Buffer::from))
218    }
219
220    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
221        let pool = self.get_client().await?;
222
223        let table = &self.table;
224        let key_field = &self.key_field;
225        let value_field = &self.value_field;
226        sqlx::query(&format!(
227            r#"INSERT INTO "{table}" ("{key_field}", "{value_field}")
228                VALUES ($1, $2)
229                ON CONFLICT ("{key_field}")
230                    DO UPDATE SET "{value_field}" = EXCLUDED."{value_field}""#,
231        ))
232        .bind(path)
233        .bind(value.to_vec())
234        .execute(pool)
235        .await
236        .map_err(parse_postgres_error)?;
237
238        Ok(())
239    }
240
241    async fn delete(&self, path: &str) -> Result<()> {
242        let pool = self.get_client().await?;
243
244        sqlx::query(&format!(
245            "DELETE FROM {} WHERE {} = $1",
246            self.table, self.key_field
247        ))
248        .bind(path)
249        .execute(pool)
250        .await
251        .map_err(parse_postgres_error)?;
252
253        Ok(())
254    }
255}
256
257fn parse_postgres_error(err: sqlx::Error) -> Error {
258    Error::new(ErrorKind::Unexpected, "unhandled error from postgresql").set_source(err)
259}