opendal/services/mongodb/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use mongodb::bson::doc;
22use mongodb::bson::Binary;
23use mongodb::bson::Document;
24use mongodb::options::ClientOptions;
25use tokio::sync::OnceCell;
26
27use crate::raw::adapters::kv;
28use crate::raw::*;
29use crate::services::MongodbConfig;
30use crate::*;
31
32impl Configurator for MongodbConfig {
33 type Builder = MongodbBuilder;
34 fn into_builder(self) -> Self::Builder {
35 MongodbBuilder { config: self }
36 }
37}
38
39#[doc = include_str!("docs.md")]
40#[derive(Default)]
41pub struct MongodbBuilder {
42 config: MongodbConfig,
43}
44
45impl Debug for MongodbBuilder {
46 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("MongodbBuilder")
48 .field("config", &self.config)
49 .finish()
50 }
51}
52
53impl MongodbBuilder {
54 pub fn connection_string(mut self, v: &str) -> Self {
75 if !v.is_empty() {
76 self.config.connection_string = Some(v.to_string());
77 }
78 self
79 }
80 pub fn root(mut self, root: &str) -> Self {
84 self.config.root = if root.is_empty() {
85 None
86 } else {
87 Some(root.to_string())
88 };
89
90 self
91 }
92
93 pub fn database(mut self, database: &str) -> Self {
95 if !database.is_empty() {
96 self.config.database = Some(database.to_string());
97 }
98 self
99 }
100
101 pub fn collection(mut self, collection: &str) -> Self {
103 if !collection.is_empty() {
104 self.config.collection = Some(collection.to_string());
105 }
106 self
107 }
108
109 pub fn key_field(mut self, key_field: &str) -> Self {
113 if !key_field.is_empty() {
114 self.config.key_field = Some(key_field.to_string());
115 }
116 self
117 }
118
119 pub fn value_field(mut self, value_field: &str) -> Self {
123 if !value_field.is_empty() {
124 self.config.value_field = Some(value_field.to_string());
125 }
126 self
127 }
128}
129
130impl Builder for MongodbBuilder {
131 const SCHEME: Scheme = Scheme::Mongodb;
132 type Config = MongodbConfig;
133
134 fn build(self) -> Result<impl Access> {
135 let conn = match &self.config.connection_string.clone() {
136 Some(v) => v.clone(),
137 None => {
138 return Err(
139 Error::new(ErrorKind::ConfigInvalid, "connection_string is required")
140 .with_context("service", Scheme::Mongodb),
141 )
142 }
143 };
144 let database = match &self.config.database.clone() {
145 Some(v) => v.clone(),
146 None => {
147 return Err(Error::new(ErrorKind::ConfigInvalid, "database is required")
148 .with_context("service", Scheme::Mongodb))
149 }
150 };
151 let collection = match &self.config.collection.clone() {
152 Some(v) => v.clone(),
153 None => {
154 return Err(
155 Error::new(ErrorKind::ConfigInvalid, "collection is required")
156 .with_context("service", Scheme::Mongodb),
157 )
158 }
159 };
160 let key_field = match &self.config.key_field.clone() {
161 Some(v) => v.clone(),
162 None => "key".to_string(),
163 };
164 let value_field = match &self.config.value_field.clone() {
165 Some(v) => v.clone(),
166 None => "value".to_string(),
167 };
168 let root = normalize_root(
169 self.config
170 .root
171 .clone()
172 .unwrap_or_else(|| "/".to_string())
173 .as_str(),
174 );
175 Ok(MongodbBackend::new(Adapter {
176 connection_string: conn,
177 database,
178 collection,
179 collection_instance: OnceCell::new(),
180 key_field,
181 value_field,
182 })
183 .with_normalized_root(root))
184 }
185}
186
187pub type MongodbBackend = kv::Backend<Adapter>;
188
189#[derive(Clone)]
190pub struct Adapter {
191 connection_string: String,
192 database: String,
193 collection: String,
194 collection_instance: OnceCell<mongodb::Collection<Document>>,
195 key_field: String,
196 value_field: String,
197}
198
199impl Debug for Adapter {
200 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
201 f.debug_struct("Adapter")
202 .field("connection_string", &self.connection_string)
203 .field("database", &self.database)
204 .field("collection", &self.collection)
205 .field("key_field", &self.key_field)
206 .field("value_field", &self.value_field)
207 .finish()
208 }
209}
210
211impl Adapter {
212 async fn get_collection(&self) -> Result<&mongodb::Collection<Document>> {
213 self.collection_instance
214 .get_or_try_init(|| async {
215 let client_options = ClientOptions::parse(&self.connection_string)
216 .await
217 .map_err(parse_mongodb_error)?;
218 let client =
219 mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?;
220 let database = client.database(&self.database);
221 let collection = database.collection(&self.collection);
222 Ok(collection)
223 })
224 .await
225 }
226}
227
228impl kv::Adapter for Adapter {
229 type Scanner = ();
230
231 fn info(&self) -> kv::Info {
232 kv::Info::new(
233 Scheme::Mongodb,
234 &format!("{}/{}", self.database, self.collection),
235 Capability {
236 read: true,
237 write: true,
238 shared: true,
239 ..Default::default()
240 },
241 )
242 }
243
244 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
245 let collection = self.get_collection().await?;
246 let filter = doc! {self.key_field.as_str():path};
247 let result = collection
248 .find_one(filter)
249 .await
250 .map_err(parse_mongodb_error)?;
251 match result {
252 Some(doc) => {
253 let value = doc
254 .get_binary_generic(&self.value_field)
255 .map_err(parse_bson_error)?;
256 Ok(Some(Buffer::from(value.to_vec())))
257 }
258 None => Ok(None),
259 }
260 }
261
262 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
263 let collection = self.get_collection().await?;
264 let filter = doc! { self.key_field.as_str(): path };
265 let update = doc! { "$set": { self.value_field.as_str(): Binary { subtype: mongodb::bson::spec::BinarySubtype::Generic, bytes: value.to_vec() } } };
266 collection
267 .update_one(filter, update)
268 .upsert(true)
269 .await
270 .map_err(parse_mongodb_error)?;
271
272 Ok(())
273 }
274
275 async fn delete(&self, path: &str) -> Result<()> {
276 let collection = self.get_collection().await?;
277 let filter = doc! {self.key_field.as_str():path};
278 collection
279 .delete_one(filter)
280 .await
281 .map_err(parse_mongodb_error)?;
282 Ok(())
283 }
284}
285
286fn parse_mongodb_error(err: mongodb::error::Error) -> Error {
287 Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err)
288}
289
290fn parse_bson_error(err: mongodb::bson::document::ValueAccessError) -> Error {
291 Error::new(ErrorKind::Unexpected, "bson error").set_source(err)
292}