opendal/services/mongodb/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use mongodb::bson::doc;
22use mongodb::bson::Binary;
23use mongodb::bson::Document;
24use mongodb::options::ClientOptions;
25use tokio::sync::OnceCell;
26
27use crate::raw::adapters::kv;
28use crate::raw::*;
29use crate::services::MongodbConfig;
30use crate::*;
31
32impl Configurator for MongodbConfig {
33 type Builder = MongodbBuilder;
34 fn into_builder(self) -> Self::Builder {
35 MongodbBuilder { config: self }
36 }
37}
38
39#[doc = include_str!("docs.md")]
40#[derive(Default)]
41pub struct MongodbBuilder {
42 config: MongodbConfig,
43}
44
45impl Debug for MongodbBuilder {
46 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("MongodbBuilder")
48 .field("config", &self.config)
49 .finish()
50 }
51}
52
53impl MongodbBuilder {
54 pub fn connection_string(mut self, v: &str) -> Self {
75 if !v.is_empty() {
76 self.config.connection_string = Some(v.to_string());
77 }
78 self
79 }
80 pub fn root(mut self, root: &str) -> Self {
84 self.config.root = if root.is_empty() {
85 None
86 } else {
87 Some(root.to_string())
88 };
89
90 self
91 }
92
93 pub fn database(mut self, database: &str) -> Self {
95 if !database.is_empty() {
96 self.config.database = Some(database.to_string());
97 }
98 self
99 }
100
101 pub fn collection(mut self, collection: &str) -> Self {
103 if !collection.is_empty() {
104 self.config.collection = Some(collection.to_string());
105 }
106 self
107 }
108
109 pub fn key_field(mut self, key_field: &str) -> Self {
113 if !key_field.is_empty() {
114 self.config.key_field = Some(key_field.to_string());
115 }
116 self
117 }
118
119 pub fn value_field(mut self, value_field: &str) -> Self {
123 if !value_field.is_empty() {
124 self.config.value_field = Some(value_field.to_string());
125 }
126 self
127 }
128}
129
130impl Builder for MongodbBuilder {
131 type Config = MongodbConfig;
132
133 fn build(self) -> Result<impl Access> {
134 let conn = match &self.config.connection_string.clone() {
135 Some(v) => v.clone(),
136 None => {
137 return Err(
138 Error::new(ErrorKind::ConfigInvalid, "connection_string is required")
139 .with_context("service", Scheme::Mongodb),
140 )
141 }
142 };
143 let database = match &self.config.database.clone() {
144 Some(v) => v.clone(),
145 None => {
146 return Err(Error::new(ErrorKind::ConfigInvalid, "database is required")
147 .with_context("service", Scheme::Mongodb))
148 }
149 };
150 let collection = match &self.config.collection.clone() {
151 Some(v) => v.clone(),
152 None => {
153 return Err(
154 Error::new(ErrorKind::ConfigInvalid, "collection is required")
155 .with_context("service", Scheme::Mongodb),
156 )
157 }
158 };
159 let key_field = match &self.config.key_field.clone() {
160 Some(v) => v.clone(),
161 None => "key".to_string(),
162 };
163 let value_field = match &self.config.value_field.clone() {
164 Some(v) => v.clone(),
165 None => "value".to_string(),
166 };
167 let root = normalize_root(
168 self.config
169 .root
170 .clone()
171 .unwrap_or_else(|| "/".to_string())
172 .as_str(),
173 );
174 Ok(MongodbBackend::new(Adapter {
175 connection_string: conn,
176 database,
177 collection,
178 collection_instance: OnceCell::new(),
179 key_field,
180 value_field,
181 })
182 .with_normalized_root(root))
183 }
184}
185
186pub type MongodbBackend = kv::Backend<Adapter>;
187
188#[derive(Clone)]
189pub struct Adapter {
190 connection_string: String,
191 database: String,
192 collection: String,
193 collection_instance: OnceCell<mongodb::Collection<Document>>,
194 key_field: String,
195 value_field: String,
196}
197
198impl Debug for Adapter {
199 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
200 f.debug_struct("Adapter")
201 .field("connection_string", &self.connection_string)
202 .field("database", &self.database)
203 .field("collection", &self.collection)
204 .field("key_field", &self.key_field)
205 .field("value_field", &self.value_field)
206 .finish()
207 }
208}
209
210impl Adapter {
211 async fn get_collection(&self) -> Result<&mongodb::Collection<Document>> {
212 self.collection_instance
213 .get_or_try_init(|| async {
214 let client_options = ClientOptions::parse(&self.connection_string)
215 .await
216 .map_err(parse_mongodb_error)?;
217 let client =
218 mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?;
219 let database = client.database(&self.database);
220 let collection = database.collection(&self.collection);
221 Ok(collection)
222 })
223 .await
224 }
225}
226
227impl kv::Adapter for Adapter {
228 type Scanner = ();
229
230 fn info(&self) -> kv::Info {
231 kv::Info::new(
232 Scheme::Mongodb,
233 &format!("{}/{}", self.database, self.collection),
234 Capability {
235 read: true,
236 write: true,
237 shared: true,
238 ..Default::default()
239 },
240 )
241 }
242
243 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
244 let collection = self.get_collection().await?;
245 let filter = doc! {self.key_field.as_str():path};
246 let result = collection
247 .find_one(filter)
248 .await
249 .map_err(parse_mongodb_error)?;
250 match result {
251 Some(doc) => {
252 let value = doc
253 .get_binary_generic(&self.value_field)
254 .map_err(parse_bson_error)?;
255 Ok(Some(Buffer::from(value.to_vec())))
256 }
257 None => Ok(None),
258 }
259 }
260
261 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
262 let collection = self.get_collection().await?;
263 let filter = doc! { self.key_field.as_str(): path };
264 let update = doc! { "$set": { self.value_field.as_str(): Binary { subtype: mongodb::bson::spec::BinarySubtype::Generic, bytes: value.to_vec() } } };
265 collection
266 .update_one(filter, update)
267 .upsert(true)
268 .await
269 .map_err(parse_mongodb_error)?;
270
271 Ok(())
272 }
273
274 async fn delete(&self, path: &str) -> Result<()> {
275 let collection = self.get_collection().await?;
276 let filter = doc! {self.key_field.as_str():path};
277 collection
278 .delete_one(filter)
279 .await
280 .map_err(parse_mongodb_error)?;
281 Ok(())
282 }
283}
284
285fn parse_mongodb_error(err: mongodb::error::Error) -> Error {
286 Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err)
287}
288
289fn parse_bson_error(err: mongodb::bson::document::ValueAccessError) -> Error {
290 Error::new(ErrorKind::Unexpected, "bson error").set_source(err)
291}