opendal/services/cloudflare_kv/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use bytes::Buf;
22use http::header;
23use http::Request;
24use http::StatusCode;
25use serde::Deserialize;
26
27use super::error::parse_error;
28use crate::raw::adapters::kv;
29use crate::raw::*;
30use crate::services::CloudflareKvConfig;
31use crate::ErrorKind;
32use crate::*;
33
34impl Configurator for CloudflareKvConfig {
35 type Builder = CloudflareKvBuilder;
36 fn into_builder(self) -> Self::Builder {
37 CloudflareKvBuilder {
38 config: self,
39 http_client: None,
40 }
41 }
42}
43
44#[doc = include_str!("docs.md")]
45#[derive(Default)]
46pub struct CloudflareKvBuilder {
47 config: CloudflareKvConfig,
48
49 http_client: Option<HttpClient>,
51}
52
53impl Debug for CloudflareKvBuilder {
54 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("CloudFlareKvBuilder")
56 .field("config", &self.config)
57 .finish()
58 }
59}
60
61impl CloudflareKvBuilder {
62 pub fn token(mut self, token: &str) -> Self {
64 if !token.is_empty() {
65 self.config.token = Some(token.to_string())
66 }
67 self
68 }
69
70 pub fn account_id(mut self, account_id: &str) -> Self {
72 if !account_id.is_empty() {
73 self.config.account_id = Some(account_id.to_string())
74 }
75 self
76 }
77
78 pub fn namespace_id(mut self, namespace_id: &str) -> Self {
80 if !namespace_id.is_empty() {
81 self.config.namespace_id = Some(namespace_id.to_string())
82 }
83 self
84 }
85
86 pub fn root(mut self, root: &str) -> Self {
88 self.config.root = if root.is_empty() {
89 None
90 } else {
91 Some(root.to_string())
92 };
93
94 self
95 }
96}
97
98impl Builder for CloudflareKvBuilder {
99 const SCHEME: Scheme = Scheme::CloudflareKv;
100 type Config = CloudflareKvConfig;
101
102 fn build(self) -> Result<impl Access> {
103 let authorization = match &self.config.token {
104 Some(token) => format_authorization_by_bearer(token)?,
105 None => return Err(Error::new(ErrorKind::ConfigInvalid, "token is required")),
106 };
107
108 let Some(account_id) = self.config.account_id.clone() else {
109 return Err(Error::new(
110 ErrorKind::ConfigInvalid,
111 "account_id is required",
112 ));
113 };
114
115 let Some(namespace_id) = self.config.namespace_id.clone() else {
116 return Err(Error::new(
117 ErrorKind::ConfigInvalid,
118 "namespace_id is required",
119 ));
120 };
121
122 let client = if let Some(client) = self.http_client {
123 client
124 } else {
125 HttpClient::new().map_err(|err| {
126 err.with_operation("Builder::build")
127 .with_context("service", Scheme::CloudflareKv)
128 })?
129 };
130
131 let root = normalize_root(
132 self.config
133 .root
134 .clone()
135 .unwrap_or_else(|| "/".to_string())
136 .as_str(),
137 );
138
139 let url_prefix = format!(
140 r"https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}",
141 account_id, namespace_id
142 );
143
144 Ok(CloudflareKvBackend::new(Adapter {
145 authorization,
146 account_id,
147 namespace_id,
148 client,
149 url_prefix,
150 })
151 .with_normalized_root(root))
152 }
153}
154
155pub type CloudflareKvBackend = kv::Backend<Adapter>;
156
157#[derive(Clone)]
158pub struct Adapter {
159 authorization: String,
160 account_id: String,
161 namespace_id: String,
162 client: HttpClient,
163 url_prefix: String,
164}
165
166impl Debug for Adapter {
167 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
168 f.debug_struct("Adapter")
169 .field("account_id", &self.account_id)
170 .field("namespace_id", &self.namespace_id)
171 .finish()
172 }
173}
174
175impl Adapter {
176 fn sign<T>(&self, mut req: Request<T>) -> Result<Request<T>> {
177 req.headers_mut()
178 .insert(header::AUTHORIZATION, self.authorization.parse().unwrap());
179 Ok(req)
180 }
181}
182
183impl kv::Adapter for Adapter {
184 type Scanner = kv::Scanner;
185
186 fn info(&self) -> kv::Info {
187 kv::Info::new(
188 Scheme::CloudflareKv,
189 &self.namespace_id,
190 Capability {
191 read: true,
192 write: true,
193 list: true,
194 shared: true,
195
196 ..Default::default()
197 },
198 )
199 }
200
201 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
202 let url = format!("{}/values/{}", self.url_prefix, path);
203 let mut req = Request::get(&url);
204 req = req.header(header::CONTENT_TYPE, "application/json");
205 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
206 req = self.sign(req)?;
207 let resp = self.client.send(req).await?;
208 let status = resp.status();
209 match status {
210 StatusCode::OK => Ok(Some(resp.into_body())),
211 _ => Err(parse_error(resp)),
212 }
213 }
214
215 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
216 let url = format!("{}/values/{}", self.url_prefix, path);
217 let req = Request::put(&url);
218 let multipart = Multipart::new();
219 let multipart = multipart
220 .part(FormDataPart::new("metadata").content(serde_json::Value::Null.to_string()))
221 .part(FormDataPart::new("value").content(value.to_vec()));
222 let mut req = multipart.apply(req)?;
223 req = self.sign(req)?;
224 let resp = self.client.send(req).await?;
225 let status = resp.status();
226 match status {
227 StatusCode::OK => Ok(()),
228 _ => Err(parse_error(resp)),
229 }
230 }
231
232 async fn delete(&self, path: &str) -> Result<()> {
233 let url = format!("{}/values/{}", self.url_prefix, path);
234 let mut req = Request::delete(&url);
235 req = req.header(header::CONTENT_TYPE, "application/json");
236 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
237 req = self.sign(req)?;
238 let resp = self.client.send(req).await?;
239 let status = resp.status();
240 match status {
241 StatusCode::OK => Ok(()),
242 _ => Err(parse_error(resp)),
243 }
244 }
245
246 async fn scan(&self, path: &str) -> Result<Self::Scanner> {
247 let mut url = format!("{}/keys", self.url_prefix);
248 if !path.is_empty() {
249 url = format!("{}?prefix={}", url, path);
250 }
251 let mut req = Request::get(&url);
252 req = req.header(header::CONTENT_TYPE, "application/json");
253 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
254 req = self.sign(req)?;
255 let resp = self.client.send(req).await?;
256 let status = resp.status();
257 match status {
258 StatusCode::OK => {
259 let body = resp.into_body();
260 let response: CfKvScanResponse =
261 serde_json::from_reader(body.reader()).map_err(|e| {
262 Error::new(
263 ErrorKind::Unexpected,
264 format!("failed to parse error response: {}", e),
265 )
266 })?;
267 Ok(Box::new(kv::ScanStdIter::new(
268 response.result.into_iter().map(|r| Ok(r.name)),
269 )))
270 }
271 _ => Err(parse_error(resp)),
272 }
273 }
274}
275
276#[derive(Debug, Deserialize)]
277pub(super) struct CfKvResponse {
278 pub(super) errors: Vec<CfKvError>,
279}
280
281#[derive(Debug, Deserialize)]
282pub(super) struct CfKvScanResponse {
283 result: Vec<CfKvScanResult>,
284 }
287
288#[derive(Debug, Deserialize)]
289struct CfKvScanResult {
290 name: String,
291}
292
293#[derive(Debug, Deserialize)]
300pub(super) struct CfKvError {
301 pub(super) code: i32,
302}
303
#[cfg(test)]
mod test {
    use super::*;

