opendal_core/services/cloudflare_kv/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use http::Request;
21use http::Response;
22use http::header;
23use http::request;
24use serde_json::json;
25
26use crate::raw::*;
27use crate::services::cloudflare_kv::model::CfKvMetadata;
28use crate::{Buffer, Result};
29
30#[derive(Debug, Clone)]
31pub struct CloudflareKvCore {
32    pub api_token: String,
33    pub account_id: String,
34    pub namespace_id: String,
35    pub expiration_ttl: Option<Duration>,
36    pub info: Arc<AccessorInfo>,
37}
38
39impl CloudflareKvCore {
40    #[inline]
41    async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
42        self.info.http_client().send(req).await
43    }
44
45    fn sign(&self, req: request::Builder) -> request::Builder {
46        req.header(header::AUTHORIZATION, &self.api_token)
47    }
48
49    fn url_prefix(&self) -> String {
50        let url = format!(
51            "https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}",
52            self.account_id, self.namespace_id
53        );
54        url
55    }
56}
57
58impl CloudflareKvCore {
59    pub async fn metadata(&self, path: &str) -> Result<Response<Buffer>> {
60        let url = format!(
61            "{}/metadata/{}",
62            self.url_prefix(),
63            percent_encode_path(path)
64        );
65
66        let req = Request::get(url);
67        let req = self.sign(req);
68
69        let req = req
70            .extension(Operation::Stat)
71            .body(Buffer::new())
72            .map_err(new_request_build_error)?;
73
74        self.send(req).await
75    }
76
77    pub async fn get(&self, path: &str) -> Result<Response<Buffer>> {
78        let url = format!("{}/values/{}", self.url_prefix(), percent_encode_path(path));
79        let req = Request::get(url);
80
81        let req = self.sign(req);
82
83        let req = req
84            .extension(Operation::Read)
85            .body(Buffer::new())
86            .map_err(new_request_build_error)?;
87
88        self.send(req).await
89    }
90
91    pub async fn set(
92        &self,
93        path: &str,
94        value: Buffer,
95        metadata: CfKvMetadata,
96    ) -> Result<Response<Buffer>> {
97        let url = format!("{}/values/{}", self.url_prefix(), percent_encode_path(path));
98
99        let req = Request::put(url);
100        let req = self.sign(req);
101        let req = req.extension(Operation::Write);
102
103        let mut multipart = Multipart::new()
104            .part(FormDataPart::new("value").content(value))
105            .part(
106                FormDataPart::new("metadata")
107                    .content(serde_json::to_string(&metadata).map_err(new_json_serialize_error)?),
108            );
109
110        if let Some(expiration_ttl) = self.expiration_ttl {
111            multipart = multipart.part(
112                FormDataPart::new("expiration_ttl").content(expiration_ttl.as_secs().to_string()),
113            );
114        }
115
116        let req = multipart.apply(req)?;
117
118        self.send(req).await
119    }
120
121    pub async fn delete(&self, paths: &[String]) -> Result<Response<Buffer>> {
122        let url = format!("{}/bulk/delete", self.url_prefix());
123
124        let req = Request::post(&url);
125
126        let req = self.sign(req);
127        let req_body = &json!(paths);
128        let req = req
129            .extension(Operation::Delete)
130            .header(header::CONTENT_TYPE, "application/json")
131            .body(Buffer::from(req_body.to_string()))
132            .map_err(new_request_build_error)?;
133
134        self.send(req).await
135    }
136
137    pub async fn list(
138        &self,
139        prefix: &str,
140        limit: Option<usize>,
141        cursor: Option<String>,
142    ) -> Result<Response<Buffer>> {
143        let url = format!("{}/keys", self.url_prefix());
144        let mut url = QueryPairsWriter::new(&url);
145        if let Some(cursor) = cursor {
146            if !cursor.is_empty() {
147                url = url.push("cursor", &cursor);
148            }
149        }
150        url = url.push("limit", &limit.unwrap_or(1000).to_string());
151        url = url.push("prefix", &percent_encode_path(prefix));
152
153        let req = Request::get(url.finish());
154
155        let req = self.sign(req);
156        let req = req
157            .extension(Operation::List)
158            .body(Buffer::new())
159            .map_err(new_request_build_error)?;
160
161        self.send(req).await
162    }
163}