opendal/services/cloudflare_kv/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use bytes::Buf;
22use http::header;
23use http::Request;
24use http::StatusCode;
25use serde::Deserialize;
26
27use super::error::parse_error;
28use crate::raw::adapters::kv;
29use crate::raw::*;
30use crate::services::CloudflareKvConfig;
31use crate::ErrorKind;
32use crate::*;
33
34impl Configurator for CloudflareKvConfig {
35    type Builder = CloudflareKvBuilder;
36    fn into_builder(self) -> Self::Builder {
37        CloudflareKvBuilder {
38            config: self,
39            http_client: None,
40        }
41    }
42}
43
44#[doc = include_str!("docs.md")]
45#[derive(Default)]
46pub struct CloudflareKvBuilder {
47    config: CloudflareKvConfig,
48
49    /// The HTTP client used to communicate with CloudFlare.
50    http_client: Option<HttpClient>,
51}
52
53impl Debug for CloudflareKvBuilder {
54    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55        f.debug_struct("CloudFlareKvBuilder")
56            .field("config", &self.config)
57            .finish()
58    }
59}
60
61impl CloudflareKvBuilder {
62    /// Set the token used to authenticate with CloudFlare.
63    pub fn token(mut self, token: &str) -> Self {
64        if !token.is_empty() {
65            self.config.token = Some(token.to_string())
66        }
67        self
68    }
69
70    /// Set the account ID used to authenticate with CloudFlare.
71    pub fn account_id(mut self, account_id: &str) -> Self {
72        if !account_id.is_empty() {
73            self.config.account_id = Some(account_id.to_string())
74        }
75        self
76    }
77
78    /// Set the namespace ID.
79    pub fn namespace_id(mut self, namespace_id: &str) -> Self {
80        if !namespace_id.is_empty() {
81            self.config.namespace_id = Some(namespace_id.to_string())
82        }
83        self
84    }
85
86    /// Set the root within this backend.
87    pub fn root(mut self, root: &str) -> Self {
88        self.config.root = if root.is_empty() {
89            None
90        } else {
91            Some(root.to_string())
92        };
93
94        self
95    }
96}
97
98impl Builder for CloudflareKvBuilder {
99    const SCHEME: Scheme = Scheme::CloudflareKv;
100    type Config = CloudflareKvConfig;
101
102    fn build(self) -> Result<impl Access> {
103        let authorization = match &self.config.token {
104            Some(token) => format_authorization_by_bearer(token)?,
105            None => return Err(Error::new(ErrorKind::ConfigInvalid, "token is required")),
106        };
107
108        let Some(account_id) = self.config.account_id.clone() else {
109            return Err(Error::new(
110                ErrorKind::ConfigInvalid,
111                "account_id is required",
112            ));
113        };
114
115        let Some(namespace_id) = self.config.namespace_id.clone() else {
116            return Err(Error::new(
117                ErrorKind::ConfigInvalid,
118                "namespace_id is required",
119            ));
120        };
121
122        let client = if let Some(client) = self.http_client {
123            client
124        } else {
125            HttpClient::new().map_err(|err| {
126                err.with_operation("Builder::build")
127                    .with_context("service", Scheme::CloudflareKv)
128            })?
129        };
130
131        let root = normalize_root(
132            self.config
133                .root
134                .clone()
135                .unwrap_or_else(|| "/".to_string())
136                .as_str(),
137        );
138
139        let url_prefix = format!(
140            r"https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}",
141            account_id, namespace_id
142        );
143
144        Ok(CloudflareKvBackend::new(Adapter {
145            authorization,
146            account_id,
147            namespace_id,
148            client,
149            url_prefix,
150        })
151        .with_normalized_root(root))
152    }
153}
154
155pub type CloudflareKvBackend = kv::Backend<Adapter>;
156
157#[derive(Clone)]
158pub struct Adapter {
159    authorization: String,
160    account_id: String,
161    namespace_id: String,
162    client: HttpClient,
163    url_prefix: String,
164}
165
166impl Debug for Adapter {
167    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
168        f.debug_struct("Adapter")
169            .field("account_id", &self.account_id)
170            .field("namespace_id", &self.namespace_id)
171            .finish()
172    }
173}
174
175impl Adapter {
176    fn sign<T>(&self, mut req: Request<T>) -> Result<Request<T>> {
177        req.headers_mut()
178            .insert(header::AUTHORIZATION, self.authorization.parse().unwrap());
179        Ok(req)
180    }
181}
182
183impl kv::Adapter for Adapter {
184    type Scanner = kv::Scanner;
185
186    fn info(&self) -> kv::Info {
187        kv::Info::new(
188            Scheme::CloudflareKv,
189            &self.namespace_id,
190            Capability {
191                read: true,
192                write: true,
193                list: true,
194                shared: true,
195
196                ..Default::default()
197            },
198        )
199    }
200
201    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
202        let url = format!("{}/values/{}", self.url_prefix, path);
203        let mut req = Request::get(&url);
204        req = req.header(header::CONTENT_TYPE, "application/json");
205        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
206        req = self.sign(req)?;
207        let resp = self.client.send(req).await?;
208        let status = resp.status();
209        match status {
210            StatusCode::OK => Ok(Some(resp.into_body())),
211            _ => Err(parse_error(resp)),
212        }
213    }
214
215    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
216        let url = format!("{}/values/{}", self.url_prefix, path);
217        let req = Request::put(&url);
218        let multipart = Multipart::new();
219        let multipart = multipart
220            .part(FormDataPart::new("metadata").content(serde_json::Value::Null.to_string()))
221            .part(FormDataPart::new("value").content(value.to_vec()));
222        let mut req = multipart.apply(req)?;
223        req = self.sign(req)?;
224        let resp = self.client.send(req).await?;
225        let status = resp.status();
226        match status {
227            StatusCode::OK => Ok(()),
228            _ => Err(parse_error(resp)),
229        }
230    }
231
232    async fn delete(&self, path: &str) -> Result<()> {
233        let url = format!("{}/values/{}", self.url_prefix, path);
234        let mut req = Request::delete(&url);
235        req = req.header(header::CONTENT_TYPE, "application/json");
236        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
237        req = self.sign(req)?;
238        let resp = self.client.send(req).await?;
239        let status = resp.status();
240        match status {
241            StatusCode::OK => Ok(()),
242            _ => Err(parse_error(resp)),
243        }
244    }
245
246    async fn scan(&self, path: &str) -> Result<Self::Scanner> {
247        let mut url = format!("{}/keys", self.url_prefix);
248        if !path.is_empty() {
249            url = format!("{}?prefix={}", url, path);
250        }
251        let mut req = Request::get(&url);
252        req = req.header(header::CONTENT_TYPE, "application/json");
253        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
254        req = self.sign(req)?;
255        let resp = self.client.send(req).await?;
256        let status = resp.status();
257        match status {
258            StatusCode::OK => {
259                let body = resp.into_body();
260                let response: CfKvScanResponse =
261                    serde_json::from_reader(body.reader()).map_err(|e| {
262                        Error::new(
263                            ErrorKind::Unexpected,
264                            format!("failed to parse error response: {}", e),
265                        )
266                    })?;
267                Ok(Box::new(kv::ScanStdIter::new(
268                    response.result.into_iter().map(|r| Ok(r.name)),
269                )))
270            }
271            _ => Err(parse_error(resp)),
272        }
273    }
274}
275
276#[derive(Debug, Deserialize)]
277pub(super) struct CfKvResponse {
278    pub(super) errors: Vec<CfKvError>,
279}
280
281#[derive(Debug, Deserialize)]
282pub(super) struct CfKvScanResponse {
283    result: Vec<CfKvScanResult>,
284    // According to https://developers.cloudflare.com/api/operations/workers-kv-namespace-list-a-namespace'-s-keys, result_info is used to determine if there are more keys to be listed
285    // result_info: Option<CfKvResultInfo>,
286}
287
288#[derive(Debug, Deserialize)]
289struct CfKvScanResult {
290    name: String,
291}
292
293// #[derive(Debug, Deserialize)]
294// struct CfKvResultInfo {
295//     count: i64,
296//     cursor: String,
297// }
298
299#[derive(Debug, Deserialize)]
300pub(super) struct CfKvError {
301    pub(super) code: i32,
302}
303
304#[cfg(test)]
305mod test {
306    use super::*;
307
308    #[test]
309    fn test_deserialize_scan_json_response() {
310        let json_str = r#"{
311			"errors": [],
312			"messages": [],
313			"result": [
314				{
315				"expiration": 1577836800,
316				"metadata": {
317					"someMetadataKey": "someMetadataValue"
318				},
319				"name": "My-Key"
320				}
321			],
322			"success": true,
323			"result_info": {
324				"count": 1,
325				"cursor": "6Ck1la0VxJ0djhidm1MdX2FyDGxLKVeeHZZmORS_8XeSuhz9SjIJRaSa2lnsF01tQOHrfTGAP3R5X1Kv5iVUuMbNKhWNAXHOl6ePB0TUL8nw"
326			}
327		}"#;
328
329        let response: CfKvScanResponse = serde_json::from_slice(json_str.as_bytes()).unwrap();
330
331        assert_eq!(response.result.len(), 1);
332        assert_eq!(response.result[0].name, "My-Key");
333        // assert!(response.result_info.is_some());
334        // if let Some(result_info) = response.result_info {
335        //     assert_eq!(result_info.count, 1);
336        //     assert_eq!(result_info.cursor, "6Ck1la0VxJ0djhidm1MdX2FyDGxLKVeeHZZmORS_8XeSuhz9SjIJRaSa2lnsF01tQOHrfTGAP3R5X1Kv5iVUuMbNKhWNAXHOl6ePB0TUL8nw");
337        // }
338    }
339
340    #[test]
341    fn test_deserialize_json_response() {
342        let json_str = r#"{
343			"errors": [],
344			"messages": [],
345			"result": {},
346			"success": true
347		}"#;
348
349        let response: CfKvResponse = serde_json::from_slice(json_str.as_bytes()).unwrap();
350
351        assert_eq!(response.errors.len(), 0);
352    }
353}