opendal_core/services/mongodb/
core.rs1use std::fmt::Debug;
19
20use mea::once::OnceCell;
21use mongodb::bson::Binary;
22use mongodb::bson::Document;
23use mongodb::bson::doc;
24use mongodb::options::ClientOptions;
25
26use crate::*;
27
28#[derive(Clone)]
29pub struct MongodbCore {
30 pub connection_string: String,
31 pub database: String,
32 pub collection: String,
33 pub collection_instance: OnceCell<mongodb::Collection<Document>>,
34 pub key_field: String,
35 pub value_field: String,
36}
37
38impl Debug for MongodbCore {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct("MongodbCore")
41 .field("database", &self.database)
42 .field("collection", &self.collection)
43 .field("key_field", &self.key_field)
44 .field("value_field", &self.value_field)
45 .finish_non_exhaustive()
46 }
47}
48
49impl MongodbCore {
50 async fn get_collection(&self) -> Result<&mongodb::Collection<Document>> {
51 self.collection_instance
52 .get_or_try_init(|| async {
53 let client_options = ClientOptions::parse(&self.connection_string)
54 .await
55 .map_err(parse_mongodb_error)?;
56 let client =
57 mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?;
58 let database = client.database(&self.database);
59 let collection = database.collection(&self.collection);
60 Ok(collection)
61 })
62 .await
63 }
64
65 pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
66 let collection = self.get_collection().await?;
67 let filter = doc! {self.key_field.as_str():path};
68 let result = collection
69 .find_one(filter)
70 .await
71 .map_err(parse_mongodb_error)?;
72 match result {
73 Some(doc) => {
74 let value = doc
75 .get_binary_generic(&self.value_field)
76 .map_err(parse_bson_error)?;
77 Ok(Some(Buffer::from(value.to_vec())))
78 }
79 None => Ok(None),
80 }
81 }
82
83 pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
84 let collection = self.get_collection().await?;
85 let filter = doc! { self.key_field.as_str(): path };
86 let update = doc! { "$set": { self.value_field.as_str(): Binary { subtype: mongodb::bson::spec::BinarySubtype::Generic, bytes: value.to_vec() } } };
87 collection
88 .update_one(filter, update)
89 .upsert(true)
90 .await
91 .map_err(parse_mongodb_error)?;
92
93 Ok(())
94 }
95
96 pub async fn delete(&self, path: &str) -> Result<()> {
97 let collection = self.get_collection().await?;
98 let filter = doc! {self.key_field.as_str():path};
99 collection
100 .delete_one(filter)
101 .await
102 .map_err(parse_mongodb_error)?;
103 Ok(())
104 }
105}
106
107fn parse_mongodb_error(err: mongodb::error::Error) -> Error {
108 Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err)
109}
110
111fn parse_bson_error(err: mongodb::bson::document::ValueAccessError) -> Error {
112 Error::new(ErrorKind::Unexpected, "bson error").set_source(err)
113}