opendal/services/atomicserver/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use atomic_lib::agents::Agent;
22use atomic_lib::client::get_authentication_headers;
23use atomic_lib::commit::sign_message;
24use bytes::Buf;
25use http::header::CONTENT_DISPOSITION;
26use http::header::CONTENT_TYPE;
27use http::Request;
28use serde::Deserialize;
29use serde::Serialize;
30
31use crate::raw::adapters::kv;
32use crate::raw::*;
33use crate::services::AtomicserverConfig;
34use crate::*;
35
36impl Configurator for AtomicserverConfig {
37    type Builder = AtomicserverBuilder;
38    fn into_builder(self) -> Self::Builder {
39        AtomicserverBuilder { config: self }
40    }
41}
42
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct AtomicserverBuilder {
    // Configuration filled in by the builder's setter methods and consumed by `build`.
    config: AtomicserverConfig,
}
48
49impl Debug for AtomicserverBuilder {
50    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
51        f.debug_struct("AtomicserverBuilder")
52            .field("config", &self.config)
53            .finish()
54    }
55}
56
57impl AtomicserverBuilder {
58    /// Set the root for Atomicserver.
59    pub fn root(mut self, path: &str) -> Self {
60        self.config.root = Some(path.into());
61        self
62    }
63
64    /// Set the server address for Atomicserver.
65    pub fn endpoint(mut self, endpoint: &str) -> Self {
66        self.config.endpoint = Some(endpoint.into());
67        self
68    }
69
70    /// Set the private key for agent used for Atomicserver.
71    pub fn private_key(mut self, private_key: &str) -> Self {
72        self.config.private_key = Some(private_key.into());
73        self
74    }
75
76    /// Set the public key for agent used for Atomicserver.
77    /// For example, if the subject URL for the agent being used
78    /// is ${endpoint}/agents/lTB+W3C/2YfDu9IAVleEy34uCmb56iXXuzWCKBVwdRI=
79    /// Then the required public key is `lTB+W3C/2YfDu9IAVleEy34uCmb56iXXuzWCKBVwdRI=`
80    pub fn public_key(mut self, public_key: &str) -> Self {
81        self.config.public_key = Some(public_key.into());
82        self
83    }
84
85    /// Set the parent resource id (url) that Atomicserver uses to store resources under.
86    pub fn parent_resource_id(mut self, parent_resource_id: &str) -> Self {
87        self.config.parent_resource_id = Some(parent_resource_id.into());
88        self
89    }
90}
91
92impl Builder for AtomicserverBuilder {
93    const SCHEME: Scheme = Scheme::Atomicserver;
94    type Config = AtomicserverConfig;
95
96    fn build(self) -> Result<impl Access> {
97        let root = normalize_root(
98            self.config
99                .root
100                .clone()
101                .unwrap_or_else(|| "/".to_string())
102                .as_str(),
103        );
104
105        let endpoint = self.config.endpoint.clone().unwrap();
106        let parent_resource_id = self.config.parent_resource_id.clone().unwrap();
107
108        let agent = Agent {
109            private_key: self.config.private_key.clone(),
110            public_key: self.config.public_key.clone().unwrap(),
111            subject: format!(
112                "{}/agents/{}",
113                endpoint,
114                self.config.public_key.clone().unwrap()
115            ),
116            created_at: 1,
117            name: Some("agent".to_string()),
118        };
119
120        Ok(AtomicserverBackend::new(Adapter {
121            parent_resource_id,
122            endpoint,
123            agent,
124            client: HttpClient::new().map_err(|err| {
125                err.with_operation("Builder::build")
126                    .with_context("service", Scheme::Atomicserver)
127            })?,
128        })
129        .with_normalized_root(root))
130    }
131}
132
/// Backend for Atomicserver services.
pub type AtomicserverBackend = kv::Backend<Adapter>;

