opendal_core/services/cloudflare_kv/
core.rs1use std::sync::Arc;
19
20use http::Request;
21use http::Response;
22use http::header;
23use http::request;
24use serde_json::json;
25
26use crate::raw::*;
27use crate::services::cloudflare_kv::model::CfKvMetadata;
28use crate::{Buffer, Result};
29
30#[derive(Debug, Clone)]
31pub struct CloudflareKvCore {
32 pub api_token: String,
33 pub account_id: String,
34 pub namespace_id: String,
35 pub expiration_ttl: Option<Duration>,
36 pub info: Arc<AccessorInfo>,
37}
38
39impl CloudflareKvCore {
40 #[inline]
41 async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
42 self.info.http_client().send(req).await
43 }
44
45 fn sign(&self, req: request::Builder) -> request::Builder {
46 req.header(header::AUTHORIZATION, &self.api_token)
47 }
48
49 fn url_prefix(&self) -> String {
50 let url = format!(
51 "https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}",
52 self.account_id, self.namespace_id
53 );
54 url
55 }
56}
57
58impl CloudflareKvCore {
59 pub async fn metadata(&self, path: &str) -> Result<Response<Buffer>> {
60 let url = format!(
61 "{}/metadata/{}",
62 self.url_prefix(),
63 percent_encode_path(path)
64 );
65
66 let req = Request::get(url);
67 let req = self.sign(req);
68
69 let req = req
70 .extension(Operation::Stat)
71 .body(Buffer::new())
72 .map_err(new_request_build_error)?;
73
74 self.send(req).await
75 }
76
77 pub async fn get(&self, path: &str) -> Result<Response<Buffer>> {
78 let url = format!("{}/values/{}", self.url_prefix(), percent_encode_path(path));
79 let req = Request::get(url);
80
81 let req = self.sign(req);
82
83 let req = req
84 .extension(Operation::Read)
85 .body(Buffer::new())
86 .map_err(new_request_build_error)?;
87
88 self.send(req).await
89 }
90
91 pub async fn set(
92 &self,
93 path: &str,
94 value: Buffer,
95 metadata: CfKvMetadata,
96 ) -> Result<Response<Buffer>> {
97 let url = format!("{}/values/{}", self.url_prefix(), percent_encode_path(path));
98
99 let req = Request::put(url);
100 let req = self.sign(req);
101 let req = req.extension(Operation::Write);
102
103 let mut multipart = Multipart::new()
104 .part(FormDataPart::new("value").content(value))
105 .part(
106 FormDataPart::new("metadata")
107 .content(serde_json::to_string(&metadata).map_err(new_json_serialize_error)?),
108 );
109
110 if let Some(expiration_ttl) = self.expiration_ttl {
111 multipart = multipart.part(
112 FormDataPart::new("expiration_ttl").content(expiration_ttl.as_secs().to_string()),
113 );
114 }
115
116 let req = multipart.apply(req)?;
117
118 self.send(req).await
119 }
120
121 pub async fn delete(&self, paths: &[String]) -> Result<Response<Buffer>> {
122 let url = format!("{}/bulk/delete", self.url_prefix());
123
124 let req = Request::post(&url);
125
126 let req = self.sign(req);
127 let req_body = &json!(paths);
128 let req = req
129 .extension(Operation::Delete)
130 .header(header::CONTENT_TYPE, "application/json")
131 .body(Buffer::from(req_body.to_string()))
132 .map_err(new_request_build_error)?;
133
134 self.send(req).await
135 }
136
137 pub async fn list(
138 &self,
139 prefix: &str,
140 limit: Option<usize>,
141 cursor: Option<String>,
142 ) -> Result<Response<Buffer>> {
143 let url = format!("{}/keys", self.url_prefix());
144 let mut url = QueryPairsWriter::new(&url);
145 if let Some(cursor) = cursor {
146 if !cursor.is_empty() {
147 url = url.push("cursor", &cursor);
148 }
149 }
150 url = url.push("limit", &limit.unwrap_or(1000).to_string());
151 url = url.push("prefix", &percent_encode_path(prefix));
152
153 let req = Request::get(url.finish());
154
155 let req = self.sign(req);
156 let req = req
157 .extension(Operation::List)
158 .body(Buffer::new())
159 .map_err(new_request_build_error)?;
160
161 self.send(req).await
162 }
163}