opendal_core/services/cloudflare_kv/
backend.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use http::StatusCode;
23
24use super::CLOUDFLARE_KV_SCHEME;
25use super::config::CloudflareKvConfig;
26use super::core::CloudflareKvCore;
27use super::deleter::CloudflareKvDeleter;
28use super::error::parse_error;
29use super::lister::CloudflareKvLister;
30use super::model::*;
31use super::writer::CloudflareWriter;
32use crate::raw::*;
33use crate::*;
34
35#[doc = include_str!("docs.md")]
36#[derive(Default)]
37pub struct CloudflareKvBuilder {
38 pub(super) config: CloudflareKvConfig,
39
40 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
42 pub(super) http_client: Option<HttpClient>,
43}
44
45impl Debug for CloudflareKvBuilder {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("CloudflareKvBuilder")
48 .field("config", &self.config)
49 .finish_non_exhaustive()
50 }
51}
52
53impl CloudflareKvBuilder {
54 pub fn api_token(mut self, api_token: &str) -> Self {
56 if !api_token.is_empty() {
57 self.config.api_token = Some(api_token.to_string())
58 }
59 self
60 }
61
62 pub fn account_id(mut self, account_id: &str) -> Self {
64 if !account_id.is_empty() {
65 self.config.account_id = Some(account_id.to_string())
66 }
67 self
68 }
69
70 pub fn namespace_id(mut self, namespace_id: &str) -> Self {
72 if !namespace_id.is_empty() {
73 self.config.namespace_id = Some(namespace_id.to_string())
74 }
75 self
76 }
77
78 pub fn default_ttl(mut self, ttl: Duration) -> Self {
82 self.config.default_ttl = Some(ttl);
83 self
84 }
85
86 pub fn root(mut self, root: &str) -> Self {
88 self.config.root = if root.is_empty() {
89 None
90 } else {
91 Some(root.to_string())
92 };
93
94 self
95 }
96}
97
98impl Builder for CloudflareKvBuilder {
99 type Config = CloudflareKvConfig;
100
101 fn build(self) -> Result<impl Access> {
102 let api_token = match &self.config.api_token {
103 Some(api_token) => format_authorization_by_bearer(api_token)?,
104 None => {
105 return Err(Error::new(
106 ErrorKind::ConfigInvalid,
107 "api_token is required",
108 ));
109 }
110 };
111
112 let Some(account_id) = self.config.account_id.clone() else {
113 return Err(Error::new(
114 ErrorKind::ConfigInvalid,
115 "account_id is required",
116 ));
117 };
118
119 let Some(namespace_id) = self.config.namespace_id.clone() else {
120 return Err(Error::new(
121 ErrorKind::ConfigInvalid,
122 "namespace_id is required",
123 ));
124 };
125
126 if let Some(ttl) = self.config.default_ttl {
128 if ttl < Duration::from_secs(60) {
129 return Err(Error::new(
130 ErrorKind::ConfigInvalid,
131 "Default TTL must be at least 60 seconds",
132 ));
133 }
134 }
135
136 let root = normalize_root(
137 self.config
138 .root
139 .clone()
140 .unwrap_or_else(|| "/".to_string())
141 .as_str(),
142 );
143
144 Ok(CloudflareKvBackend {
145 core: Arc::new(CloudflareKvCore {
146 api_token,
147 account_id,
148 namespace_id,
149 expiration_ttl: self.config.default_ttl,
150 info: {
151 let am = AccessorInfo::default();
152 am.set_scheme(CLOUDFLARE_KV_SCHEME)
153 .set_root(&root)
154 .set_native_capability(Capability {
155 create_dir: true,
156
157 stat: true,
158 stat_with_if_match: true,
159 stat_with_if_none_match: true,
160 stat_with_if_modified_since: true,
161 stat_with_if_unmodified_since: true,
162
163 read: true,
164 read_with_if_match: true,
165 read_with_if_none_match: true,
166 read_with_if_modified_since: true,
167 read_with_if_unmodified_since: true,
168
169 write: true,
170 write_can_empty: true,
171 write_total_max_size: Some(25 * 1024 * 1024),
172
173 list: true,
174 list_with_limit: true,
175 list_with_recursive: true,
176
177 delete: true,
178 delete_max_size: Some(10000),
179
180 shared: false,
181
182 ..Default::default()
183 });
184
185 #[allow(deprecated)]
187 if let Some(client) = self.http_client {
188 am.update_http_client(|_| client);
189 }
190
191 am.into()
192 },
193 }),
194 })
195 }
196}
197
198#[derive(Debug, Clone)]
199pub struct CloudflareKvBackend {
200 core: Arc<CloudflareKvCore>,
201}
202
203impl Access for CloudflareKvBackend {
204 type Reader = Buffer;
205 type Writer = oio::OneShotWriter<CloudflareWriter>;
206 type Lister = oio::PageLister<CloudflareKvLister>;
207 type Deleter = oio::BatchDeleter<CloudflareKvDeleter>;
208
209 fn info(&self) -> Arc<AccessorInfo> {
210 self.core.info.clone()
211 }
212
213 async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
214 let path = build_abs_path(&self.core.info.root(), path);
215
216 if path == build_abs_path(&self.core.info.root(), "") {
217 return Ok(RpCreateDir::default());
218 }
219
220 let segments: Vec<&str> = path
222 .trim_start_matches('/')
223 .trim_end_matches('/')
224 .split('/')
225 .collect();
226
227 let mut current_path = String::from("/");
229 for segment in segments {
230 if !current_path.ends_with('/') {
232 current_path.push('/');
233 }
234 current_path.push_str(segment);
235 current_path.push('/');
236
237 let cf_kv_metadata = CfKvMetadata {
239 etag: build_tmp_path_of(¤t_path),
240 last_modified: Timestamp::now().to_string(),
241 content_length: 0,
242 is_dir: true,
243 };
244
245 self.core
247 .set(¤t_path, Buffer::new(), cf_kv_metadata)
248 .await?;
249 }
250
251 Ok(RpCreateDir::default())
252 }
253
254 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
255 let path = build_abs_path(&self.core.info.root(), path);
256 let new_path = path.trim_end_matches('/');
257
258 let resp = self.core.metadata(new_path).await?;
259
260 if resp.status() != StatusCode::OK {
262 if path.ends_with('/') && resp.status() == StatusCode::NOT_FOUND {
264 let list_resp = self.core.list(&path, None, None).await?;
266
267 if list_resp.status() == StatusCode::OK {
268 let list_body = list_resp.into_body();
269 let list_result: CfKvListResponse = serde_json::from_reader(list_body.reader())
270 .map_err(new_json_deserialize_error)?;
271
272 if let Some(entries) = list_result.result {
274 if !entries.is_empty() {
275 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
276 }
277 }
278
279 return Err(Error::new(
281 ErrorKind::NotFound,
282 "key not found in CloudFlare KV",
283 ));
284 }
285 }
286
287 return Err(parse_error(resp));
289 }
290
291 let resp_body = resp.into_body();
292 let cf_response: CfKvStatResponse =
293 serde_json::from_reader(resp_body.reader()).map_err(new_json_deserialize_error)?;
294
295 if !cf_response.success {
296 return Err(Error::new(
297 ErrorKind::Unexpected,
298 "cloudflare_kv stat this key failed for reason we don't know",
299 ));
300 }
301
302 let metadata = match cf_response.result {
303 Some(metadata) => {
304 if path.ends_with('/') && !metadata.is_dir {
305 return Err(Error::new(
306 ErrorKind::NotFound,
307 "key not found in CloudFlare KV",
308 ));
309 } else {
310 metadata
311 }
312 }
313 None => {
314 return Err(Error::new(
315 ErrorKind::NotFound,
316 "key not found in CloudFlare KV",
317 ));
318 }
319 };
320
321 if let Some(if_match) = &args.if_match() {
323 if if_match != &metadata.etag {
324 return Err(Error::new(ErrorKind::ConditionNotMatch, "etag mismatch"));
325 }
326 }
327
328 if let Some(if_none_match) = &args.if_none_match() {
330 if if_none_match == &metadata.etag {
331 return Err(Error::new(
332 ErrorKind::ConditionNotMatch,
333 "etag match when expected none match",
334 ));
335 }
336 }
337
338 let last_modified = metadata
340 .last_modified
341 .parse::<Timestamp>()
342 .map_err(|_| Error::new(ErrorKind::Unsupported, "invalid since format"))?;
343
344 if let Some(modified_since) = &args.if_modified_since() {
346 if !last_modified.gt(modified_since) {
347 return Err(Error::new(
348 ErrorKind::ConditionNotMatch,
349 "not modified since specified time",
350 ));
351 }
352 }
353
354 if let Some(unmodified_since) = &args.if_unmodified_since() {
356 if !last_modified.le(unmodified_since) {
357 return Err(Error::new(
358 ErrorKind::ConditionNotMatch,
359 "modified since specified time",
360 ));
361 }
362 }
363
364 let meta = Metadata::new(if metadata.is_dir {
365 EntryMode::DIR
366 } else {
367 EntryMode::FILE
368 })
369 .with_etag(metadata.etag)
370 .with_content_length(metadata.content_length as u64)
371 .with_last_modified(metadata.last_modified.parse::<Timestamp>()?);
372
373 Ok(RpStat::new(meta))
374 }
375
376 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
377 let path = build_abs_path(&self.core.info.root(), path);
378 let resp = self.core.get(&path).await?;
379
380 let status = resp.status();
381
382 if status != StatusCode::OK {
383 return Err(parse_error(resp));
384 }
385
386 let resp_body = resp.into_body();
387
388 if args.if_match().is_some()
389 || args.if_none_match().is_some()
390 || args.if_modified_since().is_some()
391 || args.if_unmodified_since().is_some()
392 {
393 let meta_resp = self.core.metadata(&path).await?;
394
395 if meta_resp.status() != StatusCode::OK {
396 return Err(parse_error(meta_resp));
397 }
398
399 let cf_response: CfKvStatResponse =
400 serde_json::from_reader(meta_resp.into_body().reader())
401 .map_err(new_json_deserialize_error)?;
402
403 if !cf_response.success && cf_response.result.is_some() {
404 return Err(Error::new(
405 ErrorKind::Unexpected,
406 "cloudflare_kv read this key failed for reason we don't know",
407 ));
408 }
409
410 let metadata = cf_response.result.unwrap();
411
412 if let Some(if_match) = &args.if_match() {
414 if if_match != &metadata.etag {
415 return Err(Error::new(ErrorKind::ConditionNotMatch, "etag mismatch"));
416 }
417 }
418
419 if let Some(if_none_match) = &args.if_none_match() {
421 if if_none_match == &metadata.etag {
422 return Err(Error::new(
423 ErrorKind::ConditionNotMatch,
424 "etag match when expected none match",
425 ));
426 }
427 }
428
429 let last_modified = metadata
431 .last_modified
432 .parse::<Timestamp>()
433 .map_err(|_| Error::new(ErrorKind::Unsupported, "invalid since format"))?;
434
435 if let Some(modified_since) = &args.if_modified_since() {
437 if !last_modified.gt(modified_since) {
438 return Err(Error::new(
439 ErrorKind::ConditionNotMatch,
440 "not modified since specified time",
441 ));
442 }
443 }
444
445 if let Some(unmodified_since) = &args.if_unmodified_since() {
447 if !last_modified.le(unmodified_since) {
448 return Err(Error::new(
449 ErrorKind::ConditionNotMatch,
450 "modified since specified time",
451 ));
452 }
453 }
454 }
455
456 let range = args.range();
457 let buffer = if range.is_full() {
458 resp_body
459 } else {
460 let start = range.offset() as usize;
461 let end = match range.size() {
462 Some(size) => (range.offset() + size) as usize,
463 None => resp_body.len(),
464 };
465 resp_body.slice(start..end.min(resp_body.len()))
466 };
467 Ok((RpRead::new(), buffer))
468 }
469
470 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
471 let path = build_abs_path(&self.core.info.root(), path);
472 let writer = CloudflareWriter::new(self.core.clone(), path);
473
474 let w = oio::OneShotWriter::new(writer);
475
476 Ok((RpWrite::default(), w))
477 }
478
479 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
480 Ok((
481 RpDelete::default(),
482 oio::BatchDeleter::new(
483 CloudflareKvDeleter::new(self.core.clone()),
484 self.core.info.full_capability().delete_max_size,
485 ),
486 ))
487 }
488
489 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
490 let path = build_abs_path(&self.core.info.root(), path);
491
492 let limit = match args.limit() {
493 Some(limit) => {
494 if !(10..=1000).contains(&limit) {
496 1000
497 } else {
498 limit
499 }
500 }
501 None => 1000,
502 };
503
504 let l = CloudflareKvLister::new(self.core.clone(), &path, args.recursive(), Some(limit));
505
506 Ok((RpList::default(), oio::PageLister::new(l)))
507 }
508}