opendal/services/d1/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use super::D1_SCHEME;
22use super::config::D1Config;
23use super::core::*;
24use super::deleter::D1Deleter;
25use super::writer::D1Writer;
26use crate::raw::*;
27use crate::*;
28
29#[doc = include_str!("docs.md")]
30#[derive(Default)]
31pub struct D1Builder {
32    pub(super) config: D1Config,
33
34    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
35    pub(super) http_client: Option<HttpClient>,
36}
37
38impl Debug for D1Builder {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        f.debug_struct("D1Builder")
41            .field("config", &self.config)
42            .finish_non_exhaustive()
43    }
44}
45
46impl D1Builder {
47    /// Set api token for the cloudflare d1 service.
48    ///
49    /// create a api token from [here](https://dash.cloudflare.com/profile/api-tokens)
50    pub fn token(mut self, token: &str) -> Self {
51        if !token.is_empty() {
52            self.config.token = Some(token.to_string());
53        }
54        self
55    }
56
57    /// Set the account identifier for the cloudflare d1 service.
58    ///
59    /// get the account identifier from Workers & Pages -> Overview -> Account ID
60    /// If not specified, it will return an error when building.
61    pub fn account_id(mut self, account_id: &str) -> Self {
62        if !account_id.is_empty() {
63            self.config.account_id = Some(account_id.to_string());
64        }
65        self
66    }
67
68    /// Set the database identifier for the cloudflare d1 service.
69    ///
70    /// get the database identifier from Workers & Pages -> D1 -> [Your Database] -> Database ID
71    /// If not specified, it will return an error when building.
72    pub fn database_id(mut self, database_id: &str) -> Self {
73        if !database_id.is_empty() {
74            self.config.database_id = Some(database_id.to_string());
75        }
76        self
77    }
78
79    /// set the working directory, all operations will be performed under it.
80    ///
81    /// default: "/"
82    pub fn root(mut self, root: &str) -> Self {
83        self.config.root = if root.is_empty() {
84            None
85        } else {
86            Some(root.to_string())
87        };
88
89        self
90    }
91
92    /// Set the table name of the d1 service to read/write.
93    ///
94    /// If not specified, it will return an error when building.
95    pub fn table(mut self, table: &str) -> Self {
96        if !table.is_empty() {
97            self.config.table = Some(table.to_owned());
98        }
99        self
100    }
101
102    /// Set the key field name of the d1 service to read/write.
103    ///
104    /// Default to `key` if not specified.
105    pub fn key_field(mut self, key_field: &str) -> Self {
106        if !key_field.is_empty() {
107            self.config.key_field = Some(key_field.to_string());
108        }
109        self
110    }
111
112    /// Set the value field name of the d1 service to read/write.
113    ///
114    /// Default to `value` if not specified.
115    pub fn value_field(mut self, value_field: &str) -> Self {
116        if !value_field.is_empty() {
117            self.config.value_field = Some(value_field.to_string());
118        }
119        self
120    }
121}
122
123impl Builder for D1Builder {
124    type Config = D1Config;
125
126    fn build(self) -> Result<impl Access> {
127        let mut authorization = None;
128        let config = self.config;
129
130        if let Some(token) = config.token {
131            authorization = Some(format_authorization_by_bearer(&token)?)
132        }
133
134        let Some(account_id) = config.account_id else {
135            return Err(Error::new(
136                ErrorKind::ConfigInvalid,
137                "account_id is required",
138            ));
139        };
140
141        let Some(database_id) = config.database_id.clone() else {
142            return Err(Error::new(
143                ErrorKind::ConfigInvalid,
144                "database_id is required",
145            ));
146        };
147
148        #[allow(deprecated)]
149        let client = if let Some(client) = self.http_client {
150            client
151        } else {
152            HttpClient::new().map_err(|err| {
153                err.with_operation("Builder::build")
154                    .with_context("service", D1_SCHEME)
155            })?
156        };
157
158        let Some(table) = config.table.clone() else {
159            return Err(Error::new(ErrorKind::ConfigInvalid, "table is required"));
160        };
161
162        let key_field = config
163            .key_field
164            .clone()
165            .unwrap_or_else(|| "key".to_string());
166
167        let value_field = config
168            .value_field
169            .clone()
170            .unwrap_or_else(|| "value".to_string());
171
172        let root = normalize_root(
173            config
174                .root
175                .clone()
176                .unwrap_or_else(|| "/".to_string())
177                .as_str(),
178        );
179        Ok(D1Backend::new(D1Core {
180            authorization,
181            account_id,
182            database_id,
183            client,
184            table,
185            key_field,
186            value_field,
187        })
188        .with_normalized_root(root))
189    }
190}
191
192/// Backend for D1 services.
193#[derive(Clone, Debug)]
194pub struct D1Backend {
195    core: Arc<D1Core>,
196    root: String,
197    info: Arc<AccessorInfo>,
198}
199
200impl D1Backend {
201    pub fn new(core: D1Core) -> Self {
202        let info = AccessorInfo::default();
203        info.set_scheme(D1_SCHEME);
204        info.set_name(&core.table);
205        info.set_root("/");
206        info.set_native_capability(Capability {
207            read: true,
208            stat: true,
209            write: true,
210            write_can_empty: true,
211            // Cloudflare D1 supports 1MB as max in write_total.
212            // refer to https://developers.cloudflare.com/d1/platform/limits/
213            write_total_max_size: Some(1000 * 1000),
214            delete: true,
215            shared: true,
216            ..Default::default()
217        });
218
219        Self {
220            core: Arc::new(core),
221            root: "/".to_string(),
222            info: Arc::new(info),
223        }
224    }
225
226    fn with_normalized_root(mut self, root: String) -> Self {
227        self.info.set_root(&root);
228        self.root = root;
229        self
230    }
231}
232
233impl Access for D1Backend {
234    type Reader = Buffer;
235    type Writer = D1Writer;
236    type Lister = ();
237    type Deleter = oio::OneShotDeleter<D1Deleter>;
238
239    fn info(&self) -> Arc<AccessorInfo> {
240        self.info.clone()
241    }
242
243    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
244        let p = build_abs_path(&self.root, path);
245
246        if p == build_abs_path(&self.root, "") {
247            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
248        } else {
249            let bs = self.core.get(&p).await?;
250            match bs {
251                Some(bs) => Ok(RpStat::new(
252                    Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
253                )),
254                None => Err(Error::new(ErrorKind::NotFound, "kv not found in d1")),
255            }
256        }
257    }
258
259    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
260        let p = build_abs_path(&self.root, path);
261        let bs = match self.core.get(&p).await? {
262            Some(bs) => bs,
263            None => {
264                return Err(Error::new(ErrorKind::NotFound, "kv not found in d1"));
265            }
266        };
267        Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
268    }
269
270    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
271        let p = build_abs_path(&self.root, path);
272        Ok((RpWrite::new(), D1Writer::new(self.core.clone(), p)))
273    }
274
275    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
276        Ok((
277            RpDelete::default(),
278            oio::OneShotDeleter::new(D1Deleter::new(self.core.clone(), self.root.clone())),
279        ))
280    }
281
282    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
283        let _ = build_abs_path(&self.root, path);
284        Ok((RpList::default(), ()))
285    }
286}