opendal/services/d1/
backend.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use super::D1_SCHEME;
22use super::config::D1Config;
23use super::core::*;
24use super::deleter::D1Deleter;
25use super::writer::D1Writer;
26use crate::raw::*;
27use crate::*;
28
29#[doc = include_str!("docs.md")]
30#[derive(Default)]
31pub struct D1Builder {
32 pub(super) config: D1Config,
33
34 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
35 pub(super) http_client: Option<HttpClient>,
36}
37
38impl Debug for D1Builder {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct("D1Builder")
41 .field("config", &self.config)
42 .finish_non_exhaustive()
43 }
44}
45
46impl D1Builder {
47 pub fn token(mut self, token: &str) -> Self {
51 if !token.is_empty() {
52 self.config.token = Some(token.to_string());
53 }
54 self
55 }
56
57 pub fn account_id(mut self, account_id: &str) -> Self {
62 if !account_id.is_empty() {
63 self.config.account_id = Some(account_id.to_string());
64 }
65 self
66 }
67
68 pub fn database_id(mut self, database_id: &str) -> Self {
73 if !database_id.is_empty() {
74 self.config.database_id = Some(database_id.to_string());
75 }
76 self
77 }
78
79 pub fn root(mut self, root: &str) -> Self {
83 self.config.root = if root.is_empty() {
84 None
85 } else {
86 Some(root.to_string())
87 };
88
89 self
90 }
91
92 pub fn table(mut self, table: &str) -> Self {
96 if !table.is_empty() {
97 self.config.table = Some(table.to_owned());
98 }
99 self
100 }
101
102 pub fn key_field(mut self, key_field: &str) -> Self {
106 if !key_field.is_empty() {
107 self.config.key_field = Some(key_field.to_string());
108 }
109 self
110 }
111
112 pub fn value_field(mut self, value_field: &str) -> Self {
116 if !value_field.is_empty() {
117 self.config.value_field = Some(value_field.to_string());
118 }
119 self
120 }
121}
122
123impl Builder for D1Builder {
124 type Config = D1Config;
125
126 fn build(self) -> Result<impl Access> {
127 let mut authorization = None;
128 let config = self.config;
129
130 if let Some(token) = config.token {
131 authorization = Some(format_authorization_by_bearer(&token)?)
132 }
133
134 let Some(account_id) = config.account_id else {
135 return Err(Error::new(
136 ErrorKind::ConfigInvalid,
137 "account_id is required",
138 ));
139 };
140
141 let Some(database_id) = config.database_id.clone() else {
142 return Err(Error::new(
143 ErrorKind::ConfigInvalid,
144 "database_id is required",
145 ));
146 };
147
148 #[allow(deprecated)]
149 let client = if let Some(client) = self.http_client {
150 client
151 } else {
152 HttpClient::new().map_err(|err| {
153 err.with_operation("Builder::build")
154 .with_context("service", D1_SCHEME)
155 })?
156 };
157
158 let Some(table) = config.table.clone() else {
159 return Err(Error::new(ErrorKind::ConfigInvalid, "table is required"));
160 };
161
162 let key_field = config
163 .key_field
164 .clone()
165 .unwrap_or_else(|| "key".to_string());
166
167 let value_field = config
168 .value_field
169 .clone()
170 .unwrap_or_else(|| "value".to_string());
171
172 let root = normalize_root(
173 config
174 .root
175 .clone()
176 .unwrap_or_else(|| "/".to_string())
177 .as_str(),
178 );
179 Ok(D1Backend::new(D1Core {
180 authorization,
181 account_id,
182 database_id,
183 client,
184 table,
185 key_field,
186 value_field,
187 })
188 .with_normalized_root(root))
189 }
190}
191
192#[derive(Clone, Debug)]
194pub struct D1Backend {
195 core: Arc<D1Core>,
196 root: String,
197 info: Arc<AccessorInfo>,
198}
199
200impl D1Backend {
201 pub fn new(core: D1Core) -> Self {
202 let info = AccessorInfo::default();
203 info.set_scheme(D1_SCHEME);
204 info.set_name(&core.table);
205 info.set_root("/");
206 info.set_native_capability(Capability {
207 read: true,
208 stat: true,
209 write: true,
210 write_can_empty: true,
211 write_total_max_size: Some(1000 * 1000),
214 delete: true,
215 shared: true,
216 ..Default::default()
217 });
218
219 Self {
220 core: Arc::new(core),
221 root: "/".to_string(),
222 info: Arc::new(info),
223 }
224 }
225
226 fn with_normalized_root(mut self, root: String) -> Self {
227 self.info.set_root(&root);
228 self.root = root;
229 self
230 }
231}
232
233impl Access for D1Backend {
234 type Reader = Buffer;
235 type Writer = D1Writer;
236 type Lister = ();
237 type Deleter = oio::OneShotDeleter<D1Deleter>;
238
239 fn info(&self) -> Arc<AccessorInfo> {
240 self.info.clone()
241 }
242
243 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
244 let p = build_abs_path(&self.root, path);
245
246 if p == build_abs_path(&self.root, "") {
247 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
248 } else {
249 let bs = self.core.get(&p).await?;
250 match bs {
251 Some(bs) => Ok(RpStat::new(
252 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
253 )),
254 None => Err(Error::new(ErrorKind::NotFound, "kv not found in d1")),
255 }
256 }
257 }
258
259 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
260 let p = build_abs_path(&self.root, path);
261 let bs = match self.core.get(&p).await? {
262 Some(bs) => bs,
263 None => {
264 return Err(Error::new(ErrorKind::NotFound, "kv not found in d1"));
265 }
266 };
267 Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
268 }
269
270 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
271 let p = build_abs_path(&self.root, path);
272 Ok((RpWrite::new(), D1Writer::new(self.core.clone(), p)))
273 }
274
275 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
276 Ok((
277 RpDelete::default(),
278 oio::OneShotDeleter::new(D1Deleter::new(self.core.clone(), self.root.clone())),
279 ))
280 }
281
282 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
283 let _ = build_abs_path(&self.root, path);
284 Ok((RpList::default(), ()))
285 }
286}