opendal/services/cloudflare_kv/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Buf;
24use http::StatusCode;
25
26use super::DEFAULT_SCHEME;
27use crate::raw::*;
28use crate::services::cloudflare_kv::core::CloudflareKvCore;
29use crate::services::cloudflare_kv::delete::CloudflareKvDeleter;
30use crate::services::cloudflare_kv::error::parse_error;
31use crate::services::cloudflare_kv::lister::CloudflareKvLister;
32use crate::services::cloudflare_kv::model::*;
33use crate::services::cloudflare_kv::writer::CloudflareWriter;
34use crate::services::CloudflareKvConfig;
35use crate::ErrorKind;
36use crate::*;
37
38impl Configurator for CloudflareKvConfig {
39    type Builder = CloudflareKvBuilder;
40    fn into_builder(self) -> Self::Builder {
41        CloudflareKvBuilder {
42            config: self,
43            http_client: None,
44        }
45    }
46}
47
48#[doc = include_str!("docs.md")]
49#[derive(Default)]
50pub struct CloudflareKvBuilder {
51    config: CloudflareKvConfig,
52
53    /// The HTTP client used to communicate with CloudFlare.
54    http_client: Option<HttpClient>,
55}
56
57impl Debug for CloudflareKvBuilder {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("CloudFlareKvBuilder")
60            .field("config", &self.config)
61            .finish()
62    }
63}
64
65impl CloudflareKvBuilder {
66    /// Set the token used to authenticate with CloudFlare.
67    pub fn api_token(mut self, api_token: &str) -> Self {
68        if !api_token.is_empty() {
69            self.config.api_token = Some(api_token.to_string())
70        }
71        self
72    }
73
74    /// Set the account ID used to authenticate with CloudFlare.
75    pub fn account_id(mut self, account_id: &str) -> Self {
76        if !account_id.is_empty() {
77            self.config.account_id = Some(account_id.to_string())
78        }
79        self
80    }
81
82    /// Set the namespace ID.
83    pub fn namespace_id(mut self, namespace_id: &str) -> Self {
84        if !namespace_id.is_empty() {
85            self.config.namespace_id = Some(namespace_id.to_string())
86        }
87        self
88    }
89
90    /// Set the default ttl for cloudflare_kv services.
91    ///
92    /// If set, we will specify `EX` for write operations.
93    pub fn default_ttl(mut self, ttl: Duration) -> Self {
94        self.config.default_ttl = Some(ttl);
95        self
96    }
97
98    /// Set the root within this backend.
99    pub fn root(mut self, root: &str) -> Self {
100        self.config.root = if root.is_empty() {
101            None
102        } else {
103            Some(root.to_string())
104        };
105
106        self
107    }
108}
109
110impl Builder for CloudflareKvBuilder {
111    type Config = CloudflareKvConfig;
112
113    fn build(self) -> Result<impl Access> {
114        let api_token = match &self.config.api_token {
115            Some(api_token) => format_authorization_by_bearer(api_token)?,
116            None => {
117                return Err(Error::new(
118                    ErrorKind::ConfigInvalid,
119                    "api_token is required",
120                ))
121            }
122        };
123
124        let Some(account_id) = self.config.account_id.clone() else {
125            return Err(Error::new(
126                ErrorKind::ConfigInvalid,
127                "account_id is required",
128            ));
129        };
130
131        let Some(namespace_id) = self.config.namespace_id.clone() else {
132            return Err(Error::new(
133                ErrorKind::ConfigInvalid,
134                "namespace_id is required",
135            ));
136        };
137
138        // Validate default TTL is at least 60 seconds if specified
139        if let Some(ttl) = self.config.default_ttl {
140            if ttl < Duration::from_secs(60) {
141                return Err(Error::new(
142                    ErrorKind::ConfigInvalid,
143                    "Default TTL must be at least 60 seconds",
144                ));
145            }
146        }
147
148        let root = normalize_root(
149            self.config
150                .root
151                .clone()
152                .unwrap_or_else(|| "/".to_string())
153                .as_str(),
154        );
155
156        Ok(CloudflareKvAccessor {
157            core: Arc::new(CloudflareKvCore {
158                api_token,
159                account_id,
160                namespace_id,
161                expiration_ttl: self.config.default_ttl,
162                info: {
163                    let am = AccessorInfo::default();
164                    am.set_scheme(DEFAULT_SCHEME)
165                        .set_root(&root)
166                        .set_native_capability(Capability {
167                            create_dir: true,
168
169                            stat: true,
170                            stat_with_if_match: true,
171                            stat_with_if_none_match: true,
172                            stat_with_if_modified_since: true,
173                            stat_with_if_unmodified_since: true,
174
175                            read: true,
176                            read_with_if_match: true,
177                            read_with_if_none_match: true,
178                            read_with_if_modified_since: true,
179                            read_with_if_unmodified_since: true,
180
181                            write: true,
182                            write_can_empty: true,
183                            write_total_max_size: Some(25 * 1024 * 1024),
184
185                            list: true,
186                            list_with_limit: true,
187                            list_with_recursive: true,
188
189                            delete: true,
190                            delete_max_size: Some(10000),
191
192                            shared: false,
193
194                            ..Default::default()
195                        });
196
197                    // allow deprecated api here for compatibility
198                    #[allow(deprecated)]
199                    if let Some(client) = self.http_client {
200                        am.update_http_client(|_| client);
201                    }
202
203                    am.into()
204                },
205            }),
206        })
207    }
208}
209
210#[derive(Debug, Clone)]
211pub struct CloudflareKvAccessor {
212    core: std::sync::Arc<CloudflareKvCore>,
213}
214
215impl Access for CloudflareKvAccessor {
216    type Reader = Buffer;
217    type Writer = oio::OneShotWriter<CloudflareWriter>;
218    type Lister = oio::PageLister<CloudflareKvLister>;
219    type Deleter = oio::BatchDeleter<CloudflareKvDeleter>;
220
221    fn info(&self) -> std::sync::Arc<AccessorInfo> {
222        self.core.info.clone()
223    }
224
225    async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
226        let path = build_abs_path(&self.core.info.root(), path);
227
228        if path == build_abs_path(&self.core.info.root(), "") {
229            return Ok(RpCreateDir::default());
230        }
231
232        // Split path into segments and create directories for each level
233        let segments: Vec<&str> = path
234            .trim_start_matches('/')
235            .trim_end_matches('/')
236            .split('/')
237            .collect();
238
239        // Create each directory level
240        let mut current_path = String::from("/");
241        for segment in segments {
242            // Build the current directory path
243            if !current_path.ends_with('/') {
244                current_path.push('/');
245            }
246            current_path.push_str(segment);
247            current_path.push('/');
248
249            // Create metadata for current directory
250            let cf_kv_metadata = CfKvMetadata {
251                etag: build_tmp_path_of(&current_path),
252                last_modified: chrono::Local::now().to_rfc3339(),
253                content_length: 0,
254                is_dir: true,
255            };
256
257            // Set the directory entry
258            self.core
259                .set(&current_path, Buffer::new(), cf_kv_metadata)
260                .await?;
261        }
262
263        Ok(RpCreateDir::default())
264    }
265
266    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
267        let path = build_abs_path(&self.core.info.root(), path);
268        let new_path = path.trim_end_matches('/');
269
270        let resp = self.core.metadata(new_path).await?;
271
272        // Handle non-OK response
273        if resp.status() != StatusCode::OK {
274            // Special handling for potential directory paths
275            if path.ends_with('/') && resp.status() == StatusCode::NOT_FOUND {
276                // Try listing the path to check if it's a directory
277                let list_resp = self.core.list(&path, None, None).await?;
278
279                if list_resp.status() == StatusCode::OK {
280                    let list_body = list_resp.into_body();
281                    let list_result: CfKvListResponse = serde_json::from_reader(list_body.reader())
282                        .map_err(new_json_deserialize_error)?;
283
284                    // If listing returns results, treat as directory
285                    if let Some(entries) = list_result.result {
286                        if !entries.is_empty() {
287                            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
288                        }
289                    }
290
291                    // Empty or no results means not found
292                    return Err(Error::new(
293                        ErrorKind::NotFound,
294                        "key not found in CloudFlare KV",
295                    ));
296                }
297            }
298
299            // For all other error cases, parse the error response
300            return Err(parse_error(resp));
301        }
302
303        let resp_body = resp.into_body();
304        let cf_response: CfKvStatResponse =
305            serde_json::from_reader(resp_body.reader()).map_err(new_json_deserialize_error)?;
306
307        if !cf_response.success {
308            return Err(Error::new(
309                ErrorKind::Unexpected,
310                "cloudflare_kv stat this key failed for reason we don't know",
311            ));
312        }
313
314        let metadata = match cf_response.result {
315            Some(metadata) => {
316                if path.ends_with('/') && !metadata.is_dir {
317                    return Err(Error::new(
318                        ErrorKind::NotFound,
319                        "key not found in CloudFlare KV",
320                    ));
321                } else {
322                    metadata
323                }
324            }
325            None => {
326                return Err(Error::new(
327                    ErrorKind::NotFound,
328                    "key not found in CloudFlare KV",
329                ));
330            }
331        };
332
333        // Check if_match condition
334        if let Some(if_match) = &args.if_match() {
335            if if_match != &metadata.etag {
336                return Err(Error::new(ErrorKind::ConditionNotMatch, "etag mismatch"));
337            }
338        }
339
340        // Check if_none_match condition
341        if let Some(if_none_match) = &args.if_none_match() {
342            if if_none_match == &metadata.etag {
343                return Err(Error::new(
344                    ErrorKind::ConditionNotMatch,
345                    "etag match when expected none match",
346                ));
347            }
348        }
349
350        // Parse since time once for both time-based conditions
351        let last_modified = chrono::DateTime::parse_from_rfc3339(&metadata.last_modified)
352            .map_err(|_| Error::new(ErrorKind::Unsupported, "invalid since format"))?;
353
354        // Check modified_since condition
355        if let Some(modified_since) = &args.if_modified_since() {
356            if !last_modified.gt(modified_since) {
357                return Err(Error::new(
358                    ErrorKind::ConditionNotMatch,
359                    "not modified since specified time",
360                ));
361            }
362        }
363
364        // Check unmodified_since condition
365        if let Some(unmodified_since) = &args.if_unmodified_since() {
366            if !last_modified.le(unmodified_since) {
367                return Err(Error::new(
368                    ErrorKind::ConditionNotMatch,
369                    "modified since specified time",
370                ));
371            }
372        }
373
374        let meta = Metadata::new(if metadata.is_dir {
375            EntryMode::DIR
376        } else {
377            EntryMode::FILE
378        })
379        .with_etag(metadata.etag)
380        .with_content_length(metadata.content_length as u64)
381        .with_last_modified(parse_datetime_from_rfc3339(&metadata.last_modified)?);
382
383        Ok(RpStat::new(meta))
384    }
385
386    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
387        let path = build_abs_path(&self.core.info.root(), path);
388        let resp = self.core.get(&path).await?;
389
390        let status = resp.status();
391
392        if status != StatusCode::OK {
393            return Err(parse_error(resp));
394        }
395
396        let resp_body = resp.into_body();
397
398        if args.if_match().is_some()
399            || args.if_none_match().is_some()
400            || args.if_modified_since().is_some()
401            || args.if_unmodified_since().is_some()
402        {
403            let meta_resp = self.core.metadata(&path).await?;
404
405            if meta_resp.status() != StatusCode::OK {
406                return Err(parse_error(meta_resp));
407            }
408
409            let cf_response: CfKvStatResponse =
410                serde_json::from_reader(meta_resp.into_body().reader())
411                    .map_err(new_json_deserialize_error)?;
412
413            if !cf_response.success && cf_response.result.is_some() {
414                return Err(Error::new(
415                    ErrorKind::Unexpected,
416                    "cloudflare_kv read this key failed for reason we don't know",
417                ));
418            }
419
420            let metadata = cf_response.result.unwrap();
421
422            // Check if_match condition
423            if let Some(if_match) = &args.if_match() {
424                if if_match != &metadata.etag {
425                    return Err(Error::new(ErrorKind::ConditionNotMatch, "etag mismatch"));
426                }
427            }
428
429            // Check if_none_match condition
430            if let Some(if_none_match) = &args.if_none_match() {
431                if if_none_match == &metadata.etag {
432                    return Err(Error::new(
433                        ErrorKind::ConditionNotMatch,
434                        "etag match when expected none match",
435                    ));
436                }
437            }
438
439            // Parse since time once for both time-based conditions
440            let last_modified = chrono::DateTime::parse_from_rfc3339(&metadata.last_modified)
441                .map_err(|_| Error::new(ErrorKind::Unsupported, "invalid since format"))?;
442
443            // Check modified_since condition
444            if let Some(modified_since) = &args.if_modified_since() {
445                if !last_modified.gt(modified_since) {
446                    return Err(Error::new(
447                        ErrorKind::ConditionNotMatch,
448                        "not modified since specified time",
449                    ));
450                }
451            }
452
453            // Check unmodified_since condition
454            if let Some(unmodified_since) = &args.if_unmodified_since() {
455                if !last_modified.le(unmodified_since) {
456                    return Err(Error::new(
457                        ErrorKind::ConditionNotMatch,
458                        "modified since specified time",
459                    ));
460                }
461            }
462        }
463
464        let range = args.range();
465        let buffer = if range.is_full() {
466            resp_body
467        } else {
468            let start = range.offset() as usize;
469            let end = match range.size() {
470                Some(size) => (range.offset() + size) as usize,
471                None => resp_body.len(),
472            };
473            resp_body.slice(start..end.min(resp_body.len()))
474        };
475        Ok((RpRead::new(), buffer))
476    }
477
478    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
479        let path = build_abs_path(&self.core.info.root(), path);
480        let writer = CloudflareWriter::new(self.core.clone(), path);
481
482        let w = oio::OneShotWriter::new(writer);
483
484        Ok((RpWrite::default(), w))
485    }
486
487    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
488        Ok((
489            RpDelete::default(),
490            oio::BatchDeleter::new(CloudflareKvDeleter::new(self.core.clone())),
491        ))
492    }
493
494    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
495        let path = build_abs_path(&self.core.info.root(), path);
496
497        let limit = match args.limit() {
498            Some(limit) => {
499                // The list limit of cloudflare_kv is limited to 10..1000.
500                if !(10..=1000).contains(&limit) {
501                    1000
502                } else {
503                    limit
504                }
505            }
506            None => 1000,
507        };
508
509        let l = CloudflareKvLister::new(self.core.clone(), &path, args.recursive(), Some(limit));
510
511        Ok((RpList::default(), oio::PageLister::new(l)))
512    }
513}