    /// A key-listing payload deserializes into `CfKvScanResponse`, and
    /// fields the struct does not declare are ignored.
    #[test]
    fn test_deserialize_scan_json_response() {
        let payload = r#"{
            "errors": [],
            "messages": [],
            "result": [
                {
                    "expiration": 1577836800,
                    "metadata": {
                        "someMetadataKey": "someMetadataValue"
                    },
                    "name": "My-Key"
                }
            ],
            "success": true,
            "result_info": {
                "count": 1,
                "cursor": "6Ck1la0VxJ0djhidm1MdX2FyDGxLKVeeHZZmORS_8XeSuhz9SjIJRaSa2lnsF01tQOHrfTGAP3R5X1Kv5iVUuMbNKhWNAXHOl6ePB0TUL8nw"
            }
        }"#;

        let parsed: CfKvScanResponse = serde_json::from_str(payload).unwrap();

        let names: Vec<&str> = parsed.result.iter().map(|entry| entry.name.as_str()).collect();
        assert_eq!(names, ["My-Key"]);
    }

    /// A generic response envelope with no errors deserializes into
    /// `CfKvResponse` with an empty `errors` list.
    #[test]
    fn test_deserialize_json_response() {
        let payload = r#"{
            "errors": [],
            "messages": [],
            "result": {},
            "success": true
        }"#;

        let parsed: CfKvResponse = serde_json::from_str(payload).unwrap();

        assert!(parsed.errors.is_empty());
    }
}