opendal/services/d1/
backend.rs1use std::sync::Arc;
19
20use super::config::D1Config;
21use super::core::*;
22use super::deleter::D1Deleter;
23use super::writer::D1Writer;
24use crate::raw::*;
25use crate::*;
26
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct D1Builder {
    /// Configuration accumulated by the setter methods and consumed by `build`.
    pub(super) config: D1Config,

    /// Optional pre-built HTTP client. When this is `None`, `build` creates
    /// one itself via `HttpClient::new()`.
    pub(super) http_client: Option<HttpClient>,
}
34
35impl D1Builder {
36 pub fn token(mut self, token: &str) -> Self {
40 if !token.is_empty() {
41 self.config.token = Some(token.to_string());
42 }
43 self
44 }
45
46 pub fn account_id(mut self, account_id: &str) -> Self {
51 if !account_id.is_empty() {
52 self.config.account_id = Some(account_id.to_string());
53 }
54 self
55 }
56
57 pub fn database_id(mut self, database_id: &str) -> Self {
62 if !database_id.is_empty() {
63 self.config.database_id = Some(database_id.to_string());
64 }
65 self
66 }
67
68 pub fn root(mut self, root: &str) -> Self {
72 self.config.root = if root.is_empty() {
73 None
74 } else {
75 Some(root.to_string())
76 };
77
78 self
79 }
80
81 pub fn table(mut self, table: &str) -> Self {
85 if !table.is_empty() {
86 self.config.table = Some(table.to_owned());
87 }
88 self
89 }
90
91 pub fn key_field(mut self, key_field: &str) -> Self {
95 if !key_field.is_empty() {
96 self.config.key_field = Some(key_field.to_string());
97 }
98 self
99 }
100
101 pub fn value_field(mut self, value_field: &str) -> Self {
105 if !value_field.is_empty() {
106 self.config.value_field = Some(value_field.to_string());
107 }
108 self
109 }
110}
111
112impl Builder for D1Builder {
113 type Config = D1Config;
114
115 fn build(self) -> Result<impl Access> {
116 let mut authorization = None;
117 let config = self.config;
118
119 if let Some(token) = config.token {
120 authorization = Some(format_authorization_by_bearer(&token)?)
121 }
122
123 let Some(account_id) = config.account_id else {
124 return Err(Error::new(
125 ErrorKind::ConfigInvalid,
126 "account_id is required",
127 ));
128 };
129
130 let Some(database_id) = config.database_id.clone() else {
131 return Err(Error::new(
132 ErrorKind::ConfigInvalid,
133 "database_id is required",
134 ));
135 };
136
137 let client = if let Some(client) = self.http_client {
138 client
139 } else {
140 HttpClient::new().map_err(|err| {
141 err.with_operation("Builder::build")
142 .with_context("service", Scheme::D1)
143 })?
144 };
145
146 let Some(table) = config.table.clone() else {
147 return Err(Error::new(ErrorKind::ConfigInvalid, "table is required"));
148 };
149
150 let key_field = config
151 .key_field
152 .clone()
153 .unwrap_or_else(|| "key".to_string());
154
155 let value_field = config
156 .value_field
157 .clone()
158 .unwrap_or_else(|| "value".to_string());
159
160 let root = normalize_root(
161 config
162 .root
163 .clone()
164 .unwrap_or_else(|| "/".to_string())
165 .as_str(),
166 );
167 Ok(D1Backend::new(D1Core {
168 authorization,
169 account_id,
170 database_id,
171 client,
172 table,
173 key_field,
174 value_field,
175 })
176 .with_normalized_root(root))
177 }
178}
179
/// Backend that serves `Access` operations on top of a shared `D1Core`.
#[derive(Clone, Debug)]
pub struct D1Backend {
    /// Shared core; cloned into every writer and deleter this backend hands out.
    core: Arc<D1Core>,
    /// Root that every incoming path is resolved against via `build_abs_path`.
    /// Starts as `/` and is replaced by `with_normalized_root`.
    root: String,
    /// Accessor metadata (scheme, name, root, capability) returned by `info()`.
    info: Arc<AccessorInfo>,
}
187
188impl D1Backend {
189 pub fn new(core: D1Core) -> Self {
190 let info = AccessorInfo::default();
191 info.set_scheme(Scheme::D1.into_static());
192 info.set_name(&core.table);
193 info.set_root("/");
194 info.set_native_capability(Capability {
195 read: true,
196 stat: true,
197 write: true,
198 write_can_empty: true,
199 write_total_max_size: Some(1000 * 1000),
202 delete: true,
203 shared: true,
204 ..Default::default()
205 });
206
207 Self {
208 core: Arc::new(core),
209 root: "/".to_string(),
210 info: Arc::new(info),
211 }
212 }
213
214 fn with_normalized_root(mut self, root: String) -> Self {
215 self.info.set_root(&root);
216 self.root = root;
217 self
218 }
219}
220
221impl Access for D1Backend {
222 type Reader = Buffer;
223 type Writer = D1Writer;
224 type Lister = ();
225 type Deleter = oio::OneShotDeleter<D1Deleter>;
226
227 fn info(&self) -> Arc<AccessorInfo> {
228 self.info.clone()
229 }
230
231 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
232 let p = build_abs_path(&self.root, path);
233
234 if p == build_abs_path(&self.root, "") {
235 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
236 } else {
237 let bs = self.core.get(&p).await?;
238 match bs {
239 Some(bs) => Ok(RpStat::new(
240 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
241 )),
242 None => Err(Error::new(ErrorKind::NotFound, "kv not found in d1")),
243 }
244 }
245 }
246
247 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
248 let p = build_abs_path(&self.root, path);
249 let bs = match self.core.get(&p).await? {
250 Some(bs) => bs,
251 None => {
252 return Err(Error::new(ErrorKind::NotFound, "kv not found in d1"));
253 }
254 };
255 Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
256 }
257
258 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
259 let p = build_abs_path(&self.root, path);
260 Ok((RpWrite::new(), D1Writer::new(self.core.clone(), p)))
261 }
262
263 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
264 Ok((
265 RpDelete::default(),
266 oio::OneShotDeleter::new(D1Deleter::new(self.core.clone(), self.root.clone())),
267 ))
268 }
269
270 async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
271 let _ = build_abs_path(&self.root, path);
272 Ok((RpList::default(), ()))
273 }
274}