opendal/services/mongodb/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use mongodb::bson::Binary;
22use mongodb::bson::Document;
23use mongodb::bson::doc;
24use mongodb::options::ClientOptions;
25use tokio::sync::OnceCell;
26
27use crate::raw::adapters::kv;
28use crate::raw::*;
29use crate::services::MongodbConfig;
30use crate::*;
31
32#[doc = include_str!("docs.md")]
33#[derive(Default)]
34pub struct MongodbBuilder {
35 pub(super) config: MongodbConfig,
36}
37
38impl Debug for MongodbBuilder {
39 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct("MongodbBuilder")
41 .field("config", &self.config)
42 .finish()
43 }
44}
45
46impl MongodbBuilder {
47 pub fn connection_string(mut self, v: &str) -> Self {
68 if !v.is_empty() {
69 self.config.connection_string = Some(v.to_string());
70 }
71 self
72 }
73 pub fn root(mut self, root: &str) -> Self {
77 self.config.root = if root.is_empty() {
78 None
79 } else {
80 Some(root.to_string())
81 };
82
83 self
84 }
85
86 pub fn database(mut self, database: &str) -> Self {
88 if !database.is_empty() {
89 self.config.database = Some(database.to_string());
90 }
91 self
92 }
93
94 pub fn collection(mut self, collection: &str) -> Self {
96 if !collection.is_empty() {
97 self.config.collection = Some(collection.to_string());
98 }
99 self
100 }
101
102 pub fn key_field(mut self, key_field: &str) -> Self {
106 if !key_field.is_empty() {
107 self.config.key_field = Some(key_field.to_string());
108 }
109 self
110 }
111
112 pub fn value_field(mut self, value_field: &str) -> Self {
116 if !value_field.is_empty() {
117 self.config.value_field = Some(value_field.to_string());
118 }
119 self
120 }
121}
122
123impl Builder for MongodbBuilder {
124 type Config = MongodbConfig;
125
126 fn build(self) -> Result<impl Access> {
127 let conn = match &self.config.connection_string.clone() {
128 Some(v) => v.clone(),
129 None => {
130 return Err(
131 Error::new(ErrorKind::ConfigInvalid, "connection_string is required")
132 .with_context("service", Scheme::Mongodb),
133 );
134 }
135 };
136 let database = match &self.config.database.clone() {
137 Some(v) => v.clone(),
138 None => {
139 return Err(Error::new(ErrorKind::ConfigInvalid, "database is required")
140 .with_context("service", Scheme::Mongodb));
141 }
142 };
143 let collection = match &self.config.collection.clone() {
144 Some(v) => v.clone(),
145 None => {
146 return Err(
147 Error::new(ErrorKind::ConfigInvalid, "collection is required")
148 .with_context("service", Scheme::Mongodb),
149 );
150 }
151 };
152 let key_field = match &self.config.key_field.clone() {
153 Some(v) => v.clone(),
154 None => "key".to_string(),
155 };
156 let value_field = match &self.config.value_field.clone() {
157 Some(v) => v.clone(),
158 None => "value".to_string(),
159 };
160 let root = normalize_root(
161 self.config
162 .root
163 .clone()
164 .unwrap_or_else(|| "/".to_string())
165 .as_str(),
166 );
167 Ok(MongodbBackend::new(Adapter {
168 connection_string: conn,
169 database,
170 collection,
171 collection_instance: OnceCell::new(),
172 key_field,
173 value_field,
174 })
175 .with_normalized_root(root))
176 }
177}
178
179pub type MongodbBackend = kv::Backend<Adapter>;
180
181#[derive(Clone)]
182pub struct Adapter {
183 connection_string: String,
184 database: String,
185 collection: String,
186 collection_instance: OnceCell<mongodb::Collection<Document>>,
187 key_field: String,
188 value_field: String,
189}
190
191impl Debug for Adapter {
192 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
193 f.debug_struct("Adapter")
194 .field("connection_string", &self.connection_string)
195 .field("database", &self.database)
196 .field("collection", &self.collection)
197 .field("key_field", &self.key_field)
198 .field("value_field", &self.value_field)
199 .finish()
200 }
201}
202
203impl Adapter {
204 async fn get_collection(&self) -> Result<&mongodb::Collection<Document>> {
205 self.collection_instance
206 .get_or_try_init(|| async {
207 let client_options = ClientOptions::parse(&self.connection_string)
208 .await
209 .map_err(parse_mongodb_error)?;
210 let client =
211 mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?;
212 let database = client.database(&self.database);
213 let collection = database.collection(&self.collection);
214 Ok(collection)
215 })
216 .await
217 }
218}
219
220impl kv::Adapter for Adapter {
221 type Scanner = ();
222
223 fn info(&self) -> kv::Info {
224 kv::Info::new(
225 Scheme::Mongodb,
226 &format!("{}/{}", self.database, self.collection),
227 Capability {
228 read: true,
229 write: true,
230 shared: true,
231 ..Default::default()
232 },
233 )
234 }
235
236 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
237 let collection = self.get_collection().await?;
238 let filter = doc! {self.key_field.as_str():path};
239 let result = collection
240 .find_one(filter)
241 .await
242 .map_err(parse_mongodb_error)?;
243 match result {
244 Some(doc) => {
245 let value = doc
246 .get_binary_generic(&self.value_field)
247 .map_err(parse_bson_error)?;
248 Ok(Some(Buffer::from(value.to_vec())))
249 }
250 None => Ok(None),
251 }
252 }
253
254 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
255 let collection = self.get_collection().await?;
256 let filter = doc! { self.key_field.as_str(): path };
257 let update = doc! { "$set": { self.value_field.as_str(): Binary { subtype: mongodb::bson::spec::BinarySubtype::Generic, bytes: value.to_vec() } } };
258 collection
259 .update_one(filter, update)
260 .upsert(true)
261 .await
262 .map_err(parse_mongodb_error)?;
263
264 Ok(())
265 }
266
267 async fn delete(&self, path: &str) -> Result<()> {
268 let collection = self.get_collection().await?;
269 let filter = doc! {self.key_field.as_str():path};
270 collection
271 .delete_one(filter)
272 .await
273 .map_err(parse_mongodb_error)?;
274 Ok(())
275 }
276}
277
278fn parse_mongodb_error(err: mongodb::error::Error) -> Error {
279 Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err)
280}
281
282fn parse_bson_error(err: mongodb::bson::document::ValueAccessError) -> Error {
283 Error::new(ErrorKind::Unexpected, "bson error").set_source(err)
284}