opendal/services/mongodb/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use mongodb::bson::doc;
22use mongodb::bson::Binary;
23use mongodb::bson::Document;
24use mongodb::options::ClientOptions;
25use tokio::sync::OnceCell;
26
27use crate::raw::adapters::kv;
28use crate::raw::*;
29use crate::services::MongodbConfig;
30use crate::*;
31
32impl Configurator for MongodbConfig {
33    type Builder = MongodbBuilder;
34    fn into_builder(self) -> Self::Builder {
35        MongodbBuilder { config: self }
36    }
37}
38
39#[doc = include_str!("docs.md")]
40#[derive(Default)]
41pub struct MongodbBuilder {
42    config: MongodbConfig,
43}
44
45impl Debug for MongodbBuilder {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        f.debug_struct("MongodbBuilder")
48            .field("config", &self.config)
49            .finish()
50    }
51}
52
53impl MongodbBuilder {
54    /// Set the connection_string of the MongoDB service.
55    ///
56    /// This connection string is used to connect to the MongoDB service. It typically follows the format:
57    ///
58    /// ## Format
59    ///
60    /// `mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]`
61    ///
62    /// Examples:
63    ///
64    /// - Connecting to a local MongoDB instance: `mongodb://localhost:27017`
65    /// - Using authentication: `mongodb://myUser:myPassword@localhost:27017/myAuthDB`
66    /// - Specifying authentication mechanism: `mongodb://myUser:myPassword@localhost:27017/myAuthDB?authMechanism=SCRAM-SHA-256`
67    ///
68    /// ## Options
69    ///
70    /// - `authMechanism`: Specifies the authentication method to use. Examples include `SCRAM-SHA-1`, `SCRAM-SHA-256`, and `MONGODB-AWS`.
71    /// - ... (any other options you wish to highlight)
72    ///
73    /// For more information, please refer to [MongoDB Connection String URI Format](https://docs.mongodb.com/manual/reference/connection-string/).
74    pub fn connection_string(mut self, v: &str) -> Self {
75        if !v.is_empty() {
76            self.config.connection_string = Some(v.to_string());
77        }
78        self
79    }
80    /// Set the working directory, all operations will be performed under it.
81    ///
82    /// default: "/"
83    pub fn root(mut self, root: &str) -> Self {
84        self.config.root = if root.is_empty() {
85            None
86        } else {
87            Some(root.to_string())
88        };
89
90        self
91    }
92
93    /// Set the database name of the MongoDB service to read/write.
94    pub fn database(mut self, database: &str) -> Self {
95        if !database.is_empty() {
96            self.config.database = Some(database.to_string());
97        }
98        self
99    }
100
101    /// Set the collection name of the MongoDB service to read/write.
102    pub fn collection(mut self, collection: &str) -> Self {
103        if !collection.is_empty() {
104            self.config.collection = Some(collection.to_string());
105        }
106        self
107    }
108
109    /// Set the key field name of the MongoDB service to read/write.
110    ///
111    /// Default to `key` if not specified.
112    pub fn key_field(mut self, key_field: &str) -> Self {
113        if !key_field.is_empty() {
114            self.config.key_field = Some(key_field.to_string());
115        }
116        self
117    }
118
119    /// Set the value field name of the MongoDB service to read/write.
120    ///
121    /// Default to `value` if not specified.
122    pub fn value_field(mut self, value_field: &str) -> Self {
123        if !value_field.is_empty() {
124            self.config.value_field = Some(value_field.to_string());
125        }
126        self
127    }
128}
129
130impl Builder for MongodbBuilder {
131    const SCHEME: Scheme = Scheme::Mongodb;
132    type Config = MongodbConfig;
133
134    fn build(self) -> Result<impl Access> {
135        let conn = match &self.config.connection_string.clone() {
136            Some(v) => v.clone(),
137            None => {
138                return Err(
139                    Error::new(ErrorKind::ConfigInvalid, "connection_string is required")
140                        .with_context("service", Scheme::Mongodb),
141                )
142            }
143        };
144        let database = match &self.config.database.clone() {
145            Some(v) => v.clone(),
146            None => {
147                return Err(Error::new(ErrorKind::ConfigInvalid, "database is required")
148                    .with_context("service", Scheme::Mongodb))
149            }
150        };
151        let collection = match &self.config.collection.clone() {
152            Some(v) => v.clone(),
153            None => {
154                return Err(
155                    Error::new(ErrorKind::ConfigInvalid, "collection is required")
156                        .with_context("service", Scheme::Mongodb),
157                )
158            }
159        };
160        let key_field = match &self.config.key_field.clone() {
161            Some(v) => v.clone(),
162            None => "key".to_string(),
163        };
164        let value_field = match &self.config.value_field.clone() {
165            Some(v) => v.clone(),
166            None => "value".to_string(),
167        };
168        let root = normalize_root(
169            self.config
170                .root
171                .clone()
172                .unwrap_or_else(|| "/".to_string())
173                .as_str(),
174        );
175        Ok(MongodbBackend::new(Adapter {
176            connection_string: conn,
177            database,
178            collection,
179            collection_instance: OnceCell::new(),
180            key_field,
181            value_field,
182        })
183        .with_normalized_root(root))
184    }
185}
186
187pub type MongodbBackend = kv::Backend<Adapter>;
188
189#[derive(Clone)]
190pub struct Adapter {
191    connection_string: String,
192    database: String,
193    collection: String,
194    collection_instance: OnceCell<mongodb::Collection<Document>>,
195    key_field: String,
196    value_field: String,
197}
198
199impl Debug for Adapter {
200    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
201        f.debug_struct("Adapter")
202            .field("connection_string", &self.connection_string)
203            .field("database", &self.database)
204            .field("collection", &self.collection)
205            .field("key_field", &self.key_field)
206            .field("value_field", &self.value_field)
207            .finish()
208    }
209}
210
211impl Adapter {
212    async fn get_collection(&self) -> Result<&mongodb::Collection<Document>> {
213        self.collection_instance
214            .get_or_try_init(|| async {
215                let client_options = ClientOptions::parse(&self.connection_string)
216                    .await
217                    .map_err(parse_mongodb_error)?;
218                let client =
219                    mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?;
220                let database = client.database(&self.database);
221                let collection = database.collection(&self.collection);
222                Ok(collection)
223            })
224            .await
225    }
226}
227
228impl kv::Adapter for Adapter {
229    type Scanner = ();
230
231    fn info(&self) -> kv::Info {
232        kv::Info::new(
233            Scheme::Mongodb,
234            &format!("{}/{}", self.database, self.collection),
235            Capability {
236                read: true,
237                write: true,
238                shared: true,
239                ..Default::default()
240            },
241        )
242    }
243
244    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
245        let collection = self.get_collection().await?;
246        let filter = doc! {self.key_field.as_str():path};
247        let result = collection
248            .find_one(filter)
249            .await
250            .map_err(parse_mongodb_error)?;
251        match result {
252            Some(doc) => {
253                let value = doc
254                    .get_binary_generic(&self.value_field)
255                    .map_err(parse_bson_error)?;
256                Ok(Some(Buffer::from(value.to_vec())))
257            }
258            None => Ok(None),
259        }
260    }
261
262    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
263        let collection = self.get_collection().await?;
264        let filter = doc! { self.key_field.as_str(): path };
265        let update = doc! { "$set": { self.value_field.as_str(): Binary { subtype: mongodb::bson::spec::BinarySubtype::Generic, bytes: value.to_vec() } } };
266        collection
267            .update_one(filter, update)
268            .upsert(true)
269            .await
270            .map_err(parse_mongodb_error)?;
271
272        Ok(())
273    }
274
275    async fn delete(&self, path: &str) -> Result<()> {
276        let collection = self.get_collection().await?;
277        let filter = doc! {self.key_field.as_str():path};
278        collection
279            .delete_one(filter)
280            .await
281            .map_err(parse_mongodb_error)?;
282        Ok(())
283    }
284}
285
286fn parse_mongodb_error(err: mongodb::error::Error) -> Error {
287    Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err)
288}
289
290fn parse_bson_error(err: mongodb::bson::document::ValueAccessError) -> Error {
291    Error::new(ErrorKind::Unexpected, "bson error").set_source(err)
292}