opendal_core/services/d1/
core.rs1use std::fmt::Debug;
19
20use http::Request;
21use http::StatusCode;
22use http::header;
23use serde_json::Value;
24
25use super::error::parse_error;
26use super::model::*;
27use crate::raw::*;
28use crate::*;
29
30#[derive(Clone)]
31pub struct D1Core {
32 pub authorization: Option<String>,
33 pub account_id: String,
34 pub database_id: String,
35
36 pub client: HttpClient,
37 pub table: String,
38 pub key_field: String,
39 pub value_field: String,
40}
41
42impl Debug for D1Core {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 f.debug_struct("D1Core")
45 .field("table", &self.table)
46 .field("key_field", &self.key_field)
47 .field("value_field", &self.value_field)
48 .finish_non_exhaustive()
49 }
50}
51
52impl D1Core {
53 fn create_d1_query_request(&self, sql: &str, params: Vec<Value>) -> Result<Request<Buffer>> {
54 let p = format!(
55 "/accounts/{}/d1/database/{}/query",
56 self.account_id, self.database_id
57 );
58 let url: String = format!(
59 "{}{}",
60 "https://api.cloudflare.com/client/v4",
61 percent_encode_path(&p)
62 );
63
64 let mut req = Request::post(&url);
65 if let Some(auth) = &self.authorization {
66 req = req.header(header::AUTHORIZATION, auth);
67 }
68 req = req.header(header::CONTENT_TYPE, "application/json");
69
70 let json = serde_json::json!({
71 "sql": sql,
72 "params": params,
73 });
74
75 let body = serde_json::to_vec(&json).map_err(new_json_serialize_error)?;
76 req.body(Buffer::from(body))
77 .map_err(new_request_build_error)
78 }
79
80 pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
81 let query = format!(
82 "SELECT {} FROM {} WHERE {} = ? LIMIT 1",
83 self.value_field, self.table, self.key_field
84 );
85 let req = self.create_d1_query_request(&query, vec![path.into()])?;
86
87 let resp = self.client.send(req).await?;
88 let status = resp.status();
89 match status {
90 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
91 let body = resp.into_body();
92 let bs = body.to_bytes();
93 let d1_response = D1Response::parse(&bs)?;
94 Ok(d1_response.get_result(&self.value_field))
95 }
96 _ => Err(parse_error(resp)),
97 }
98 }
99
100 pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
101 let table = &self.table;
102 let key_field = &self.key_field;
103 let value_field = &self.value_field;
104 let query = format!(
105 "INSERT INTO {table} ({key_field}, {value_field}) \
106 VALUES (?, ?) \
107 ON CONFLICT ({key_field}) \
108 DO UPDATE SET {value_field} = EXCLUDED.{value_field}",
109 );
110
111 let params = vec![path.into(), value.to_vec().into()];
112 let req = self.create_d1_query_request(&query, params)?;
113
114 let resp = self.client.send(req).await?;
115 let status = resp.status();
116 match status {
117 StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()),
118 _ => Err(parse_error(resp)),
119 }
120 }
121
122 pub async fn delete(&self, path: &str) -> Result<()> {
123 let query = format!("DELETE FROM {} WHERE {} = ?", self.table, self.key_field);
124 let req = self.create_d1_query_request(&query, vec![path.into()])?;
125
126 let resp = self.client.send(req).await?;
127 let status = resp.status();
128 match status {
129 StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()),
130 _ => Err(parse_error(resp)),
131 }
132 }
133}