opendal/services/postgresql/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use sqlx::postgres::PgConnectOptions;
21use tokio::sync::OnceCell;
22
23use super::POSTGRESQL_SCHEME;
24use super::config::PostgresqlConfig;
25use super::core::*;
26use super::deleter::PostgresqlDeleter;
27use super::writer::PostgresqlWriter;
28use crate::raw::*;
29use crate::*;
30
31/// [PostgreSQL](https://www.postgresql.org/) services support.
32#[doc = include_str!("docs.md")]
33#[derive(Debug, Default)]
34pub struct PostgresqlBuilder {
35    pub(super) config: PostgresqlConfig,
36}
37
38impl PostgresqlBuilder {
39    /// Set the connection url string of the postgresql service.
40    ///
41    /// The URL should be with a scheme of either `postgres://` or `postgresql://`.
42    ///
43    /// - `postgresql://user@localhost`
44    /// - `postgresql://user:password@%2Fvar%2Flib%2Fpostgresql/mydb?connect_timeout=10`
45    /// - `postgresql://user@host1:1234,host2,host3:5678?target_session_attrs=read-write`
46    /// - `postgresql:///mydb?user=user&host=/var/lib/postgresql`
47    ///
48    /// For more information, please visit <https://docs.rs/sqlx/latest/sqlx/postgres/struct.PgConnectOptions.html>.
49    pub fn connection_string(mut self, v: &str) -> Self {
50        if !v.is_empty() {
51            self.config.connection_string = Some(v.to_string());
52        }
53        self
54    }
55
56    /// Set the working directory, all operations will be performed under it.
57    ///
58    /// default: "/"
59    pub fn root(mut self, root: &str) -> Self {
60        self.config.root = if root.is_empty() {
61            None
62        } else {
63            Some(root.to_string())
64        };
65
66        self
67    }
68
69    /// Set the table name of the postgresql service to read/write.
70    pub fn table(mut self, table: &str) -> Self {
71        if !table.is_empty() {
72            self.config.table = Some(table.to_string());
73        }
74        self
75    }
76
77    /// Set the key field name of the postgresql service to read/write.
78    ///
79    /// Default to `key` if not specified.
80    pub fn key_field(mut self, key_field: &str) -> Self {
81        if !key_field.is_empty() {
82            self.config.key_field = Some(key_field.to_string());
83        }
84        self
85    }
86
87    /// Set the value field name of the postgresql service to read/write.
88    ///
89    /// Default to `value` if not specified.
90    pub fn value_field(mut self, value_field: &str) -> Self {
91        if !value_field.is_empty() {
92            self.config.value_field = Some(value_field.to_string());
93        }
94        self
95    }
96}
97
98impl Builder for PostgresqlBuilder {
99    type Config = PostgresqlConfig;
100
101    fn build(self) -> Result<impl Access> {
102        let conn = match self.config.connection_string {
103            Some(v) => v,
104            None => {
105                return Err(
106                    Error::new(ErrorKind::ConfigInvalid, "connection_string is empty")
107                        .with_context("service", POSTGRESQL_SCHEME),
108                );
109            }
110        };
111
112        let config = conn.parse::<PgConnectOptions>().map_err(|err| {
113            Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
114                .with_context("service", POSTGRESQL_SCHEME)
115                .set_source(err)
116        })?;
117
118        let table = match self.config.table {
119            Some(v) => v,
120            None => {
121                return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
122                    .with_context("service", POSTGRESQL_SCHEME));
123            }
124        };
125
126        let key_field = self.config.key_field.unwrap_or_else(|| "key".to_string());
127
128        let value_field = self
129            .config
130            .value_field
131            .unwrap_or_else(|| "value".to_string());
132
133        let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
134
135        Ok(PostgresqlBackend::new(PostgresqlCore {
136            pool: OnceCell::new(),
137            config,
138            table,
139            key_field,
140            value_field,
141        })
142        .with_normalized_root(root))
143    }
144}
145
146/// Backend for Postgresql service
147#[derive(Clone, Debug)]
148pub struct PostgresqlBackend {
149    core: Arc<PostgresqlCore>,
150    root: String,
151    info: Arc<AccessorInfo>,
152}
153
154impl PostgresqlBackend {
155    pub fn new(core: PostgresqlCore) -> Self {
156        let info = AccessorInfo::default();
157        info.set_scheme(POSTGRESQL_SCHEME);
158        info.set_name(&core.table);
159        info.set_root("/");
160        info.set_native_capability(Capability {
161            read: true,
162            stat: true,
163            write: true,
164            write_can_empty: true,
165            delete: true,
166            shared: true,
167            ..Default::default()
168        });
169
170        Self {
171            core: Arc::new(core),
172            root: "/".to_string(),
173            info: Arc::new(info),
174        }
175    }
176
177    fn with_normalized_root(mut self, root: String) -> Self {
178        self.info.set_root(&root);
179        self.root = root;
180        self
181    }
182}
183
184impl Access for PostgresqlBackend {
185    type Reader = Buffer;
186    type Writer = PostgresqlWriter;
187    type Lister = ();
188    type Deleter = oio::OneShotDeleter<PostgresqlDeleter>;
189
190    fn info(&self) -> Arc<AccessorInfo> {
191        self.info.clone()
192    }
193
194    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
195        let p = build_abs_path(&self.root, path);
196
197        if p == build_abs_path(&self.root, "") {
198            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
199        } else {
200            let bs = self.core.get(&p).await?;
201            match bs {
202                Some(bs) => Ok(RpStat::new(
203                    Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
204                )),
205                None => Err(Error::new(
206                    ErrorKind::NotFound,
207                    "kv not found in postgresql",
208                )),
209            }
210        }
211    }
212
213    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
214        let p = build_abs_path(&self.root, path);
215        let bs = match self.core.get(&p).await? {
216            Some(bs) => bs,
217            None => {
218                return Err(Error::new(
219                    ErrorKind::NotFound,
220                    "kv not found in postgresql",
221                ));
222            }
223        };
224        Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
225    }
226
227    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
228        let p = build_abs_path(&self.root, path);
229        Ok((RpWrite::new(), PostgresqlWriter::new(self.core.clone(), p)))
230    }
231
232    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
233        Ok((
234            RpDelete::default(),
235            oio::OneShotDeleter::new(PostgresqlDeleter::new(self.core.clone(), self.root.clone())),
236        ))
237    }
238}