opendal/services/cloudflare_kv/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use super::DEFAULT_SCHEME;
24use crate::ErrorKind;
25use crate::raw::*;
26use crate::services::CloudflareKvConfig;
27use crate::services::cloudflare_kv::core::CloudflareKvCore;
28use crate::services::cloudflare_kv::delete::CloudflareKvDeleter;
29use crate::services::cloudflare_kv::error::parse_error;
30use crate::services::cloudflare_kv::lister::CloudflareKvLister;
31use crate::services::cloudflare_kv::model::*;
32use crate::services::cloudflare_kv::writer::CloudflareWriter;
33use crate::*;
34use bytes::Buf;
35use http::StatusCode;
36use jiff::Timestamp;
37
38impl Configurator for CloudflareKvConfig {
39 type Builder = CloudflareKvBuilder;
40 fn into_builder(self) -> Self::Builder {
41 CloudflareKvBuilder {
42 config: self,
43 http_client: None,
44 }
45 }
46}
47
48#[doc = include_str!("docs.md")]
49#[derive(Default)]
50pub struct CloudflareKvBuilder {
51 config: CloudflareKvConfig,
52
53 http_client: Option<HttpClient>,
55}
56
57impl Debug for CloudflareKvBuilder {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("CloudFlareKvBuilder")
60 .field("config", &self.config)
61 .finish()
62 }
63}
64
65impl CloudflareKvBuilder {
66 pub fn api_token(mut self, api_token: &str) -> Self {
68 if !api_token.is_empty() {
69 self.config.api_token = Some(api_token.to_string())
70 }
71 self
72 }
73
74 pub fn account_id(mut self, account_id: &str) -> Self {
76 if !account_id.is_empty() {
77 self.config.account_id = Some(account_id.to_string())
78 }
79 self
80 }
81
82 pub fn namespace_id(mut self, namespace_id: &str) -> Self {
84 if !namespace_id.is_empty() {
85 self.config.namespace_id = Some(namespace_id.to_string())
86 }
87 self
88 }
89
90 pub fn default_ttl(mut self, ttl: Duration) -> Self {
94 self.config.default_ttl = Some(ttl);
95 self
96 }
97
98 pub fn root(mut self, root: &str) -> Self {
100 self.config.root = if root.is_empty() {
101 None
102 } else {
103 Some(root.to_string())
104 };
105
106 self
107 }
108}
109
110impl Builder for CloudflareKvBuilder {
111 type Config = CloudflareKvConfig;
112
113 fn build(self) -> Result<impl Access> {
114 let api_token = match &self.config.api_token {
115 Some(api_token) => format_authorization_by_bearer(api_token)?,
116 None => {
117 return Err(Error::new(
118 ErrorKind::ConfigInvalid,
119 "api_token is required",
120 ));
121 }
122 };
123
124 let Some(account_id) = self.config.account_id.clone() else {
125 return Err(Error::new(
126 ErrorKind::ConfigInvalid,
127 "account_id is required",
128 ));
129 };
130
131 let Some(namespace_id) = self.config.namespace_id.clone() else {
132 return Err(Error::new(
133 ErrorKind::ConfigInvalid,
134 "namespace_id is required",
135 ));
136 };
137
138 if let Some(ttl) = self.config.default_ttl {
140 if ttl < Duration::from_secs(60) {
141 return Err(Error::new(
142 ErrorKind::ConfigInvalid,
143 "Default TTL must be at least 60 seconds",
144 ));
145 }
146 }
147
148 let root = normalize_root(
149 self.config
150 .root
151 .clone()
152 .unwrap_or_else(|| "/".to_string())
153 .as_str(),
154 );
155
156 Ok(CloudflareKvAccessor {
157 core: Arc::new(CloudflareKvCore {
158 api_token,
159 account_id,
160 namespace_id,
161 expiration_ttl: self.config.default_ttl,
162 info: {
163 let am = AccessorInfo::default();
164 am.set_scheme(DEFAULT_SCHEME)
165 .set_root(&root)
166 .set_native_capability(Capability {
167 create_dir: true,
168
169 stat: true,
170 stat_with_if_match: true,
171 stat_with_if_none_match: true,
172 stat_with_if_modified_since: true,
173 stat_with_if_unmodified_since: true,
174
175 read: true,
176 read_with_if_match: true,
177 read_with_if_none_match: true,
178 read_with_if_modified_since: true,
179 read_with_if_unmodified_since: true,
180
181 write: true,
182 write_can_empty: true,
183 write_total_max_size: Some(25 * 1024 * 1024),
184
185 list: true,
186 list_with_limit: true,
187 list_with_recursive: true,
188
189 delete: true,
190 delete_max_size: Some(10000),
191
192 shared: false,
193
194 ..Default::default()
195 });
196
197 #[allow(deprecated)]
199 if let Some(client) = self.http_client {
200 am.update_http_client(|_| client);
201 }
202
203 am.into()
204 },
205 }),
206 })
207 }
208}
209
210#[derive(Debug, Clone)]
211pub struct CloudflareKvAccessor {
212 core: std::sync::Arc<CloudflareKvCore>,
213}
214
215impl Access for CloudflareKvAccessor {
216 type Reader = Buffer;
217 type Writer = oio::OneShotWriter<CloudflareWriter>;
218 type Lister = oio::PageLister<CloudflareKvLister>;
219 type Deleter = oio::BatchDeleter<CloudflareKvDeleter>;
220
221 fn info(&self) -> std::sync::Arc<AccessorInfo> {
222 self.core.info.clone()
223 }
224
225 async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
226 let path = build_abs_path(&self.core.info.root(), path);
227
228 if path == build_abs_path(&self.core.info.root(), "") {
229 return Ok(RpCreateDir::default());
230 }
231
232 let segments: Vec<&str> = path
234 .trim_start_matches('/')
235 .trim_end_matches('/')
236 .split('/')
237 .collect();
238
239 let mut current_path = String::from("/");
241 for segment in segments {
242 if !current_path.ends_with('/') {
244 current_path.push('/');
245 }
246 current_path.push_str(segment);
247 current_path.push('/');
248
249 let cf_kv_metadata = CfKvMetadata {
251 etag: build_tmp_path_of(¤t_path),
252 last_modified: Timestamp::now().to_string(),
253 content_length: 0,
254 is_dir: true,
255 };
256
257 self.core
259 .set(¤t_path, Buffer::new(), cf_kv_metadata)
260 .await?;
261 }
262
263 Ok(RpCreateDir::default())
264 }
265
266 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
267 let path = build_abs_path(&self.core.info.root(), path);
268 let new_path = path.trim_end_matches('/');
269
270 let resp = self.core.metadata(new_path).await?;
271
272 if resp.status() != StatusCode::OK {
274 if path.ends_with('/') && resp.status() == StatusCode::NOT_FOUND {
276 let list_resp = self.core.list(&path, None, None).await?;
278
279 if list_resp.status() == StatusCode::OK {
280 let list_body = list_resp.into_body();
281 let list_result: CfKvListResponse = serde_json::from_reader(list_body.reader())
282 .map_err(new_json_deserialize_error)?;
283
284 if let Some(entries) = list_result.result {
286 if !entries.is_empty() {
287 return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
288 }
289 }
290
291 return Err(Error::new(
293 ErrorKind::NotFound,
294 "key not found in CloudFlare KV",
295 ));
296 }
297 }
298
299 return Err(parse_error(resp));
301 }
302
303 let resp_body = resp.into_body();
304 let cf_response: CfKvStatResponse =
305 serde_json::from_reader(resp_body.reader()).map_err(new_json_deserialize_error)?;
306
307 if !cf_response.success {
308 return Err(Error::new(
309 ErrorKind::Unexpected,
310 "cloudflare_kv stat this key failed for reason we don't know",
311 ));
312 }
313
314 let metadata = match cf_response.result {
315 Some(metadata) => {
316 if path.ends_with('/') && !metadata.is_dir {
317 return Err(Error::new(
318 ErrorKind::NotFound,
319 "key not found in CloudFlare KV",
320 ));
321 } else {
322 metadata
323 }
324 }
325 None => {
326 return Err(Error::new(
327 ErrorKind::NotFound,
328 "key not found in CloudFlare KV",
329 ));
330 }
331 };
332
333 if let Some(if_match) = &args.if_match() {
335 if if_match != &metadata.etag {
336 return Err(Error::new(ErrorKind::ConditionNotMatch, "etag mismatch"));
337 }
338 }
339
340 if let Some(if_none_match) = &args.if_none_match() {
342 if if_none_match == &metadata.etag {
343 return Err(Error::new(
344 ErrorKind::ConditionNotMatch,
345 "etag match when expected none match",
346 ));
347 }
348 }
349
350 let last_modified = metadata
352 .last_modified
353 .parse::<Timestamp>()
354 .map_err(|_| Error::new(ErrorKind::Unsupported, "invalid since format"))?;
355
356 if let Some(modified_since) = &args.if_modified_since() {
358 if !last_modified.gt(modified_since) {
359 return Err(Error::new(
360 ErrorKind::ConditionNotMatch,
361 "not modified since specified time",
362 ));
363 }
364 }
365
366 if let Some(unmodified_since) = &args.if_unmodified_since() {
368 if !last_modified.le(unmodified_since) {
369 return Err(Error::new(
370 ErrorKind::ConditionNotMatch,
371 "modified since specified time",
372 ));
373 }
374 }
375
376 let meta = Metadata::new(if metadata.is_dir {
377 EntryMode::DIR
378 } else {
379 EntryMode::FILE
380 })
381 .with_etag(metadata.etag)
382 .with_content_length(metadata.content_length as u64)
383 .with_last_modified(parse_datetime_from_rfc3339(&metadata.last_modified)?);
384
385 Ok(RpStat::new(meta))
386 }
387
388 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
389 let path = build_abs_path(&self.core.info.root(), path);
390 let resp = self.core.get(&path).await?;
391
392 let status = resp.status();
393
394 if status != StatusCode::OK {
395 return Err(parse_error(resp));
396 }
397
398 let resp_body = resp.into_body();
399
400 if args.if_match().is_some()
401 || args.if_none_match().is_some()
402 || args.if_modified_since().is_some()
403 || args.if_unmodified_since().is_some()
404 {
405 let meta_resp = self.core.metadata(&path).await?;
406
407 if meta_resp.status() != StatusCode::OK {
408 return Err(parse_error(meta_resp));
409 }
410
411 let cf_response: CfKvStatResponse =
412 serde_json::from_reader(meta_resp.into_body().reader())
413 .map_err(new_json_deserialize_error)?;
414
415 if !cf_response.success && cf_response.result.is_some() {
416 return Err(Error::new(
417 ErrorKind::Unexpected,
418 "cloudflare_kv read this key failed for reason we don't know",
419 ));
420 }
421
422 let metadata = cf_response.result.unwrap();
423
424 if let Some(if_match) = &args.if_match() {
426 if if_match != &metadata.etag {
427 return Err(Error::new(ErrorKind::ConditionNotMatch, "etag mismatch"));
428 }
429 }
430
431 if let Some(if_none_match) = &args.if_none_match() {
433 if if_none_match == &metadata.etag {
434 return Err(Error::new(
435 ErrorKind::ConditionNotMatch,
436 "etag match when expected none match",
437 ));
438 }
439 }
440
441 let last_modified = metadata
443 .last_modified
444 .parse::<Timestamp>()
445 .map_err(|_| Error::new(ErrorKind::Unsupported, "invalid since format"))?;
446
447 if let Some(modified_since) = &args.if_modified_since() {
449 if !last_modified.gt(modified_since) {
450 return Err(Error::new(
451 ErrorKind::ConditionNotMatch,
452 "not modified since specified time",
453 ));
454 }
455 }
456
457 if let Some(unmodified_since) = &args.if_unmodified_since() {
459 if !last_modified.le(unmodified_since) {
460 return Err(Error::new(
461 ErrorKind::ConditionNotMatch,
462 "modified since specified time",
463 ));
464 }
465 }
466 }
467
468 let range = args.range();
469 let buffer = if range.is_full() {
470 resp_body
471 } else {
472 let start = range.offset() as usize;
473 let end = match range.size() {
474 Some(size) => (range.offset() + size) as usize,
475 None => resp_body.len(),
476 };
477 resp_body.slice(start..end.min(resp_body.len()))
478 };
479 Ok((RpRead::new(), buffer))
480 }
481
482 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
483 let path = build_abs_path(&self.core.info.root(), path);
484 let writer = CloudflareWriter::new(self.core.clone(), path);
485
486 let w = oio::OneShotWriter::new(writer);
487
488 Ok((RpWrite::default(), w))
489 }
490
491 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
492 Ok((
493 RpDelete::default(),
494 oio::BatchDeleter::new(CloudflareKvDeleter::new(self.core.clone())),
495 ))
496 }
497
498 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
499 let path = build_abs_path(&self.core.info.root(), path);
500
501 let limit = match args.limit() {
502 Some(limit) => {
503 if !(10..=1000).contains(&limit) {
505 1000
506 } else {
507 limit
508 }
509 }
510 None => 1000,
511 };
512
513 let l = CloudflareKvLister::new(self.core.clone(), &path, args.recursive(), Some(limit));
514
515 Ok((RpList::default(), oio::PageLister::new(l)))
516 }
517}