opendal/services/postgresql/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::str::FromStr;
21
22use sqlx::postgres::PgConnectOptions;
23use sqlx::PgPool;
24use tokio::sync::OnceCell;
25
26use crate::raw::adapters::kv;
27use crate::raw::*;
28use crate::services::PostgresqlConfig;
29use crate::*;
30
31impl Configurator for PostgresqlConfig {
32    type Builder = PostgresqlBuilder;
33    fn into_builder(self) -> Self::Builder {
34        PostgresqlBuilder { config: self }
35    }
36}
37
/// [PostgreSQL](https://www.postgresql.org/) services support.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct PostgresqlBuilder {
    /// Settings collected by the builder's setter methods.
    config: PostgresqlConfig,
}
44
45impl Debug for PostgresqlBuilder {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        let mut d = f.debug_struct("PostgresqlBuilder");
48
49        d.field("config", &self.config);
50        d.finish()
51    }
52}
53
54impl PostgresqlBuilder {
55    /// Set the connection url string of the postgresql service.
56    ///
57    /// The URL should be with a scheme of either `postgres://` or `postgresql://`.
58    ///
59    /// - `postgresql://user@localhost`
60    /// - `postgresql://user:password@%2Fvar%2Flib%2Fpostgresql/mydb?connect_timeout=10`
61    /// - `postgresql://user@host1:1234,host2,host3:5678?target_session_attrs=read-write`
62    /// - `postgresql:///mydb?user=user&host=/var/lib/postgresql`
63    ///
64    /// For more information, please visit <https://docs.rs/sqlx/latest/sqlx/postgres/struct.PgConnectOptions.html>.
65    pub fn connection_string(mut self, v: &str) -> Self {
66        if !v.is_empty() {
67            self.config.connection_string = Some(v.to_string());
68        }
69        self
70    }
71
72    /// Set the working directory, all operations will be performed under it.
73    ///
74    /// default: "/"
75    pub fn root(mut self, root: &str) -> Self {
76        self.config.root = if root.is_empty() {
77            None
78        } else {
79            Some(root.to_string())
80        };
81
82        self
83    }
84
85    /// Set the table name of the postgresql service to read/write.
86    pub fn table(mut self, table: &str) -> Self {
87        if !table.is_empty() {
88            self.config.table = Some(table.to_string());
89        }
90        self
91    }
92
93    /// Set the key field name of the postgresql service to read/write.
94    ///
95    /// Default to `key` if not specified.
96    pub fn key_field(mut self, key_field: &str) -> Self {
97        if !key_field.is_empty() {
98            self.config.key_field = Some(key_field.to_string());
99        }
100        self
101    }
102
103    /// Set the value field name of the postgresql service to read/write.
104    ///
105    /// Default to `value` if not specified.
106    pub fn value_field(mut self, value_field: &str) -> Self {
107        if !value_field.is_empty() {
108            self.config.value_field = Some(value_field.to_string());
109        }
110        self
111    }
112}
113
114impl Builder for PostgresqlBuilder {
115    type Config = PostgresqlConfig;
116
117    fn build(self) -> Result<impl Access> {
118        let conn = match self.config.connection_string {
119            Some(v) => v,
120            None => {
121                return Err(
122                    Error::new(ErrorKind::ConfigInvalid, "connection_string is empty")
123                        .with_context("service", Scheme::Postgresql),
124                )
125            }
126        };
127
128        let config = PgConnectOptions::from_str(&conn).map_err(|err| {
129            Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
130                .with_context("service", Scheme::Postgresql)
131                .set_source(err)
132        })?;
133
134        let table = match self.config.table {
135            Some(v) => v,
136            None => {
137                return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
138                    .with_context("service", Scheme::Postgresql))
139            }
140        };
141
142        let key_field = self.config.key_field.unwrap_or_else(|| "key".to_string());
143
144        let value_field = self
145            .config
146            .value_field
147            .unwrap_or_else(|| "value".to_string());
148
149        let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
150
151        Ok(PostgresqlBackend::new(Adapter {
152            pool: OnceCell::new(),
153            config,
154            table,
155            key_field,
156            value_field,
157        })
158        .with_normalized_root(root))
159    }
160}
161
/// Backend for Postgresql service
///
/// This is the generic `kv::Backend` instantiated with the PostgreSQL
/// [`Adapter`] defined in this file.
pub type PostgresqlBackend = kv::Backend<Adapter>;
164
/// Key-value adapter that stores each entry as a row of a PostgreSQL table.
#[derive(Debug, Clone)]
pub struct Adapter {
    /// Connection pool, created on the first call to `get_client`.
    pool: OnceCell<PgPool>,
    /// Connection options used to open the pool.
    config: PgConnectOptions,

    /// Name of the table queried by `get`, `set` and `delete`.
    table: String,
    /// Name of the column holding the entry key (bound to the path).
    key_field: String,
    /// Name of the column holding the entry value.
    value_field: String,
}
174
175impl Adapter {
176    async fn get_client(&self) -> Result<&PgPool> {
177        self.pool
178            .get_or_try_init(|| async {
179                let pool = PgPool::connect_with(self.config.clone())
180                    .await
181                    .map_err(parse_postgres_error)?;
182                Ok(pool)
183            })
184            .await
185    }
186}
187
188impl kv::Adapter for Adapter {
189    type Scanner = ();
190
191    fn info(&self) -> kv::Info {
192        kv::Info::new(
193            Scheme::Postgresql,
194            &self.table,
195            Capability {
196                read: true,
197                write: true,
198                shared: true,
199                ..Default::default()
200            },
201        )
202    }
203
204    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
205        let pool = self.get_client().await?;
206
207        let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
208            r#"SELECT "{}" FROM "{}" WHERE "{}" = $1 LIMIT 1"#,
209            self.value_field, self.table, self.key_field
210        ))
211        .bind(path)
212        .fetch_optional(pool)
213        .await
214        .map_err(parse_postgres_error)?;
215
216        Ok(value.map(Buffer::from))
217    }
218
219    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
220        let pool = self.get_client().await?;
221
222        let table = &self.table;
223        let key_field = &self.key_field;
224        let value_field = &self.value_field;
225        sqlx::query(&format!(
226            r#"INSERT INTO "{table}" ("{key_field}", "{value_field}")
227                VALUES ($1, $2)
228                ON CONFLICT ("{key_field}")
229                    DO UPDATE SET "{value_field}" = EXCLUDED."{value_field}""#,
230        ))
231        .bind(path)
232        .bind(value.to_vec())
233        .execute(pool)
234        .await
235        .map_err(parse_postgres_error)?;
236
237        Ok(())
238    }
239
240    async fn delete(&self, path: &str) -> Result<()> {
241        let pool = self.get_client().await?;
242
243        sqlx::query(&format!(
244            "DELETE FROM {} WHERE {} = $1",
245            self.table, self.key_field
246        ))
247        .bind(path)
248        .execute(pool)
249        .await
250        .map_err(parse_postgres_error)?;
251
252        Ok(())
253    }
254}
255
256fn parse_postgres_error(err: sqlx::Error) -> Error {
257    Error::new(ErrorKind::Unexpected, "unhandled error from postgresql").set_source(err)
258}