/// Property URL used as the search filter key when looking up a stored file by its path.
const FILENAME_PROPERTY: &str = "https://atomicdata.dev/properties/filename";
137
/// Unsigned commit payload.
///
/// It is serialized to JSON and that string is signed when building a
/// delete request; the signed form is `CommitStructSigned`.
#[derive(Debug, Serialize)]
struct CommitStruct {
    // Creation time of the commit; the delete request fills it with milliseconds since the Unix epoch.
    #[serde(rename = "https://atomicdata.dev/properties/createdAt")]
    created_at: i64,
    // Destroy flag; the delete request sets it to `true`.
    #[serde(rename = "https://atomicdata.dev/properties/destroy")]
    destroy: bool,
    // Class URLs of this resource; the delete request uses the Commit class.
    #[serde(rename = "https://atomicdata.dev/properties/isA")]
    is_a: Vec<String>,
    // Subject URL of the agent that signs the commit.
    #[serde(rename = "https://atomicdata.dev/properties/signer")]
    signer: String,
    // Subject of the resource the commit applies to.
    #[serde(rename = "https://atomicdata.dev/properties/subject")]
    subject: String,
}
151
/// Signed commit payload sent as the JSON body of a delete request.
///
/// Carries the same fields as `CommitStruct` plus the `signature` computed
/// over the serialized unsigned commit.
#[derive(Debug, Serialize)]
struct CommitStructSigned {
    // Creation time of the commit; the delete request fills it with milliseconds since the Unix epoch.
    #[serde(rename = "https://atomicdata.dev/properties/createdAt")]
    created_at: i64,
    // Destroy flag; the delete request sets it to `true`.
    #[serde(rename = "https://atomicdata.dev/properties/destroy")]
    destroy: bool,
    // Class URLs of this resource; the delete request uses the Commit class.
    #[serde(rename = "https://atomicdata.dev/properties/isA")]
    is_a: Vec<String>,
    // Signature returned by `sign_message` for the serialized unsigned commit.
    #[serde(rename = "https://atomicdata.dev/properties/signature")]
    signature: String,
    // Subject URL of the agent that signs the commit.
    #[serde(rename = "https://atomicdata.dev/properties/signer")]
    signer: String,
    // Subject of the resource the commit applies to.
    #[serde(rename = "https://atomicdata.dev/properties/subject")]
    subject: String,
}
167
/// A single file resource as returned in a search result.
#[derive(Debug, Deserialize)]
struct FileStruct {
    // Identifier of the resource (`@id`); used as the subject when deleting it.
    #[serde(rename = "@id")]
    id: String,
    // URL from which the file content is downloaded.
    #[serde(rename = "https://atomicdata.dev/properties/downloadURL")]
    download_url: String,
}
175
/// Response body of a search request.
#[derive(Debug, Deserialize)]
struct QueryResultStruct {
    // Matching file resources; falls back to an empty list (via `empty_vec`)
    // when the results property is absent from the response.
    #[serde(
        rename = "https://atomicdata.dev/properties/endpoint/results",
        default = "empty_vec"
    )]
    results: Vec<FileStruct>,
}
184
185fn empty_vec() -> Vec<FileStruct> {
186    Vec::new()
187}
188
/// Key-value adapter that talks to an Atomicserver instance over HTTP.
#[derive(Clone)]
pub struct Adapter {
    // Parent resource id (url) passed as the `parent` query parameter on upload.
    parent_resource_id: String,
    // Base URL of the server; request URLs are built by appending paths to it.
    endpoint: String,
    // Agent whose keys are used to authenticate requests and sign commits.
    agent: Agent,
    // HTTP client used to send every request.
    client: HttpClient,
}
196
197impl Debug for Adapter {
198    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
199        let mut ds = f.debug_struct("Adapter");
200        ds.finish()
201    }
202}
203
204impl Adapter {
205    fn sign(&self, url: &str, mut req: http::request::Builder) -> http::request::Builder {
206        let auth_headers = get_authentication_headers(url, &self.agent)
207            .map_err(|err| {
208                Error::new(
209                    ErrorKind::Unexpected,
210                    "Failed to get authentication headers",
211                )
212                .with_context("service", Scheme::Atomicserver)
213                .set_source(err)
214            })
215            .unwrap();
216
217        for (k, v) in &auth_headers {
218            req = req.header(k, v);
219        }
220
221        req
222    }
223}
224
225impl Adapter {
226    pub fn atomic_get_object_request(&self, path: &str) -> Result<Request<Buffer>> {
227        let path = normalize_path(path);
228        let path = path.as_str();
229
230        let filename_property_escaped = FILENAME_PROPERTY.replace(':', "\\:").replace('.', "\\.");
231        let url = format!(
232            "{}/search?filters={}:%22{}%22",
233            self.endpoint,
234            percent_encode_path(&filename_property_escaped),
235            percent_encode_path(path)
236        );
237
238        let mut req = Request::get(&url);
239        req = self.sign(&url, req);
240        req = req.header(http::header::ACCEPT, "application/ad+json");
241
242        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
243
244        Ok(req)
245    }
246
247    fn atomic_post_object_request(&self, path: &str, value: Buffer) -> Result<Request<Buffer>> {
248        let path = normalize_path(path);
249        let path = path.as_str();
250
251        let url = format!(
252            "{}/upload?parent={}",
253            self.endpoint,
254            percent_encode_path(&self.parent_resource_id)
255        );
256
257        let mut req = Request::post(&url);
258        req = self.sign(&url, req);
259
260        let datapart = FormDataPart::new("assets")
261            .header(
262                CONTENT_DISPOSITION,
263                format!("form-data; name=\"assets\"; filename=\"{}\"", path)
264                    .parse()
265                    .unwrap(),
266            )
267            .header(CONTENT_TYPE, "text/plain".parse().unwrap())
268            .content(value.to_vec());
269
270        let multipart = Multipart::new().part(datapart);
271        let req = multipart.apply(req)?;
272
273        Ok(req)
274    }
275
276    pub fn atomic_delete_object_request(&self, subject: &str) -> Result<Request<Buffer>> {
277        let url = format!("{}/commit", self.endpoint);
278
279        let timestamp = std::time::SystemTime::now()
280            .duration_since(std::time::UNIX_EPOCH)
281            .expect("You're a time traveler")
282            .as_millis() as i64;
283
284        let commit_to_sign = CommitStruct {
285            created_at: timestamp,
286            destroy: true,
287            is_a: ["https://atomicdata.dev/classes/Commit".to_string()].to_vec(),
288            signer: self.agent.subject.to_string(),
289            subject: subject.to_string().clone(),
290        };
291        let commit_sign_string =
292            serde_json::to_string(&commit_to_sign).map_err(new_json_serialize_error)?;
293
294        let signature = sign_message(
295            &commit_sign_string,
296            self.agent.private_key.as_ref().unwrap(),
297            &self.agent.public_key,
298        )
299        .unwrap();
300
301        let commit = CommitStructSigned {
302            created_at: timestamp,
303            destroy: true,
304            is_a: ["https://atomicdata.dev/classes/Commit".to_string()].to_vec(),
305            signature,
306            signer: self.agent.subject.to_string(),
307            subject: subject.to_string().clone(),
308        };
309
310        let req = Request::post(&url);
311        let body_string = serde_json::to_string(&commit).map_err(new_json_serialize_error)?;
312
313        let body_bytes = body_string.as_bytes().to_owned();
314        let req = req
315            .body(Buffer::from(body_bytes))
316            .map_err(new_request_build_error)?;
317
318        Ok(req)
319    }
320
321    pub async fn download_from_url(&self, download_url: &String) -> Result<Buffer> {
322        let req = Request::get(download_url);
323        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
324        let resp = self.client.send(req).await?;
325        Ok(resp.into_body())
326    }
327}
328
329impl Adapter {
330    async fn wait_for_resource(&self, path: &str, expect_exist: bool) -> Result<()> {
331        // This is used to wait until insert/delete is actually effective
332        // This wait function is needed because atomicserver commits are not processed in real-time
333        // See https://docs.atomicdata.dev/commits/intro.html#motivation
334        for _i in 0..1000 {
335            let req = self.atomic_get_object_request(path)?;
336            let resp = self.client.send(req).await?;
337            let bytes = resp.into_body();
338            let query_result: QueryResultStruct =
339                serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
340            if !expect_exist && query_result.results.is_empty() {
341                break;
342            }
343            if expect_exist && !query_result.results.is_empty() {
344                break;
345            }
346            std::thread::sleep(std::time::Duration::from_millis(30));
347        }
348
349        Ok(())
350    }
351}
352
353impl kv::Adapter for Adapter {
354    type Scanner = ();
355
356    fn info(&self) -> kv::Info {
357        kv::Info::new(
358            Scheme::Atomicserver,
359            "atomicserver",
360            Capability {
361                read: true,
362                write: true,
363                delete: true,
364                shared: true,
365                ..Default::default()
366            },
367        )
368    }
369
370    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
371        let req = self.atomic_get_object_request(path)?;
372        let resp = self.client.send(req).await?;
373        let bytes = resp.into_body();
374
375        let query_result: QueryResultStruct =
376            serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
377
378        if query_result.results.is_empty() {
379            return Err(Error::new(
380                ErrorKind::NotFound,
381                "atomicserver: key not found",
382            ));
383        }
384
385        let bytes_file = self
386            .download_from_url(&query_result.results[0].download_url)
387            .await?;
388
389        Ok(Some(bytes_file))
390    }
391
392    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
393        let req = self.atomic_get_object_request(path)?;
394        let res = self.client.send(req).await?;
395        let bytes = res.into_body();
396
397        let query_result: QueryResultStruct =
398            serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
399
400        for result in query_result.results {
401            let req = self.atomic_delete_object_request(&result.id)?;
402            let _res = self.client.send(req).await?;
403        }
404
405        let _ = self.wait_for_resource(path, false).await;
406
407        let req = self.atomic_post_object_request(path, value)?;
408        let _res = self.client.send(req).await?;
409        let _ = self.wait_for_resource(path, true).await;
410
411        Ok(())
412    }
413
414    async fn delete(&self, path: &str) -> Result<()> {
415        let req = self.atomic_get_object_request(path)?;
416        let res = self.client.send(req).await?;
417        let bytes = res.into_body();
418
419        let query_result: QueryResultStruct =
420            serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
421
422        for result in query_result.results {
423            let req = self.atomic_delete_object_request(&result.id)?;
424            let _res = self.client.send(req).await?;
425        }
426
427        let _ = self.wait_for_resource(path, false).await;
428
429        Ok(())
430    }
431}