opendal_core/services/cloudflare_kv/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use http::StatusCode;
23
24use super::CLOUDFLARE_KV_SCHEME;
25use super::config::CloudflareKvConfig;
26use super::core::CloudflareKvCore;
27use super::deleter::CloudflareKvDeleter;
28use super::error::parse_error;
29use super::lister::CloudflareKvLister;
30use super::model::*;
31use super::writer::CloudflareWriter;
32use crate::raw::*;
33use crate::*;
34
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct CloudflareKvBuilder {
    /// Configuration filled in by the builder's setter methods and read by `build`.
    pub(super) config: CloudflareKvConfig,

    /// The HTTP client used to communicate with CloudFlare.
    ///
    /// When present, `build` installs it on the accessor info through
    /// `update_http_client`.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    pub(super) http_client: Option<HttpClient>,
}
44
45impl Debug for CloudflareKvBuilder {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        f.debug_struct("CloudflareKvBuilder")
48            .field("config", &self.config)
49            .finish_non_exhaustive()
50    }
51}
52
53impl CloudflareKvBuilder {
54    /// Set the token used to authenticate with CloudFlare.
55    pub fn api_token(mut self, api_token: &str) -> Self {
56        if !api_token.is_empty() {
57            self.config.api_token = Some(api_token.to_string())
58        }
59        self
60    }
61
62    /// Set the account ID used to authenticate with CloudFlare.
63    pub fn account_id(mut self, account_id: &str) -> Self {
64        if !account_id.is_empty() {
65            self.config.account_id = Some(account_id.to_string())
66        }
67        self
68    }
69
70    /// Set the namespace ID.
71    pub fn namespace_id(mut self, namespace_id: &str) -> Self {
72        if !namespace_id.is_empty() {
73            self.config.namespace_id = Some(namespace_id.to_string())
74        }
75        self
76    }
77
78    /// Set the default ttl for cloudflare_kv services.
79    ///
80    /// If set, we will specify `EX` for write operations.
81    pub fn default_ttl(mut self, ttl: Duration) -> Self {
82        self.config.default_ttl = Some(ttl);
83        self
84    }
85
86    /// Set the root within this backend.
87    pub fn root(mut self, root: &str) -> Self {
88        self.config.root = if root.is_empty() {
89            None
90        } else {
91            Some(root.to_string())
92        };
93
94        self
95    }
96}
97
98impl Builder for CloudflareKvBuilder {
99    type Config = CloudflareKvConfig;
100
101    fn build(self) -> Result<impl Access> {
102        let api_token = match &self.config.api_token {
103            Some(api_token) => format_authorization_by_bearer(api_token)?,
104            None => {
105                return Err(Error::new(
106                    ErrorKind::ConfigInvalid,
107                    "api_token is required",
108                ));
109            }
110        };
111
112        let Some(account_id) = self.config.account_id.clone() else {
113            return Err(Error::new(
114                ErrorKind::ConfigInvalid,
115                "account_id is required",
116            ));
117        };
118
119        let Some(namespace_id) = self.config.namespace_id.clone() else {
120            return Err(Error::new(
121                ErrorKind::ConfigInvalid,
122                "namespace_id is required",
123            ));
124        };
125
126        // Validate default TTL is at least 60 seconds if specified
127        if let Some(ttl) = self.config.default_ttl {
128            if ttl < Duration::from_secs(60) {
129                return Err(Error::new(
130                    ErrorKind::ConfigInvalid,
131                    "Default TTL must be at least 60 seconds",
132                ));
133            }
134        }
135
136        let root = normalize_root(
137            self.config
138                .root
139                .clone()
140                .unwrap_or_else(|| "/".to_string())
141                .as_str(),
142        );
143
144        Ok(CloudflareKvBackend {
145            core: Arc::new(CloudflareKvCore {
146                api_token,
147                account_id,
148                namespace_id,
149                expiration_ttl: self.config.default_ttl,
150                info: {
151                    let am = AccessorInfo::default();
152                    am.set_scheme(CLOUDFLARE_KV_SCHEME)
153                        .set_root(&root)
154                        .set_native_capability(Capability {
155                            create_dir: true,
156
157                            stat: true,
158                            stat_with_if_match: true,
159                            stat_with_if_none_match: true,
160                            stat_with_if_modified_since: true,
161                            stat_with_if_unmodified_since: true,
162
163                            read: true,
164                            read_with_if_match: true,
165                            read_with_if_none_match: true,
166                            read_with_if_modified_since: true,
167                            read_with_if_unmodified_since: true,
168
169                            write: true,
170                            write_can_empty: true,
171                            write_total_max_size: Some(25 * 1024 * 1024),
172
173                            list: true,
174                            list_with_limit: true,
175                            list_with_recursive: true,
176
177                            delete: true,
178                            delete_max_size: Some(10000),
179
180                            shared: false,
181
182                            ..Default::default()
183                        });
184
185                    // allow deprecated api here for compatibility
186                    #[allow(deprecated)]
187                    if let Some(client) = self.http_client {
188                        am.update_http_client(|_| client);
189                    }
190
191                    am.into()
192                },
193            }),
194        })
195    }
196}
197
/// Backend for the Cloudflare KV service.
///
/// Clones share the same `CloudflareKvCore` through an `Arc`.
#[derive(Debug, Clone)]
pub struct CloudflareKvBackend {
    /// Shared state assembled by `CloudflareKvBuilder::build`.
    core: Arc<CloudflareKvCore>,
}
202
203impl Access for CloudflareKvBackend {
204    type Reader = Buffer;
205    type Writer = oio::OneShotWriter<CloudflareWriter>;
206    type Lister = oio::PageLister<CloudflareKvLister>;
207    type Deleter = oio::BatchDeleter<CloudflareKvDeleter>;
208
209    fn info(&self) -> Arc<AccessorInfo> {
210        self.core.info.clone()
211    }
212
213    async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
214        let path = build_abs_path(&self.core.info.root(), path);
215
216        if path == build_abs_path(&self.core.info.root(), "") {
217            return Ok(RpCreateDir::default());
218        }
219
220        // Split path into segments and create directories for each level
221        let segments: Vec<&str> = path
222            .trim_start_matches('/')
223            .trim_end_matches('/')
224            .split('/')
225            .collect();
226
227        // Create each directory level
228        let mut current_path = String::from("/");
229        for segment in segments {
230            // Build the current directory path
231            if !current_path.ends_with('/') {
232                current_path.push('/');
233            }
234            current_path.push_str(segment);
235            current_path.push('/');
236
237            // Create metadata for current directory
238            let cf_kv_metadata = CfKvMetadata {
239                etag: build_tmp_path_of(&current_path),
240                last_modified: Timestamp::now().to_string(),
241                content_length: 0,
242                is_dir: true,
243            };
244
245            // Set the directory entry
246            self.core
247                .set(&current_path, Buffer::new(), cf_kv_metadata)
248                .await?;
249        }
250
251        Ok(RpCreateDir::default())
252    }
253
254    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
255        let path = build_abs_path(&self.core.info.root(), path);
256        let new_path = path.trim_end_matches('/');
257
258        let resp = self.core.metadata(new_path).await?;
259
260        // Handle non-OK response
261        if resp.status() != StatusCode::OK {
262            // Special handling for potential directory paths
263            if path.ends_with('/') && resp.status() == StatusCode::NOT_FOUND {
264                // Try listing the path to check if it's a directory
265                let list_resp = self.core.list(&path, None, None).await?;
266
267                if list_resp.status() == StatusCode::OK {
268                    let list_body = list_resp.into_body();
269                    let list_result: CfKvListResponse = serde_json::from_reader(list_body.reader())
270                        .map_err(new_json_deserialize_error)?;
271
272                    // If listing returns results, treat as directory
273                    if let Some(entries) = list_result.result {
274                        if !entries.is_empty() {
275                            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
276                        }
277                    }
278
279                    // Empty or no results means not found
280                    return Err(Error::new(
281                        ErrorKind::NotFound,
282                        "key not found in CloudFlare KV",
283                    ));
284                }
285            }
286
287            // For all other error cases, parse the error response
288            return Err(parse_error(resp));
289        }
290
291        let resp_body = resp.into_body();
292        let cf_response: CfKvStatResponse =
293            serde_json::from_reader(resp_body.reader()).map_err(new_json_deserialize_error)?;
294
295        if !cf_response.success {
296            return Err(Error::new(
297                ErrorKind::Unexpected,
298                "cloudflare_kv stat this key failed for reason we don't know",
299            ));
300        }
301
302        let metadata = match cf_response.result {
303            Some(metadata) => {
304                if path.ends_with('/') && !metadata.is_dir {
305                    return Err(Error::new(
306                        ErrorKind::NotFound,
307                        "key not found in CloudFlare KV",
308                    ));
309                } else {
310                    metadata
311                }
312            }
313            None => {
314                return Err(Error::new(
315                    ErrorKind::NotFound,
316                    "key not found in CloudFlare KV",
317                ));
318            }
319        };
320
321        // Check if_match condition
322        if let Some(if_match) = &args.if_match() {
323            if if_match != &metadata.etag {
324                return Err(Error::new(ErrorKind::ConditionNotMatch, "etag mismatch"));
325            }
326        }
327
328        // Check if_none_match condition
329        if let Some(if_none_match) = &args.if_none_match() {
330            if if_none_match == &metadata.etag {
331                return Err(Error::new(
332                    ErrorKind::ConditionNotMatch,
333                    "etag match when expected none match",
334                ));
335            }
336        }
337
338        // Parse since time once for both time-based conditions
339        let last_modified = metadata
340            .last_modified
341            .parse::<Timestamp>()
342            .map_err(|_| Error::new(ErrorKind::Unsupported, "invalid since format"))?;
343
344        // Check modified_since condition
345        if let Some(modified_since) = &args.if_modified_since() {
346            if !last_modified.gt(modified_since) {
347                return Err(Error::new(
348                    ErrorKind::ConditionNotMatch,
349                    "not modified since specified time",
350                ));
351            }
352        }
353
354        // Check unmodified_since condition
355        if let Some(unmodified_since) = &args.if_unmodified_since() {
356            if !last_modified.le(unmodified_since) {
357                return Err(Error::new(
358                    ErrorKind::ConditionNotMatch,
359                    "modified since specified time",
360                ));
361            }
362        }
363
364        let meta = Metadata::new(if metadata.is_dir {
365            EntryMode::DIR
366        } else {
367            EntryMode::FILE
368        })
369        .with_etag(metadata.etag)
370        .with_content_length(metadata.content_length as u64)
371        .with_last_modified(metadata.last_modified.parse::<Timestamp>()?);
372
373        Ok(RpStat::new(meta))
374    }
375
376    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
377        let path = build_abs_path(&self.core.info.root(), path);
378        let resp = self.core.get(&path).await?;
379
380        let status = resp.status();
381
382        if status != StatusCode::OK {
383            return Err(parse_error(resp));
384        }
385
386        let resp_body = resp.into_body();
387
388        if args.if_match().is_some()
389            || args.if_none_match().is_some()
390            || args.if_modified_since().is_some()
391            || args.if_unmodified_since().is_some()
392        {
393            let meta_resp = self.core.metadata(&path).await?;
394
395            if meta_resp.status() != StatusCode::OK {
396                return Err(parse_error(meta_resp));
397            }
398
399            let cf_response: CfKvStatResponse =
400                serde_json::from_reader(meta_resp.into_body().reader())
401                    .map_err(new_json_deserialize_error)?;
402
403            if !cf_response.success && cf_response.result.is_some() {
404                return Err(Error::new(
405                    ErrorKind::Unexpected,
406                    "cloudflare_kv read this key failed for reason we don't know",
407                ));
408            }
409
410            let metadata = cf_response.result.unwrap();
411
412            // Check if_match condition
413            if let Some(if_match) = &args.if_match() {
414                if if_match != &metadata.etag {
415                    return Err(Error::new(ErrorKind::ConditionNotMatch, "etag mismatch"));
416                }
417            }
418
419            // Check if_none_match condition
420            if let Some(if_none_match) = &args.if_none_match() {
421                if if_none_match == &metadata.etag {
422                    return Err(Error::new(
423                        ErrorKind::ConditionNotMatch,
424                        "etag match when expected none match",
425                    ));
426                }
427            }
428
429            // Parse since time once for both time-based conditions
430            let last_modified = metadata
431                .last_modified
432                .parse::<Timestamp>()
433                .map_err(|_| Error::new(ErrorKind::Unsupported, "invalid since format"))?;
434
435            // Check modified_since condition
436            if let Some(modified_since) = &args.if_modified_since() {
437                if !last_modified.gt(modified_since) {
438                    return Err(Error::new(
439                        ErrorKind::ConditionNotMatch,
440                        "not modified since specified time",
441                    ));
442                }
443            }
444
445            // Check unmodified_since condition
446            if let Some(unmodified_since) = &args.if_unmodified_since() {
447                if !last_modified.le(unmodified_since) {
448                    return Err(Error::new(
449                        ErrorKind::ConditionNotMatch,
450                        "modified since specified time",
451                    ));
452                }
453            }
454        }
455
456        let range = args.range();
457        let buffer = if range.is_full() {
458            resp_body
459        } else {
460            let start = range.offset() as usize;
461            let end = match range.size() {
462                Some(size) => (range.offset() + size) as usize,
463                None => resp_body.len(),
464            };
465            resp_body.slice(start..end.min(resp_body.len()))
466        };
467        Ok((RpRead::new(), buffer))
468    }
469
470    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
471        let path = build_abs_path(&self.core.info.root(), path);
472        let writer = CloudflareWriter::new(self.core.clone(), path);
473
474        let w = oio::OneShotWriter::new(writer);
475
476        Ok((RpWrite::default(), w))
477    }
478
479    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
480        Ok((
481            RpDelete::default(),
482            oio::BatchDeleter::new(
483                CloudflareKvDeleter::new(self.core.clone()),
484                self.core.info.full_capability().delete_max_size,
485            ),
486        ))
487    }
488
489    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
490        let path = build_abs_path(&self.core.info.root(), path);
491
492        let limit = match args.limit() {
493            Some(limit) => {
494                // The list limit of cloudflare_kv is limited to 10..1000.
495                if !(10..=1000).contains(&limit) {
496                    1000
497                } else {
498                    limit
499                }
500            }
501            None => 1000,
502        };
503
504        let l = CloudflareKvLister::new(self.core.clone(), &path, args.recursive(), Some(limit));
505
506        Ok((RpList::default(), oio::PageLister::new(l)))
507    }
508}