opendal_core/services/d1/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19
20use http::Request;
21use http::StatusCode;
22use http::header;
23use serde_json::Value;
24
25use super::error::parse_error;
26use super::model::*;
27use crate::raw::*;
28use crate::*;
29
/// Shared state used to issue SQL queries against a Cloudflare D1 database
/// over the Cloudflare HTTP API.
///
/// The `table`, `key_field` and `value_field` names are interpolated directly
/// into the SQL text built by the methods of this type (they are not bound as
/// query parameters).
#[derive(Clone)]
pub struct D1Core {
    /// Value sent as the `Authorization` header; the header is omitted when `None`.
    pub authorization: Option<String>,
    /// Account identifier placed in the `/accounts/{..}` segment of the query URL.
    pub account_id: String,
    /// Database identifier placed in the `/d1/database/{..}` segment of the query URL.
    pub database_id: String,

    /// HTTP client used to send every query request.
    pub client: HttpClient,
    /// Name of the table that queries read from and write to.
    pub table: String,
    /// Name of the column compared against / populated with the entry path.
    pub key_field: String,
    /// Name of the column that holds the stored value.
    pub value_field: String,
}
41
42impl Debug for D1Core {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        f.debug_struct("D1Core")
45            .field("table", &self.table)
46            .field("key_field", &self.key_field)
47            .field("value_field", &self.value_field)
48            .finish_non_exhaustive()
49    }
50}
51
52impl D1Core {
53    fn create_d1_query_request(&self, sql: &str, params: Vec<Value>) -> Result<Request<Buffer>> {
54        let p = format!(
55            "/accounts/{}/d1/database/{}/query",
56            self.account_id, self.database_id
57        );
58        let url: String = format!(
59            "{}{}",
60            "https://api.cloudflare.com/client/v4",
61            percent_encode_path(&p)
62        );
63
64        let mut req = Request::post(&url);
65        if let Some(auth) = &self.authorization {
66            req = req.header(header::AUTHORIZATION, auth);
67        }
68        req = req.header(header::CONTENT_TYPE, "application/json");
69
70        let json = serde_json::json!({
71            "sql": sql,
72            "params": params,
73        });
74
75        let body = serde_json::to_vec(&json).map_err(new_json_serialize_error)?;
76        req.body(Buffer::from(body))
77            .map_err(new_request_build_error)
78    }
79
80    pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
81        let query = format!(
82            "SELECT {} FROM {} WHERE {} = ? LIMIT 1",
83            self.value_field, self.table, self.key_field
84        );
85        let req = self.create_d1_query_request(&query, vec![path.into()])?;
86
87        let resp = self.client.send(req).await?;
88        let status = resp.status();
89        match status {
90            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
91                let body = resp.into_body();
92                let bs = body.to_bytes();
93                let d1_response = D1Response::parse(&bs)?;
94                Ok(d1_response.get_result(&self.value_field))
95            }
96            _ => Err(parse_error(resp)),
97        }
98    }
99
100    pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
101        let table = &self.table;
102        let key_field = &self.key_field;
103        let value_field = &self.value_field;
104        let query = format!(
105            "INSERT INTO {table} ({key_field}, {value_field}) \
106                VALUES (?, ?) \
107                ON CONFLICT ({key_field}) \
108                    DO UPDATE SET {value_field} = EXCLUDED.{value_field}",
109        );
110
111        let params = vec![path.into(), value.to_vec().into()];
112        let req = self.create_d1_query_request(&query, params)?;
113
114        let resp = self.client.send(req).await?;
115        let status = resp.status();
116        match status {
117            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()),
118            _ => Err(parse_error(resp)),
119        }
120    }
121
122    pub async fn delete(&self, path: &str) -> Result<()> {
123        let query = format!("DELETE FROM {} WHERE {} = ?", self.table, self.key_field);
124        let req = self.create_d1_query_request(&query, vec![path.into()])?;
125
126        let resp = self.client.send(req).await?;
127        let status = resp.status();
128        match status {
129            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()),
130            _ => Err(parse_error(resp)),
131        }
132    }
133}