opendal/services/cloudflare_kv/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use bytes::Buf;
22use http::header;
23use http::Request;
24use http::StatusCode;
25use serde::Deserialize;
26
27use super::error::parse_error;
28use crate::raw::adapters::kv;
29use crate::raw::*;
30use crate::services::CloudflareKvConfig;
31use crate::ErrorKind;
32use crate::*;
33
34impl Configurator for CloudflareKvConfig {
35 type Builder = CloudflareKvBuilder;
36 fn into_builder(self) -> Self::Builder {
37 CloudflareKvBuilder {
38 config: self,
39 http_client: None,
40 }
41 }
42}
43
44#[doc = include_str!("docs.md")]
45#[derive(Default)]
46pub struct CloudflareKvBuilder {
47 config: CloudflareKvConfig,
48
49 http_client: Option<HttpClient>,
51}
52
53impl Debug for CloudflareKvBuilder {
54 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("CloudFlareKvBuilder")
56 .field("config", &self.config)
57 .finish()
58 }
59}
60
61impl CloudflareKvBuilder {
62 pub fn token(mut self, token: &str) -> Self {
64 if !token.is_empty() {
65 self.config.token = Some(token.to_string())
66 }
67 self
68 }
69
70 pub fn account_id(mut self, account_id: &str) -> Self {
72 if !account_id.is_empty() {
73 self.config.account_id = Some(account_id.to_string())
74 }
75 self
76 }
77
78 pub fn namespace_id(mut self, namespace_id: &str) -> Self {
80 if !namespace_id.is_empty() {
81 self.config.namespace_id = Some(namespace_id.to_string())
82 }
83 self
84 }
85
86 pub fn root(mut self, root: &str) -> Self {
88 self.config.root = if root.is_empty() {
89 None
90 } else {
91 Some(root.to_string())
92 };
93
94 self
95 }
96}
97
98impl Builder for CloudflareKvBuilder {
99 type Config = CloudflareKvConfig;
100
101 fn build(self) -> Result<impl Access> {
102 let authorization = match &self.config.token {
103 Some(token) => format_authorization_by_bearer(token)?,
104 None => return Err(Error::new(ErrorKind::ConfigInvalid, "token is required")),
105 };
106
107 let Some(account_id) = self.config.account_id.clone() else {
108 return Err(Error::new(
109 ErrorKind::ConfigInvalid,
110 "account_id is required",
111 ));
112 };
113
114 let Some(namespace_id) = self.config.namespace_id.clone() else {
115 return Err(Error::new(
116 ErrorKind::ConfigInvalid,
117 "namespace_id is required",
118 ));
119 };
120
121 let client = if let Some(client) = self.http_client {
122 client
123 } else {
124 HttpClient::new().map_err(|err| {
125 err.with_operation("Builder::build")
126 .with_context("service", Scheme::CloudflareKv)
127 })?
128 };
129
130 let root = normalize_root(
131 self.config
132 .root
133 .clone()
134 .unwrap_or_else(|| "/".to_string())
135 .as_str(),
136 );
137
138 let url_prefix = format!(
139 r"https://api.cloudflare.com/client/v4/accounts/{account_id}/storage/kv/namespaces/{namespace_id}"
140 );
141
142 Ok(CloudflareKvBackend::new(Adapter {
143 authorization,
144 account_id,
145 namespace_id,
146 client,
147 url_prefix,
148 })
149 .with_normalized_root(root))
150 }
151}
152
153pub type CloudflareKvBackend = kv::Backend<Adapter>;
154
155#[derive(Clone)]
156pub struct Adapter {
157 authorization: String,
158 account_id: String,
159 namespace_id: String,
160 client: HttpClient,
161 url_prefix: String,
162}
163
164impl Debug for Adapter {
165 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
166 f.debug_struct("Adapter")
167 .field("account_id", &self.account_id)
168 .field("namespace_id", &self.namespace_id)
169 .finish()
170 }
171}
172
173impl Adapter {
174 fn sign<T>(&self, mut req: Request<T>) -> Result<Request<T>> {
175 req.headers_mut()
176 .insert(header::AUTHORIZATION, self.authorization.parse().unwrap());
177 Ok(req)
178 }
179}
180
181impl kv::Adapter for Adapter {
182 type Scanner = kv::Scanner;
183
184 fn info(&self) -> kv::Info {
185 kv::Info::new(
186 Scheme::CloudflareKv,
187 &self.namespace_id,
188 Capability {
189 read: true,
190 write: true,
191 list: true,
192 shared: true,
193
194 ..Default::default()
195 },
196 )
197 }
198
199 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
200 let url = format!("{}/values/{}", self.url_prefix, path);
201 let mut req = Request::get(&url);
202 req = req.header(header::CONTENT_TYPE, "application/json");
203 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
204 req = self.sign(req)?;
205 let resp = self.client.send(req).await?;
206 let status = resp.status();
207 match status {
208 StatusCode::OK => Ok(Some(resp.into_body())),
209 _ => Err(parse_error(resp)),
210 }
211 }
212
213 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
214 let url = format!("{}/values/{}", self.url_prefix, path);
215 let req = Request::put(&url);
216 let multipart = Multipart::new();
217 let multipart = multipart
218 .part(FormDataPart::new("metadata").content(serde_json::Value::Null.to_string()))
219 .part(FormDataPart::new("value").content(value.to_vec()));
220 let mut req = multipart.apply(req)?;
221 req = self.sign(req)?;
222 let resp = self.client.send(req).await?;
223 let status = resp.status();
224 match status {
225 StatusCode::OK => Ok(()),
226 _ => Err(parse_error(resp)),
227 }
228 }
229
230 async fn delete(&self, path: &str) -> Result<()> {
231 let url = format!("{}/values/{}", self.url_prefix, path);
232 let mut req = Request::delete(&url);
233 req = req.header(header::CONTENT_TYPE, "application/json");
234 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
235 req = self.sign(req)?;
236 let resp = self.client.send(req).await?;
237 let status = resp.status();
238 match status {
239 StatusCode::OK => Ok(()),
240 _ => Err(parse_error(resp)),
241 }
242 }
243
244 async fn scan(&self, path: &str) -> Result<Self::Scanner> {
245 let mut url = format!("{}/keys", self.url_prefix);
246 if !path.is_empty() {
247 url = format!("{url}?prefix={path}");
248 }
249 let mut req = Request::get(&url);
250 req = req.header(header::CONTENT_TYPE, "application/json");
251 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
252 req = self.sign(req)?;
253 let resp = self.client.send(req).await?;
254 let status = resp.status();
255 match status {
256 StatusCode::OK => {
257 let body = resp.into_body();
258 let response: CfKvScanResponse =
259 serde_json::from_reader(body.reader()).map_err(|e| {
260 Error::new(
261 ErrorKind::Unexpected,
262 format!("failed to parse error response: {e}"),
263 )
264 })?;
265 Ok(Box::new(kv::ScanStdIter::new(
266 response.result.into_iter().map(|r| Ok(r.name)),
267 )))
268 }
269 _ => Err(parse_error(resp)),
270 }
271 }
272}
273
274#[derive(Debug, Deserialize)]
275pub(super) struct CfKvResponse {
276 pub(super) errors: Vec<CfKvError>,
277}
278
279#[derive(Debug, Deserialize)]
280pub(super) struct CfKvScanResponse {
281 result: Vec<CfKvScanResult>,
282 }
285
286#[derive(Debug, Deserialize)]
287struct CfKvScanResult {
288 name: String,
289}
290
291#[derive(Debug, Deserialize)]
298pub(super) struct CfKvError {
299 pub(super) code: i32,
300}
301
302#[cfg(test)]
303mod test {
304 use super::*;
305
306 #[test]
307 fn test_deserialize_scan_json_response() {
308 let json_str = r#"{
309 "errors": [],
310 "messages": [],
311 "result": [
312 {
313 "expiration": 1577836800,
314 "metadata": {
315 "someMetadataKey": "someMetadataValue"
316 },
317 "name": "My-Key"
318 }
319 ],
320 "success": true,
321 "result_info": {
322 "count": 1,
323 "cursor": "6Ck1la0VxJ0djhidm1MdX2FyDGxLKVeeHZZmORS_8XeSuhz9SjIJRaSa2lnsF01tQOHrfTGAP3R5X1Kv5iVUuMbNKhWNAXHOl6ePB0TUL8nw"
324 }
325 }"#;
326
327 let response: CfKvScanResponse = serde_json::from_slice(json_str.as_bytes()).unwrap();
328
329 assert_eq!(response.result.len(), 1);
330 assert_eq!(response.result[0].name, "My-Key");
331 }
337
338 #[test]
339 fn test_deserialize_json_response() {
340 let json_str = r#"{
341 "errors": [],
342 "messages": [],
343 "result": {},
344 "success": true
345 }"#;
346
347 let response: CfKvResponse = serde_json::from_slice(json_str.as_bytes()).unwrap();
348
349 assert_eq!(response.errors.len(), 0);
350 }
351}