opendal/services/atomicserver/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use atomic_lib::agents::Agent;
22use atomic_lib::client::get_authentication_headers;
23use atomic_lib::commit::sign_message;
24use bytes::Buf;
25use http::header::CONTENT_DISPOSITION;
26use http::header::CONTENT_TYPE;
27use http::Request;
28use serde::Deserialize;
29use serde::Serialize;
30
31use crate::raw::adapters::kv;
32use crate::raw::*;
33use crate::services::AtomicserverConfig;
34use crate::*;
35
36impl Configurator for AtomicserverConfig {
37 type Builder = AtomicserverBuilder;
38 fn into_builder(self) -> Self::Builder {
39 AtomicserverBuilder { config: self }
40 }
41}
42
43#[doc = include_str!("docs.md")]
44#[derive(Default)]
45pub struct AtomicserverBuilder {
46 config: AtomicserverConfig,
47}
48
49impl Debug for AtomicserverBuilder {
50 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
51 f.debug_struct("AtomicserverBuilder")
52 .field("config", &self.config)
53 .finish()
54 }
55}
56
57impl AtomicserverBuilder {
58 pub fn root(mut self, path: &str) -> Self {
60 self.config.root = Some(path.into());
61 self
62 }
63
64 pub fn endpoint(mut self, endpoint: &str) -> Self {
66 self.config.endpoint = Some(endpoint.into());
67 self
68 }
69
70 pub fn private_key(mut self, private_key: &str) -> Self {
72 self.config.private_key = Some(private_key.into());
73 self
74 }
75
76 pub fn public_key(mut self, public_key: &str) -> Self {
81 self.config.public_key = Some(public_key.into());
82 self
83 }
84
85 pub fn parent_resource_id(mut self, parent_resource_id: &str) -> Self {
87 self.config.parent_resource_id = Some(parent_resource_id.into());
88 self
89 }
90}
91
92impl Builder for AtomicserverBuilder {
93 const SCHEME: Scheme = Scheme::Atomicserver;
94 type Config = AtomicserverConfig;
95
96 fn build(self) -> Result<impl Access> {
97 let root = normalize_root(
98 self.config
99 .root
100 .clone()
101 .unwrap_or_else(|| "/".to_string())
102 .as_str(),
103 );
104
105 let endpoint = self.config.endpoint.clone().unwrap();
106 let parent_resource_id = self.config.parent_resource_id.clone().unwrap();
107
108 let agent = Agent {
109 private_key: self.config.private_key.clone(),
110 public_key: self.config.public_key.clone().unwrap(),
111 subject: format!(
112 "{}/agents/{}",
113 endpoint,
114 self.config.public_key.clone().unwrap()
115 ),
116 created_at: 1,
117 name: Some("agent".to_string()),
118 };
119
120 Ok(AtomicserverBackend::new(Adapter {
121 parent_resource_id,
122 endpoint,
123 agent,
124 client: HttpClient::new().map_err(|err| {
125 err.with_operation("Builder::build")
126 .with_context("service", Scheme::Atomicserver)
127 })?,
128 })
129 .with_normalized_root(root))
130 }
131}
132
133pub type AtomicserverBackend = kv::Backend<Adapter>;
135
/// Property URL used as the search filter key when looking a resource up by
/// its file name.
const FILENAME_PROPERTY: &str = "https://atomicdata.dev/properties/filename";
137
138#[derive(Debug, Serialize)]
139struct CommitStruct {
140 #[serde(rename = "https://atomicdata.dev/properties/createdAt")]
141 created_at: i64,
142 #[serde(rename = "https://atomicdata.dev/properties/destroy")]
143 destroy: bool,
144 #[serde(rename = "https://atomicdata.dev/properties/isA")]
145 is_a: Vec<String>,
146 #[serde(rename = "https://atomicdata.dev/properties/signer")]
147 signer: String,
148 #[serde(rename = "https://atomicdata.dev/properties/subject")]
149 subject: String,
150}
151
152#[derive(Debug, Serialize)]
153struct CommitStructSigned {
154 #[serde(rename = "https://atomicdata.dev/properties/createdAt")]
155 created_at: i64,
156 #[serde(rename = "https://atomicdata.dev/properties/destroy")]
157 destroy: bool,
158 #[serde(rename = "https://atomicdata.dev/properties/isA")]
159 is_a: Vec<String>,
160 #[serde(rename = "https://atomicdata.dev/properties/signature")]
161 signature: String,
162 #[serde(rename = "https://atomicdata.dev/properties/signer")]
163 signer: String,
164 #[serde(rename = "https://atomicdata.dev/properties/subject")]
165 subject: String,
166}
167
168#[derive(Debug, Deserialize)]
169struct FileStruct {
170 #[serde(rename = "@id")]
171 id: String,
172 #[serde(rename = "https://atomicdata.dev/properties/downloadURL")]
173 download_url: String,
174}
175
176#[derive(Debug, Deserialize)]
177struct QueryResultStruct {
178 #[serde(
179 rename = "https://atomicdata.dev/properties/endpoint/results",
180 default = "empty_vec"
181 )]
182 results: Vec<FileStruct>,
183}
184
185fn empty_vec() -> Vec<FileStruct> {
186 Vec::new()
187}
188
189#[derive(Clone)]
190pub struct Adapter {
191 parent_resource_id: String,
192 endpoint: String,
193 agent: Agent,
194 client: HttpClient,
195}
196
197impl Debug for Adapter {
198 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
199 let mut ds = f.debug_struct("Adapter");
200 ds.finish()
201 }
202}
203
204impl Adapter {
205 fn sign(&self, url: &str, mut req: http::request::Builder) -> http::request::Builder {
206 let auth_headers = get_authentication_headers(url, &self.agent)
207 .map_err(|err| {
208 Error::new(
209 ErrorKind::Unexpected,
210 "Failed to get authentication headers",
211 )
212 .with_context("service", Scheme::Atomicserver)
213 .set_source(err)
214 })
215 .unwrap();
216
217 for (k, v) in &auth_headers {
218 req = req.header(k, v);
219 }
220
221 req
222 }
223}
224
225impl Adapter {
226 pub fn atomic_get_object_request(&self, path: &str) -> Result<Request<Buffer>> {
227 let path = normalize_path(path);
228 let path = path.as_str();
229
230 let filename_property_escaped = FILENAME_PROPERTY.replace(':', "\\:").replace('.', "\\.");
231 let url = format!(
232 "{}/search?filters={}:%22{}%22",
233 self.endpoint,
234 percent_encode_path(&filename_property_escaped),
235 percent_encode_path(path)
236 );
237
238 let mut req = Request::get(&url);
239 req = self.sign(&url, req);
240 req = req.header(http::header::ACCEPT, "application/ad+json");
241
242 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
243
244 Ok(req)
245 }
246
247 fn atomic_post_object_request(&self, path: &str, value: Buffer) -> Result<Request<Buffer>> {
248 let path = normalize_path(path);
249 let path = path.as_str();
250
251 let url = format!(
252 "{}/upload?parent={}",
253 self.endpoint,
254 percent_encode_path(&self.parent_resource_id)
255 );
256
257 let mut req = Request::post(&url);
258 req = self.sign(&url, req);
259
260 let datapart = FormDataPart::new("assets")
261 .header(
262 CONTENT_DISPOSITION,
263 format!("form-data; name=\"assets\"; filename=\"{}\"", path)
264 .parse()
265 .unwrap(),
266 )
267 .header(CONTENT_TYPE, "text/plain".parse().unwrap())
268 .content(value.to_vec());
269
270 let multipart = Multipart::new().part(datapart);
271 let req = multipart.apply(req)?;
272
273 Ok(req)
274 }
275
276 pub fn atomic_delete_object_request(&self, subject: &str) -> Result<Request<Buffer>> {
277 let url = format!("{}/commit", self.endpoint);
278
279 let timestamp = std::time::SystemTime::now()
280 .duration_since(std::time::UNIX_EPOCH)
281 .expect("You're a time traveler")
282 .as_millis() as i64;
283
284 let commit_to_sign = CommitStruct {
285 created_at: timestamp,
286 destroy: true,
287 is_a: ["https://atomicdata.dev/classes/Commit".to_string()].to_vec(),
288 signer: self.agent.subject.to_string(),
289 subject: subject.to_string().clone(),
290 };
291 let commit_sign_string =
292 serde_json::to_string(&commit_to_sign).map_err(new_json_serialize_error)?;
293
294 let signature = sign_message(
295 &commit_sign_string,
296 self.agent.private_key.as_ref().unwrap(),
297 &self.agent.public_key,
298 )
299 .unwrap();
300
301 let commit = CommitStructSigned {
302 created_at: timestamp,
303 destroy: true,
304 is_a: ["https://atomicdata.dev/classes/Commit".to_string()].to_vec(),
305 signature,
306 signer: self.agent.subject.to_string(),
307 subject: subject.to_string().clone(),
308 };
309
310 let req = Request::post(&url);
311 let body_string = serde_json::to_string(&commit).map_err(new_json_serialize_error)?;
312
313 let body_bytes = body_string.as_bytes().to_owned();
314 let req = req
315 .body(Buffer::from(body_bytes))
316 .map_err(new_request_build_error)?;
317
318 Ok(req)
319 }
320
321 pub async fn download_from_url(&self, download_url: &String) -> Result<Buffer> {
322 let req = Request::get(download_url);
323 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
324 let resp = self.client.send(req).await?;
325 Ok(resp.into_body())
326 }
327}
328
329impl Adapter {
330 async fn wait_for_resource(&self, path: &str, expect_exist: bool) -> Result<()> {
331 for _i in 0..1000 {
335 let req = self.atomic_get_object_request(path)?;
336 let resp = self.client.send(req).await?;
337 let bytes = resp.into_body();
338 let query_result: QueryResultStruct =
339 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
340 if !expect_exist && query_result.results.is_empty() {
341 break;
342 }
343 if expect_exist && !query_result.results.is_empty() {
344 break;
345 }
346 std::thread::sleep(std::time::Duration::from_millis(30));
347 }
348
349 Ok(())
350 }
351}
352
353impl kv::Adapter for Adapter {
354 type Scanner = ();
355
356 fn info(&self) -> kv::Info {
357 kv::Info::new(
358 Scheme::Atomicserver,
359 "atomicserver",
360 Capability {
361 read: true,
362 write: true,
363 delete: true,
364 shared: true,
365 ..Default::default()
366 },
367 )
368 }
369
370 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
371 let req = self.atomic_get_object_request(path)?;
372 let resp = self.client.send(req).await?;
373 let bytes = resp.into_body();
374
375 let query_result: QueryResultStruct =
376 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
377
378 if query_result.results.is_empty() {
379 return Err(Error::new(
380 ErrorKind::NotFound,
381 "atomicserver: key not found",
382 ));
383 }
384
385 let bytes_file = self
386 .download_from_url(&query_result.results[0].download_url)
387 .await?;
388
389 Ok(Some(bytes_file))
390 }
391
392 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
393 let req = self.atomic_get_object_request(path)?;
394 let res = self.client.send(req).await?;
395 let bytes = res.into_body();
396
397 let query_result: QueryResultStruct =
398 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
399
400 for result in query_result.results {
401 let req = self.atomic_delete_object_request(&result.id)?;
402 let _res = self.client.send(req).await?;
403 }
404
405 let _ = self.wait_for_resource(path, false).await;
406
407 let req = self.atomic_post_object_request(path, value)?;
408 let _res = self.client.send(req).await?;
409 let _ = self.wait_for_resource(path, true).await;
410
411 Ok(())
412 }
413
414 async fn delete(&self, path: &str) -> Result<()> {
415 let req = self.atomic_get_object_request(path)?;
416 let res = self.client.send(req).await?;
417 let bytes = res.into_body();
418
419 let query_result: QueryResultStruct =
420 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
421
422 for result in query_result.results {
423 let req = self.atomic_delete_object_request(&result.id)?;
424 let _res = self.client.send(req).await?;
425 }
426
427 let _ = self.wait_for_resource(path, false).await;
428
429 Ok(())
430 }
431}