opendal/services/mongodb/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use mongodb::bson::doc;
22use mongodb::bson::Binary;
23use mongodb::bson::Document;
24use mongodb::options::ClientOptions;
25use tokio::sync::OnceCell;
26
27use crate::raw::adapters::kv;
28use crate::raw::*;
29use crate::services::MongodbConfig;
30use crate::*;
31
32impl Configurator for MongodbConfig {
33    type Builder = MongodbBuilder;
34    fn into_builder(self) -> Self::Builder {
35        MongodbBuilder { config: self }
36    }
37}
38
39#[doc = include_str!("docs.md")]
40#[derive(Default)]
41pub struct MongodbBuilder {
42    config: MongodbConfig,
43}
44
45impl Debug for MongodbBuilder {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        f.debug_struct("MongodbBuilder")
48            .field("config", &self.config)
49            .finish()
50    }
51}
52
53impl MongodbBuilder {
54    /// Set the connection_string of the MongoDB service.
55    ///
56    /// This connection string is used to connect to the MongoDB service. It typically follows the format:
57    ///
58    /// ## Format
59    ///
60    /// `mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]`
61    ///
62    /// Examples:
63    ///
64    /// - Connecting to a local MongoDB instance: `mongodb://localhost:27017`
65    /// - Using authentication: `mongodb://myUser:myPassword@localhost:27017/myAuthDB`
66    /// - Specifying authentication mechanism: `mongodb://myUser:myPassword@localhost:27017/myAuthDB?authMechanism=SCRAM-SHA-256`
67    ///
68    /// ## Options
69    ///
70    /// - `authMechanism`: Specifies the authentication method to use. Examples include `SCRAM-SHA-1`, `SCRAM-SHA-256`, and `MONGODB-AWS`.
71    /// - ... (any other options you wish to highlight)
72    ///
73    /// For more information, please refer to [MongoDB Connection String URI Format](https://docs.mongodb.com/manual/reference/connection-string/).
74    pub fn connection_string(mut self, v: &str) -> Self {
75        if !v.is_empty() {
76            self.config.connection_string = Some(v.to_string());
77        }
78        self
79    }
80    /// Set the working directory, all operations will be performed under it.
81    ///
82    /// default: "/"
83    pub fn root(mut self, root: &str) -> Self {
84        self.config.root = if root.is_empty() {
85            None
86        } else {
87            Some(root.to_string())
88        };
89
90        self
91    }
92
93    /// Set the database name of the MongoDB service to read/write.
94    pub fn database(mut self, database: &str) -> Self {
95        if !database.is_empty() {
96            self.config.database = Some(database.to_string());
97        }
98        self
99    }
100
101    /// Set the collection name of the MongoDB service to read/write.
102    pub fn collection(mut self, collection: &str) -> Self {
103        if !collection.is_empty() {
104            self.config.collection = Some(collection.to_string());
105        }
106        self
107    }
108
109    /// Set the key field name of the MongoDB service to read/write.
110    ///
111    /// Default to `key` if not specified.
112    pub fn key_field(mut self, key_field: &str) -> Self {
113        if !key_field.is_empty() {
114            self.config.key_field = Some(key_field.to_string());
115        }
116        self
117    }
118
119    /// Set the value field name of the MongoDB service to read/write.
120    ///
121    /// Default to `value` if not specified.
122    pub fn value_field(mut self, value_field: &str) -> Self {
123        if !value_field.is_empty() {
124            self.config.value_field = Some(value_field.to_string());
125        }
126        self
127    }
128}
129
130impl Builder for MongodbBuilder {
131    type Config = MongodbConfig;
132
133    fn build(self) -> Result<impl Access> {
134        let conn = match &self.config.connection_string.clone() {
135            Some(v) => v.clone(),
136            None => {
137                return Err(
138                    Error::new(ErrorKind::ConfigInvalid, "connection_string is required")
139                        .with_context("service", Scheme::Mongodb),
140                )
141            }
142        };
143        let database = match &self.config.database.clone() {
144            Some(v) => v.clone(),
145            None => {
146                return Err(Error::new(ErrorKind::ConfigInvalid, "database is required")
147                    .with_context("service", Scheme::Mongodb))
148            }
149        };
150        let collection = match &self.config.collection.clone() {
151            Some(v) => v.clone(),
152            None => {
153                return Err(
154                    Error::new(ErrorKind::ConfigInvalid, "collection is required")
155                        .with_context("service", Scheme::Mongodb),
156                )
157            }
158        };
159        let key_field = match &self.config.key_field.clone() {
160            Some(v) => v.clone(),
161            None => "key".to_string(),
162        };
163        let value_field = match &self.config.value_field.clone() {
164            Some(v) => v.clone(),
165            None => "value".to_string(),
166        };
167        let root = normalize_root(
168            self.config
169                .root
170                .clone()
171                .unwrap_or_else(|| "/".to_string())
172                .as_str(),
173        );
174        Ok(MongodbBackend::new(Adapter {
175            connection_string: conn,
176            database,
177            collection,
178            collection_instance: OnceCell::new(),
179            key_field,
180            value_field,
181        })
182        .with_normalized_root(root))
183    }
184}
185
186pub type MongodbBackend = kv::Backend<Adapter>;
187
188#[derive(Clone)]
189pub struct Adapter {
190    connection_string: String,
191    database: String,
192    collection: String,
193    collection_instance: OnceCell<mongodb::Collection<Document>>,
194    key_field: String,
195    value_field: String,
196}
197
198impl Debug for Adapter {
199    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
200        f.debug_struct("Adapter")
201            .field("connection_string", &self.connection_string)
202            .field("database", &self.database)
203            .field("collection", &self.collection)
204            .field("key_field", &self.key_field)
205            .field("value_field", &self.value_field)
206            .finish()
207    }
208}
209
210impl Adapter {
211    async fn get_collection(&self) -> Result<&mongodb::Collection<Document>> {
212        self.collection_instance
213            .get_or_try_init(|| async {
214                let client_options = ClientOptions::parse(&self.connection_string)
215                    .await
216                    .map_err(parse_mongodb_error)?;
217                let client =
218                    mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?;
219                let database = client.database(&self.database);
220                let collection = database.collection(&self.collection);
221                Ok(collection)
222            })
223            .await
224    }
225}
226
227impl kv::Adapter for Adapter {
228    type Scanner = ();
229
230    fn info(&self) -> kv::Info {
231        kv::Info::new(
232            Scheme::Mongodb,
233            &format!("{}/{}", self.database, self.collection),
234            Capability {
235                read: true,
236                write: true,
237                shared: true,
238                ..Default::default()
239            },
240        )
241    }
242
243    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
244        let collection = self.get_collection().await?;
245        let filter = doc! {self.key_field.as_str():path};
246        let result = collection
247            .find_one(filter)
248            .await
249            .map_err(parse_mongodb_error)?;
250        match result {
251            Some(doc) => {
252                let value = doc
253                    .get_binary_generic(&self.value_field)
254                    .map_err(parse_bson_error)?;
255                Ok(Some(Buffer::from(value.to_vec())))
256            }
257            None => Ok(None),
258        }
259    }
260
261    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
262        let collection = self.get_collection().await?;
263        let filter = doc! { self.key_field.as_str(): path };
264        let update = doc! { "$set": { self.value_field.as_str(): Binary { subtype: mongodb::bson::spec::BinarySubtype::Generic, bytes: value.to_vec() } } };
265        collection
266            .update_one(filter, update)
267            .upsert(true)
268            .await
269            .map_err(parse_mongodb_error)?;
270
271        Ok(())
272    }
273
274    async fn delete(&self, path: &str) -> Result<()> {
275        let collection = self.get_collection().await?;
276        let filter = doc! {self.key_field.as_str():path};
277        collection
278            .delete_one(filter)
279            .await
280            .map_err(parse_mongodb_error)?;
281        Ok(())
282    }
283}
284
285fn parse_mongodb_error(err: mongodb::error::Error) -> Error {
286    Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err)
287}
288
289fn parse_bson_error(err: mongodb::bson::document::ValueAccessError) -> Error {
290    Error::new(ErrorKind::Unexpected, "bson error").set_source(err)
291}