opendal/services/cloudflare_kv/
core.rs1use std::sync::Arc;
19use std::time::Duration;
20
21use http::header;
22use http::request;
23use http::Request;
24use http::Response;
25use serde_json::json;
26
27use crate::raw::new_json_serialize_error;
28use crate::raw::percent_encode_path;
29use crate::raw::Operation;
30use crate::raw::QueryPairsWriter;
31use crate::raw::{new_request_build_error, AccessorInfo, FormDataPart, Multipart};
32use crate::services::cloudflare_kv::model::CfKvMetadata;
33use crate::{Buffer, Result};
34
35#[derive(Debug, Clone)]
36pub struct CloudflareKvCore {
37 pub api_token: String,
38 pub account_id: String,
39 pub namespace_id: String,
40 pub expiration_ttl: Option<Duration>,
41 pub info: Arc<AccessorInfo>,
42}
43
44impl CloudflareKvCore {
45 #[inline]
46 async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
47 self.info.http_client().send(req).await
48 }
49
50 fn sign(&self, req: request::Builder) -> request::Builder {
51 req.header(header::AUTHORIZATION, &self.api_token)
52 }
53
54 fn url_prefix(&self) -> String {
55 let url = format!(
56 "https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}",
57 self.account_id, self.namespace_id
58 );
59 url
60 }
61}
62
63impl CloudflareKvCore {
64 pub async fn metadata(&self, path: &str) -> Result<Response<Buffer>> {
65 let url = format!(
66 "{}/metadata/{}",
67 self.url_prefix(),
68 percent_encode_path(path)
69 );
70
71 let req = Request::get(url);
72 let req = self.sign(req);
73
74 let req = req
75 .extension(Operation::Stat)
76 .body(Buffer::new())
77 .map_err(new_request_build_error)?;
78
79 self.send(req).await
80 }
81
82 pub async fn get(&self, path: &str) -> Result<Response<Buffer>> {
83 let url = format!("{}/values/{}", self.url_prefix(), percent_encode_path(path));
84 let req = Request::get(url);
85
86 let req = self.sign(req);
87
88 let req = req
89 .extension(Operation::Read)
90 .body(Buffer::new())
91 .map_err(new_request_build_error)?;
92
93 self.send(req).await
94 }
95
96 pub async fn set(
97 &self,
98 path: &str,
99 value: Buffer,
100 metadata: CfKvMetadata,
101 ) -> Result<Response<Buffer>> {
102 let url = format!("{}/values/{}", self.url_prefix(), percent_encode_path(path));
103
104 let req = Request::put(url);
105 let req = self.sign(req);
106 let req = req.extension(Operation::Write);
107
108 let mut multipart = Multipart::new()
109 .part(FormDataPart::new("value").content(value))
110 .part(
111 FormDataPart::new("metadata")
112 .content(serde_json::to_string(&metadata).map_err(new_json_serialize_error)?),
113 );
114
115 if let Some(expiration_ttl) = self.expiration_ttl {
116 multipart = multipart.part(
117 FormDataPart::new("expiration_ttl").content(expiration_ttl.as_secs().to_string()),
118 );
119 }
120
121 let req = multipart.apply(req)?;
122
123 self.send(req).await
124 }
125
126 pub async fn delete(&self, paths: &[String]) -> Result<Response<Buffer>> {
127 let url = format!("{}/bulk/delete", self.url_prefix());
128
129 let req = Request::post(&url);
130
131 let req = self.sign(req);
132 let req_body = &json!(paths);
133 let req = req
134 .extension(Operation::Delete)
135 .header(header::CONTENT_TYPE, "application/json")
136 .body(Buffer::from(req_body.to_string()))
137 .map_err(new_request_build_error)?;
138
139 self.send(req).await
140 }
141
142 pub async fn list(
143 &self,
144 prefix: &str,
145 limit: Option<usize>,
146 cursor: Option<String>,
147 ) -> Result<Response<Buffer>> {
148 let url = format!("{}/keys", self.url_prefix());
149 let mut url = QueryPairsWriter::new(&url);
150 if let Some(cursor) = cursor {
151 if !cursor.is_empty() {
152 url = url.push("cursor", &cursor);
153 }
154 }
155 url = url.push("limit", &limit.unwrap_or(1000).to_string());
156 url = url.push("prefix", &percent_encode_path(prefix));
157
158 let req = Request::get(url.finish());
159
160 let req = self.sign(req);
161 let req = req
162 .extension(Operation::List)
163 .body(Buffer::new())
164 .map_err(new_request_build_error)?;
165
166 self.send(req).await
167 }
168}