opendal/services/cloudflare_kv/
delete.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22
23use super::core::*;
24use super::error::parse_error;
25use crate::raw::oio::BatchDeleteResult;
26use crate::raw::*;
27use crate::services::cloudflare_kv::model::CfKvDeleteResponse;
28use crate::*;
29
30pub struct CloudflareKvDeleter {
31    core: Arc<CloudflareKvCore>,
32}
33
34impl CloudflareKvDeleter {
35    pub fn new(core: Arc<CloudflareKvCore>) -> Self {
36        Self { core }
37    }
38}
39
40impl oio::BatchDelete for CloudflareKvDeleter {
41    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
42        let path = build_abs_path(&self.core.info.root(), &path);
43        let resp = self
44            .core
45            .delete(&[path.trim_end_matches('/').to_string()])
46            .await?;
47
48        let status = resp.status();
49
50        if status != StatusCode::OK {
51            return Err(parse_error(resp.clone()));
52        }
53
54        let bs = resp.clone().into_body();
55        let res: CfKvDeleteResponse =
56            serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
57
58        if !res.success {
59            return Err(parse_error(resp.clone()));
60        }
61
62        Ok(())
63    }
64
65    async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
66        let keys = batch
67            .iter()
68            .map(|path| {
69                let path = build_abs_path(&self.core.info.root(), &path.0);
70                path.trim_end_matches('/').to_string()
71            })
72            .collect::<Vec<String>>();
73
74        let resp = self.core.delete(&keys).await?;
75
76        let status = resp.status();
77
78        if status != StatusCode::OK {
79            return Err(parse_error(resp));
80        }
81
82        let bs = resp.into_body();
83
84        let res: CfKvDeleteResponse =
85            serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
86
87        let result = match (res.success, res.result) {
88            (true, Some(result)) => result,
89            _ => {
90                return Err(Error::new(
91                    ErrorKind::Unexpected,
92                    "cloudflare_kv delete this key failed for reason we don't know",
93                ))
94            }
95        };
96
97        let mut batched_result = BatchDeleteResult {
98            succeeded: Vec::with_capacity(result.successful_key_count),
99            failed: Vec::with_capacity(result.unsuccessful_keys.len()),
100        };
101
102        for item in batch {
103            if result.unsuccessful_keys.contains(&item.0) {
104                batched_result.failed.push((
105                    item.0,
106                    item.1,
107                    Error::new(
108                        ErrorKind::Unexpected,
109                        "cloudflare_kv delete this key failed for reason we don't know",
110                    ),
111                ));
112            } else {
113                batched_result.succeeded.push(item);
114            }
115        }
116
117        Ok(batched_result)
118    }
119}