opendal/services/mongodb/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use mongodb::bson::Binary;
22use mongodb::bson::Document;
23use mongodb::bson::doc;
24use mongodb::options::ClientOptions;
25use tokio::sync::OnceCell;
26
27use crate::raw::adapters::kv;
28use crate::raw::*;
29use crate::services::MongodbConfig;
30use crate::*;
31
// The type-level rustdoc for this builder is pulled in from `docs.md` by the attribute below.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct MongodbBuilder {
    /// Configuration filled in by the builder's setter methods and read by `build`.
    pub(super) config: MongodbConfig,
}
37
38impl Debug for MongodbBuilder {
39    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
40        f.debug_struct("MongodbBuilder")
41            .field("config", &self.config)
42            .finish()
43    }
44}
45
46impl MongodbBuilder {
47    /// Set the connection_string of the MongoDB service.
48    ///
49    /// This connection string is used to connect to the MongoDB service. It typically follows the format:
50    ///
51    /// ## Format
52    ///
53    /// `mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]`
54    ///
55    /// Examples:
56    ///
57    /// - Connecting to a local MongoDB instance: `mongodb://localhost:27017`
58    /// - Using authentication: `mongodb://myUser:myPassword@localhost:27017/myAuthDB`
59    /// - Specifying authentication mechanism: `mongodb://myUser:myPassword@localhost:27017/myAuthDB?authMechanism=SCRAM-SHA-256`
60    ///
61    /// ## Options
62    ///
63    /// - `authMechanism`: Specifies the authentication method to use. Examples include `SCRAM-SHA-1`, `SCRAM-SHA-256`, and `MONGODB-AWS`.
64    /// - ... (any other options you wish to highlight)
65    ///
66    /// For more information, please refer to [MongoDB Connection String URI Format](https://docs.mongodb.com/manual/reference/connection-string/).
67    pub fn connection_string(mut self, v: &str) -> Self {
68        if !v.is_empty() {
69            self.config.connection_string = Some(v.to_string());
70        }
71        self
72    }
73    /// Set the working directory, all operations will be performed under it.
74    ///
75    /// default: "/"
76    pub fn root(mut self, root: &str) -> Self {
77        self.config.root = if root.is_empty() {
78            None
79        } else {
80            Some(root.to_string())
81        };
82
83        self
84    }
85
86    /// Set the database name of the MongoDB service to read/write.
87    pub fn database(mut self, database: &str) -> Self {
88        if !database.is_empty() {
89            self.config.database = Some(database.to_string());
90        }
91        self
92    }
93
94    /// Set the collection name of the MongoDB service to read/write.
95    pub fn collection(mut self, collection: &str) -> Self {
96        if !collection.is_empty() {
97            self.config.collection = Some(collection.to_string());
98        }
99        self
100    }
101
102    /// Set the key field name of the MongoDB service to read/write.
103    ///
104    /// Default to `key` if not specified.
105    pub fn key_field(mut self, key_field: &str) -> Self {
106        if !key_field.is_empty() {
107            self.config.key_field = Some(key_field.to_string());
108        }
109        self
110    }
111
112    /// Set the value field name of the MongoDB service to read/write.
113    ///
114    /// Default to `value` if not specified.
115    pub fn value_field(mut self, value_field: &str) -> Self {
116        if !value_field.is_empty() {
117            self.config.value_field = Some(value_field.to_string());
118        }
119        self
120    }
121}
122
123impl Builder for MongodbBuilder {
124    type Config = MongodbConfig;
125
126    fn build(self) -> Result<impl Access> {
127        let conn = match &self.config.connection_string.clone() {
128            Some(v) => v.clone(),
129            None => {
130                return Err(
131                    Error::new(ErrorKind::ConfigInvalid, "connection_string is required")
132                        .with_context("service", Scheme::Mongodb),
133                );
134            }
135        };
136        let database = match &self.config.database.clone() {
137            Some(v) => v.clone(),
138            None => {
139                return Err(Error::new(ErrorKind::ConfigInvalid, "database is required")
140                    .with_context("service", Scheme::Mongodb));
141            }
142        };
143        let collection = match &self.config.collection.clone() {
144            Some(v) => v.clone(),
145            None => {
146                return Err(
147                    Error::new(ErrorKind::ConfigInvalid, "collection is required")
148                        .with_context("service", Scheme::Mongodb),
149                );
150            }
151        };
152        let key_field = match &self.config.key_field.clone() {
153            Some(v) => v.clone(),
154            None => "key".to_string(),
155        };
156        let value_field = match &self.config.value_field.clone() {
157            Some(v) => v.clone(),
158            None => "value".to_string(),
159        };
160        let root = normalize_root(
161            self.config
162                .root
163                .clone()
164                .unwrap_or_else(|| "/".to_string())
165                .as_str(),
166        );
167        Ok(MongodbBackend::new(Adapter {
168            connection_string: conn,
169            database,
170            collection,
171            collection_instance: OnceCell::new(),
172            key_field,
173            value_field,
174        })
175        .with_normalized_root(root))
176    }
177}
178
/// Backend for the MongoDB service: the generic kv backend driven by [`Adapter`].
pub type MongodbBackend = kv::Backend<Adapter>;
180
/// Key-value adapter that keeps each entry as a document in a single MongoDB collection.
#[derive(Clone)]
pub struct Adapter {
    /// Connection string parsed into client options when the collection is first requested.
    connection_string: String,
    /// Name of the database that contains the collection.
    database: String,
    /// Name of the collection that holds the key/value documents.
    collection: String,
    /// Collection handle, lazily initialized by `get_collection` and reused afterwards.
    collection_instance: OnceCell<mongodb::Collection<Document>>,
    /// Name of the document field that stores the key (the path).
    key_field: String,
    /// Name of the document field that stores the value as generic binary data.
    value_field: String,
}
190
191impl Debug for Adapter {
192    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
193        f.debug_struct("Adapter")
194            .field("connection_string", &self.connection_string)
195            .field("database", &self.database)
196            .field("collection", &self.collection)
197            .field("key_field", &self.key_field)
198            .field("value_field", &self.value_field)
199            .finish()
200    }
201}
202
203impl Adapter {
204    async fn get_collection(&self) -> Result<&mongodb::Collection<Document>> {
205        self.collection_instance
206            .get_or_try_init(|| async {
207                let client_options = ClientOptions::parse(&self.connection_string)
208                    .await
209                    .map_err(parse_mongodb_error)?;
210                let client =
211                    mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?;
212                let database = client.database(&self.database);
213                let collection = database.collection(&self.collection);
214                Ok(collection)
215            })
216            .await
217    }
218}
219
220impl kv::Adapter for Adapter {
221    type Scanner = ();
222
223    fn info(&self) -> kv::Info {
224        kv::Info::new(
225            Scheme::Mongodb,
226            &format!("{}/{}", self.database, self.collection),
227            Capability {
228                read: true,
229                write: true,
230                shared: true,
231                ..Default::default()
232            },
233        )
234    }
235
236    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
237        let collection = self.get_collection().await?;
238        let filter = doc! {self.key_field.as_str():path};
239        let result = collection
240            .find_one(filter)
241            .await
242            .map_err(parse_mongodb_error)?;
243        match result {
244            Some(doc) => {
245                let value = doc
246                    .get_binary_generic(&self.value_field)
247                    .map_err(parse_bson_error)?;
248                Ok(Some(Buffer::from(value.to_vec())))
249            }
250            None => Ok(None),
251        }
252    }
253
254    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
255        let collection = self.get_collection().await?;
256        let filter = doc! { self.key_field.as_str(): path };
257        let update = doc! { "$set": { self.value_field.as_str(): Binary { subtype: mongodb::bson::spec::BinarySubtype::Generic, bytes: value.to_vec() } } };
258        collection
259            .update_one(filter, update)
260            .upsert(true)
261            .await
262            .map_err(parse_mongodb_error)?;
263
264        Ok(())
265    }
266
267    async fn delete(&self, path: &str) -> Result<()> {
268        let collection = self.get_collection().await?;
269        let filter = doc! {self.key_field.as_str():path};
270        collection
271            .delete_one(filter)
272            .await
273            .map_err(parse_mongodb_error)?;
274        Ok(())
275    }
276}
277
278fn parse_mongodb_error(err: mongodb::error::Error) -> Error {
279    Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err)
280}
281
282fn parse_bson_error(err: mongodb::bson::document::ValueAccessError) -> Error {
283    Error::new(ErrorKind::Unexpected, "bson error").set_source(err)
284}