opendal/services/cloudflare_kv/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use super::DEFAULT_SCHEME;
24use crate::ErrorKind;
25use crate::raw::*;
26use crate::services::CloudflareKvConfig;
27use crate::services::cloudflare_kv::core::CloudflareKvCore;
28use crate::services::cloudflare_kv::delete::CloudflareKvDeleter;
29use crate::services::cloudflare_kv::error::parse_error;
30use crate::services::cloudflare_kv::lister::CloudflareKvLister;
31use crate::services::cloudflare_kv::model::*;
32use crate::services::cloudflare_kv::writer::CloudflareWriter;
33use crate::*;
34use bytes::Buf;
35use http::StatusCode;
36use jiff::Timestamp;
37
38impl Configurator for CloudflareKvConfig {
39    type Builder = CloudflareKvBuilder;
40    fn into_builder(self) -> Self::Builder {
41        CloudflareKvBuilder {
42            config: self,
43            http_client: None,
44        }
45    }
46}
47
48#[doc = include_str!("docs.md")]
49#[derive(Default)]
50pub struct CloudflareKvBuilder {
51    config: CloudflareKvConfig,
52
53    /// The HTTP client used to communicate with CloudFlare.
54    http_client: Option<HttpClient>,
55}
56
57impl Debug for CloudflareKvBuilder {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("CloudFlareKvBuilder")
60            .field("config", &self.config)
61            .finish()
62    }
63}
64
65impl CloudflareKvBuilder {
66    /// Set the token used to authenticate with CloudFlare.
67    pub fn api_token(mut self, api_token: &str) -> Self {
68        if !api_token.is_empty() {
69            self.config.api_token = Some(api_token.to_string())
70        }
71        self
72    }
73
74    /// Set the account ID used to authenticate with CloudFlare.
75    pub fn account_id(mut self, account_id: &str) -> Self {
76        if !account_id.is_empty() {
77            self.config.account_id = Some(account_id.to_string())
78        }
79        self
80    }
81
82    /// Set the namespace ID.
83    pub fn namespace_id(mut self, namespace_id: &str) -> Self {
84        if !namespace_id.is_empty() {
85            self.config.namespace_id = Some(namespace_id.to_string())
86        }
87        self
88    }
89
90    /// Set the default ttl for cloudflare_kv services.
91    ///
92    /// If set, we will specify `EX` for write operations.
93    pub fn default_ttl(mut self, ttl: Duration) -> Self {
94        self.config.default_ttl = Some(ttl);
95        self
96    }
97
98    /// Set the root within this backend.
99    pub fn root(mut self, root: &str) -> Self {
100        self.config.root = if root.is_empty() {
101            None
102        } else {
103            Some(root.to_string())
104        };
105
106        self
107    }
108}
109
110impl Builder for CloudflareKvBuilder {
111    type Config = CloudflareKvConfig;
112
113    fn build(self) -> Result<impl Access> {
114        let api_token = match &self.config.api_token {
115            Some(api_token) => format_authorization_by_bearer(api_token)?,
116            None => {
117                return Err(Error::new(
118                    ErrorKind::ConfigInvalid,
119                    "api_token is required",
120                ));
121            }
122        };
123
124        let Some(account_id) = self.config.account_id.clone() else {
125            return Err(Error::new(
126                ErrorKind::ConfigInvalid,
127                "account_id is required",
128            ));
129        };
130
131        let Some(namespace_id) = self.config.namespace_id.clone() else {
132            return Err(Error::new(
133                ErrorKind::ConfigInvalid,
134                "namespace_id is required",
135            ));
136        };
137
138        // Validate default TTL is at least 60 seconds if specified
139        if let Some(ttl) = self.config.default_ttl {
140            if ttl < Duration::from_secs(60) {
141                return Err(Error::new(
142                    ErrorKind::ConfigInvalid,
143                    "Default TTL must be at least 60 seconds",
144                ));
145            }
146        }
147
148        let root = normalize_root(
149            self.config
150                .root
151                .clone()
152                .unwrap_or_else(|| "/".to_string())
153                .as_str(),
154        );
155
156        Ok(CloudflareKvAccessor {
157            core: Arc::new(CloudflareKvCore {
158                api_token,
159                account_id,
160                namespace_id,
161                expiration_ttl: self.config.default_ttl,
162                info: {
163                    let am = AccessorInfo::default();
164                    am.set_scheme(DEFAULT_SCHEME)
165                        .set_root(&root)
166                        .set_native_capability(Capability {
167                            create_dir: true,
168
169                            stat: true,
170                            stat_with_if_match: true,
171                            stat_with_if_none_match: true,
172                            stat_with_if_modified_since: true,
173                            stat_with_if_unmodified_since: true,
174
175                            read: true,
176                            read_with_if_match: true,
177                            read_with_if_none_match: true,
178                            read_with_if_modified_since: true,
179                            read_with_if_unmodified_since: true,
180
181                            write: true,
182                            write_can_empty: true,
183                            write_total_max_size: Some(25 * 1024 * 1024),
184
185                            list: true,
186                            list_with_limit: true,
187                            list_with_recursive: true,
188
189                            delete: true,
190                            delete_max_size: Some(10000),
191
192                            shared: false,
193
194                            ..Default::default()
195                        });
196
197                    // allow deprecated api here for compatibility
198                    #[allow(deprecated)]
199                    if let Some(client) = self.http_client {
200                        am.update_http_client(|_| client);
201                    }
202
203                    am.into()
204                },
205            }),
206        })
207    }
208}
209
210#[derive(Debug, Clone)]
211pub struct CloudflareKvAccessor {
212    core: std::sync::Arc<CloudflareKvCore>,
213}
214
215impl Access for CloudflareKvAccessor {
216    type Reader = Buffer;
217    type Writer = oio::OneShotWriter<CloudflareWriter>;
218    type Lister = oio::PageLister<CloudflareKvLister>;
219    type Deleter = oio::BatchDeleter<CloudflareKvDeleter>;
220
221    fn info(&self) -> std::sync::Arc<AccessorInfo> {
222        self.core.info.clone()
223    }
224
225    async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
226        let path = build_abs_path(&self.core.info.root(), path);
227
228        if path == build_abs_path(&self.core.info.root(), "") {
229            return Ok(RpCreateDir::default());
230        }
231
232        // Split path into segments and create directories for each level
233        let segments: Vec<&str> = path
234            .trim_start_matches('/')
235            .trim_end_matches('/')
236            .split('/')
237            .collect();
238
239        // Create each directory level
240        let mut current_path = String::from("/");
241        for segment in segments {
242            // Build the current directory path
243            if !current_path.ends_with('/') {
244                current_path.push('/');
245            }
246            current_path.push_str(segment);
247            current_path.push('/');
248
249            // Create metadata for current directory
250            let cf_kv_metadata = CfKvMetadata {
251                etag: build_tmp_path_of(&current_path),
252                last_modified: Timestamp::now().to_string(),
253                content_length: 0,
254                is_dir: true,
255            };
256
257            // Set the directory entry
258            self.core
259                .set(&current_path, Buffer::new(), cf_kv_metadata)
260                .await?;
261        }
262
263        Ok(RpCreateDir::default())
264    }
265
266    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
267        let path = build_abs_path(&self.core.info.root(), path);
268        let new_path = path.trim_end_matches('/');
269
270        let resp = self.core.metadata(new_path).await?;
271
272        // Handle non-OK response
273        if resp.status() != StatusCode::OK {
274            // Special handling for potential directory paths
275            if path.ends_with('/') && resp.status() == StatusCode::NOT_FOUND {
276                // Try listing the path to check if it's a directory
277                let list_resp = self.core.list(&path, None, None).await?;
278
279                if list_resp.status() == StatusCode::OK {
280                    let list_body = list_resp.into_body();
281                    let list_result: CfKvListResponse = serde_json::from_reader(list_body.reader())
282                        .map_err(new_json_deserialize_error)?;
283
284                    // If listing returns results, treat as directory
285                    if let Some(entries) = list_result.result {
286                        if !entries.is_empty() {
287                            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
288                        }
289                    }
290
291                    // Empty or no results means not found
292                    return Err(Error::new(
293                        ErrorKind::NotFound,
294                        "key not found in CloudFlare KV",
295                    ));
296                }
297            }
298
299            // For all other error cases, parse the error response
300            return Err(parse_error(resp));
301        }
302
303        let resp_body = resp.into_body();
304        let cf_response: CfKvStatResponse =
305            serde_json::from_reader(resp_body.reader()).map_err(new_json_deserialize_error)?;
306
307        if !cf_response.success {
308            return Err(Error::new(
309                ErrorKind::Unexpected,
310                "cloudflare_kv stat this key failed for reason we don't know",
311            ));
312        }
313
314        let metadata = match cf_response.result {
315            Some(metadata) => {
316                if path.ends_with('/') && !metadata.is_dir {
317                    return Err(Error::new(
318                        ErrorKind::NotFound,
319                        "key not found in CloudFlare KV",
320                    ));
321                } else {
322                    metadata
323                }
324            }
325            None => {
326                return Err(Error::new(
327                    ErrorKind::NotFound,
328                    "key not found in CloudFlare KV",
329                ));
330            }
331        };
332
333        // Check if_match condition
334        if let Some(if_match) = &args.if_match() {
335            if if_match != &metadata.etag {
336                return Err(Error::new(ErrorKind::ConditionNotMatch, "etag mismatch"));
337            }
338        }
339
340        // Check if_none_match condition
341        if let Some(if_none_match) = &args.if_none_match() {
342            if if_none_match == &metadata.etag {
343                return Err(Error::new(
344                    ErrorKind::ConditionNotMatch,
345                    "etag match when expected none match",
346                ));
347            }
348        }
349
350        // Parse since time once for both time-based conditions
351        let last_modified = metadata
352            .last_modified
353            .parse::<Timestamp>()
354            .map_err(|_| Error::new(ErrorKind::Unsupported, "invalid since format"))?;
355
356        // Check modified_since condition
357        if let Some(modified_since) = &args.if_modified_since() {
358            if !last_modified.gt(modified_since) {
359                return Err(Error::new(
360                    ErrorKind::ConditionNotMatch,
361                    "not modified since specified time",
362                ));
363            }
364        }
365
366        // Check unmodified_since condition
367        if let Some(unmodified_since) = &args.if_unmodified_since() {
368            if !last_modified.le(unmodified_since) {
369                return Err(Error::new(
370                    ErrorKind::ConditionNotMatch,
371                    "modified since specified time",
372                ));
373            }
374        }
375
376        let meta = Metadata::new(if metadata.is_dir {
377            EntryMode::DIR
378        } else {
379            EntryMode::FILE
380        })
381        .with_etag(metadata.etag)
382        .with_content_length(metadata.content_length as u64)
383        .with_last_modified(parse_datetime_from_rfc3339(&metadata.last_modified)?);
384
385        Ok(RpStat::new(meta))
386    }
387
388    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
389        let path = build_abs_path(&self.core.info.root(), path);
390        let resp = self.core.get(&path).await?;
391
392        let status = resp.status();
393
394        if status != StatusCode::OK {
395            return Err(parse_error(resp));
396        }
397
398        let resp_body = resp.into_body();
399
400        if args.if_match().is_some()
401            || args.if_none_match().is_some()
402            || args.if_modified_since().is_some()
403            || args.if_unmodified_since().is_some()
404        {
405            let meta_resp = self.core.metadata(&path).await?;
406
407            if meta_resp.status() != StatusCode::OK {
408                return Err(parse_error(meta_resp));
409            }
410
411            let cf_response: CfKvStatResponse =
412                serde_json::from_reader(meta_resp.into_body().reader())
413                    .map_err(new_json_deserialize_error)?;
414
415            if !cf_response.success && cf_response.result.is_some() {
416                return Err(Error::new(
417                    ErrorKind::Unexpected,
418                    "cloudflare_kv read this key failed for reason we don't know",
419                ));
420            }
421
422            let metadata = cf_response.result.unwrap();
423
424            // Check if_match condition
425            if let Some(if_match) = &args.if_match() {
426                if if_match != &metadata.etag {
427                    return Err(Error::new(ErrorKind::ConditionNotMatch, "etag mismatch"));
428                }
429            }
430
431            // Check if_none_match condition
432            if let Some(if_none_match) = &args.if_none_match() {
433                if if_none_match == &metadata.etag {
434                    return Err(Error::new(
435                        ErrorKind::ConditionNotMatch,
436                        "etag match when expected none match",
437                    ));
438                }
439            }
440
441            // Parse since time once for both time-based conditions
442            let last_modified = metadata
443                .last_modified
444                .parse::<Timestamp>()
445                .map_err(|_| Error::new(ErrorKind::Unsupported, "invalid since format"))?;
446
447            // Check modified_since condition
448            if let Some(modified_since) = &args.if_modified_since() {
449                if !last_modified.gt(modified_since) {
450                    return Err(Error::new(
451                        ErrorKind::ConditionNotMatch,
452                        "not modified since specified time",
453                    ));
454                }
455            }
456
457            // Check unmodified_since condition
458            if let Some(unmodified_since) = &args.if_unmodified_since() {
459                if !last_modified.le(unmodified_since) {
460                    return Err(Error::new(
461                        ErrorKind::ConditionNotMatch,
462                        "modified since specified time",
463                    ));
464                }
465            }
466        }
467
468        let range = args.range();
469        let buffer = if range.is_full() {
470            resp_body
471        } else {
472            let start = range.offset() as usize;
473            let end = match range.size() {
474                Some(size) => (range.offset() + size) as usize,
475                None => resp_body.len(),
476            };
477            resp_body.slice(start..end.min(resp_body.len()))
478        };
479        Ok((RpRead::new(), buffer))
480    }
481
482    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
483        let path = build_abs_path(&self.core.info.root(), path);
484        let writer = CloudflareWriter::new(self.core.clone(), path);
485
486        let w = oio::OneShotWriter::new(writer);
487
488        Ok((RpWrite::default(), w))
489    }
490
491    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
492        Ok((
493            RpDelete::default(),
494            oio::BatchDeleter::new(CloudflareKvDeleter::new(self.core.clone())),
495        ))
496    }
497
498    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
499        let path = build_abs_path(&self.core.info.root(), path);
500
501        let limit = match args.limit() {
502            Some(limit) => {
503                // The list limit of cloudflare_kv is limited to 10..1000.
504                if !(10..=1000).contains(&limit) {
505                    1000
506                } else {
507                    limit
508                }
509            }
510            None => 1000,
511        };
512
513        let l = CloudflareKvLister::new(self.core.clone(), &path, args.recursive(), Some(limit));
514
515        Ok((RpList::default(), oio::PageLister::new(l)))
516    }
517}