opendal/services/cloudflare_kv/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19use std::time::Duration;
20
21use http::header;
22use http::request;
23use http::Request;
24use http::Response;
25use serde_json::json;
26
27use crate::raw::new_json_serialize_error;
28use crate::raw::percent_encode_path;
29use crate::raw::Operation;
30use crate::raw::QueryPairsWriter;
31use crate::raw::{new_request_build_error, AccessorInfo, FormDataPart, Multipart};
32use crate::services::cloudflare_kv::model::CfKvMetadata;
33use crate::{Buffer, Result};
34
35#[derive(Debug, Clone)]
36pub struct CloudflareKvCore {
37    pub api_token: String,
38    pub account_id: String,
39    pub namespace_id: String,
40    pub expiration_ttl: Option<Duration>,
41    pub info: Arc<AccessorInfo>,
42}
43
44impl CloudflareKvCore {
45    #[inline]
46    async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
47        self.info.http_client().send(req).await
48    }
49
50    fn sign(&self, req: request::Builder) -> request::Builder {
51        req.header(header::AUTHORIZATION, &self.api_token)
52    }
53
54    fn url_prefix(&self) -> String {
55        let url = format!(
56            "https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}",
57            self.account_id, self.namespace_id
58        );
59        url
60    }
61}
62
63impl CloudflareKvCore {
64    pub async fn metadata(&self, path: &str) -> Result<Response<Buffer>> {
65        let url = format!(
66            "{}/metadata/{}",
67            self.url_prefix(),
68            percent_encode_path(path)
69        );
70
71        let req = Request::get(url);
72        let req = self.sign(req);
73
74        let req = req
75            .extension(Operation::Stat)
76            .body(Buffer::new())
77            .map_err(new_request_build_error)?;
78
79        self.send(req).await
80    }
81
82    pub async fn get(&self, path: &str) -> Result<Response<Buffer>> {
83        let url = format!("{}/values/{}", self.url_prefix(), percent_encode_path(path));
84        let req = Request::get(url);
85
86        let req = self.sign(req);
87
88        let req = req
89            .extension(Operation::Read)
90            .body(Buffer::new())
91            .map_err(new_request_build_error)?;
92
93        self.send(req).await
94    }
95
96    pub async fn set(
97        &self,
98        path: &str,
99        value: Buffer,
100        metadata: CfKvMetadata,
101    ) -> Result<Response<Buffer>> {
102        let url = format!("{}/values/{}", self.url_prefix(), percent_encode_path(path));
103
104        let req = Request::put(url);
105        let req = self.sign(req);
106        let req = req.extension(Operation::Write);
107
108        let mut multipart = Multipart::new()
109            .part(FormDataPart::new("value").content(value))
110            .part(
111                FormDataPart::new("metadata")
112                    .content(serde_json::to_string(&metadata).map_err(new_json_serialize_error)?),
113            );
114
115        if let Some(expiration_ttl) = self.expiration_ttl {
116            multipart = multipart.part(
117                FormDataPart::new("expiration_ttl").content(expiration_ttl.as_secs().to_string()),
118            );
119        }
120
121        let req = multipart.apply(req)?;
122
123        self.send(req).await
124    }
125
126    pub async fn delete(&self, paths: &[String]) -> Result<Response<Buffer>> {
127        let url = format!("{}/bulk/delete", self.url_prefix());
128
129        let req = Request::post(&url);
130
131        let req = self.sign(req);
132        let req_body = &json!(paths);
133        let req = req
134            .extension(Operation::Delete)
135            .header(header::CONTENT_TYPE, "application/json")
136            .body(Buffer::from(req_body.to_string()))
137            .map_err(new_request_build_error)?;
138
139        self.send(req).await
140    }
141
142    pub async fn list(
143        &self,
144        prefix: &str,
145        limit: Option<usize>,
146        cursor: Option<String>,
147    ) -> Result<Response<Buffer>> {
148        let url = format!("{}/keys", self.url_prefix());
149        let mut url = QueryPairsWriter::new(&url);
150        if let Some(cursor) = cursor {
151            if !cursor.is_empty() {
152                url = url.push("cursor", &cursor);
153            }
154        }
155        url = url.push("limit", &limit.unwrap_or(1000).to_string());
156        url = url.push("prefix", &percent_encode_path(prefix));
157
158        let req = Request::get(url.finish());
159
160        let req = self.sign(req);
161        let req = req
162            .extension(Operation::List)
163            .body(Buffer::new())
164            .map_err(new_request_build_error)?;
165
166        self.send(req).await
167    }
168}