opendal/services/postgresql/
backend.rs1use std::sync::Arc;
19
20use sqlx::postgres::PgConnectOptions;
21use tokio::sync::OnceCell;
22
23use super::POSTGRESQL_SCHEME;
24use super::config::PostgresqlConfig;
25use super::core::*;
26use super::deleter::PostgresqlDeleter;
27use super::writer::PostgresqlWriter;
28use crate::raw::*;
29use crate::*;
30
31#[doc = include_str!("docs.md")]
33#[derive(Debug, Default)]
34pub struct PostgresqlBuilder {
35 pub(super) config: PostgresqlConfig,
36}
37
38impl PostgresqlBuilder {
39 pub fn connection_string(mut self, v: &str) -> Self {
50 if !v.is_empty() {
51 self.config.connection_string = Some(v.to_string());
52 }
53 self
54 }
55
56 pub fn root(mut self, root: &str) -> Self {
60 self.config.root = if root.is_empty() {
61 None
62 } else {
63 Some(root.to_string())
64 };
65
66 self
67 }
68
69 pub fn table(mut self, table: &str) -> Self {
71 if !table.is_empty() {
72 self.config.table = Some(table.to_string());
73 }
74 self
75 }
76
77 pub fn key_field(mut self, key_field: &str) -> Self {
81 if !key_field.is_empty() {
82 self.config.key_field = Some(key_field.to_string());
83 }
84 self
85 }
86
87 pub fn value_field(mut self, value_field: &str) -> Self {
91 if !value_field.is_empty() {
92 self.config.value_field = Some(value_field.to_string());
93 }
94 self
95 }
96}
97
98impl Builder for PostgresqlBuilder {
99 type Config = PostgresqlConfig;
100
101 fn build(self) -> Result<impl Access> {
102 let conn = match self.config.connection_string {
103 Some(v) => v,
104 None => {
105 return Err(
106 Error::new(ErrorKind::ConfigInvalid, "connection_string is empty")
107 .with_context("service", POSTGRESQL_SCHEME),
108 );
109 }
110 };
111
112 let config = conn.parse::<PgConnectOptions>().map_err(|err| {
113 Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid")
114 .with_context("service", POSTGRESQL_SCHEME)
115 .set_source(err)
116 })?;
117
118 let table = match self.config.table {
119 Some(v) => v,
120 None => {
121 return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty")
122 .with_context("service", POSTGRESQL_SCHEME));
123 }
124 };
125
126 let key_field = self.config.key_field.unwrap_or_else(|| "key".to_string());
127
128 let value_field = self
129 .config
130 .value_field
131 .unwrap_or_else(|| "value".to_string());
132
133 let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
134
135 Ok(PostgresqlBackend::new(PostgresqlCore {
136 pool: OnceCell::new(),
137 config,
138 table,
139 key_field,
140 value_field,
141 })
142 .with_normalized_root(root))
143 }
144}
145
146#[derive(Clone, Debug)]
148pub struct PostgresqlBackend {
149 core: Arc<PostgresqlCore>,
150 root: String,
151 info: Arc<AccessorInfo>,
152}
153
154impl PostgresqlBackend {
155 pub fn new(core: PostgresqlCore) -> Self {
156 let info = AccessorInfo::default();
157 info.set_scheme(POSTGRESQL_SCHEME);
158 info.set_name(&core.table);
159 info.set_root("/");
160 info.set_native_capability(Capability {
161 read: true,
162 stat: true,
163 write: true,
164 write_can_empty: true,
165 delete: true,
166 shared: true,
167 ..Default::default()
168 });
169
170 Self {
171 core: Arc::new(core),
172 root: "/".to_string(),
173 info: Arc::new(info),
174 }
175 }
176
177 fn with_normalized_root(mut self, root: String) -> Self {
178 self.info.set_root(&root);
179 self.root = root;
180 self
181 }
182}
183
184impl Access for PostgresqlBackend {
185 type Reader = Buffer;
186 type Writer = PostgresqlWriter;
187 type Lister = ();
188 type Deleter = oio::OneShotDeleter<PostgresqlDeleter>;
189
190 fn info(&self) -> Arc<AccessorInfo> {
191 self.info.clone()
192 }
193
194 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
195 let p = build_abs_path(&self.root, path);
196
197 if p == build_abs_path(&self.root, "") {
198 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
199 } else {
200 let bs = self.core.get(&p).await?;
201 match bs {
202 Some(bs) => Ok(RpStat::new(
203 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
204 )),
205 None => Err(Error::new(
206 ErrorKind::NotFound,
207 "kv not found in postgresql",
208 )),
209 }
210 }
211 }
212
213 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
214 let p = build_abs_path(&self.root, path);
215 let bs = match self.core.get(&p).await? {
216 Some(bs) => bs,
217 None => {
218 return Err(Error::new(
219 ErrorKind::NotFound,
220 "kv not found in postgresql",
221 ));
222 }
223 };
224 Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
225 }
226
227 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
228 let p = build_abs_path(&self.root, path);
229 Ok((RpWrite::new(), PostgresqlWriter::new(self.core.clone(), p)))
230 }
231
232 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
233 Ok((
234 RpDelete::default(),
235 oio::OneShotDeleter::new(PostgresqlDeleter::new(self.core.clone(), self.root.clone())),
236 ))
237 }
238}