opendal_core/services/mongodb/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19
20use mea::once::OnceCell;
21use mongodb::bson::Binary;
22use mongodb::bson::Document;
23use mongodb::bson::doc;
24use mongodb::options::ClientOptions;
25
26use crate::*;
27
28#[derive(Clone)]
29pub struct MongodbCore {
30    pub connection_string: String,
31    pub database: String,
32    pub collection: String,
33    pub collection_instance: OnceCell<mongodb::Collection<Document>>,
34    pub key_field: String,
35    pub value_field: String,
36}
37
38impl Debug for MongodbCore {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        f.debug_struct("MongodbCore")
41            .field("database", &self.database)
42            .field("collection", &self.collection)
43            .field("key_field", &self.key_field)
44            .field("value_field", &self.value_field)
45            .finish_non_exhaustive()
46    }
47}
48
49impl MongodbCore {
50    async fn get_collection(&self) -> Result<&mongodb::Collection<Document>> {
51        self.collection_instance
52            .get_or_try_init(|| async {
53                let client_options = ClientOptions::parse(&self.connection_string)
54                    .await
55                    .map_err(parse_mongodb_error)?;
56                let client =
57                    mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?;
58                let database = client.database(&self.database);
59                let collection = database.collection(&self.collection);
60                Ok(collection)
61            })
62            .await
63    }
64
65    pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
66        let collection = self.get_collection().await?;
67        let filter = doc! {self.key_field.as_str():path};
68        let result = collection
69            .find_one(filter)
70            .await
71            .map_err(parse_mongodb_error)?;
72        match result {
73            Some(doc) => {
74                let value = doc
75                    .get_binary_generic(&self.value_field)
76                    .map_err(parse_bson_error)?;
77                Ok(Some(Buffer::from(value.to_vec())))
78            }
79            None => Ok(None),
80        }
81    }
82
83    pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
84        let collection = self.get_collection().await?;
85        let filter = doc! { self.key_field.as_str(): path };
86        let update = doc! { "$set": { self.value_field.as_str(): Binary { subtype: mongodb::bson::spec::BinarySubtype::Generic, bytes: value.to_vec() } } };
87        collection
88            .update_one(filter, update)
89            .upsert(true)
90            .await
91            .map_err(parse_mongodb_error)?;
92
93        Ok(())
94    }
95
96    pub async fn delete(&self, path: &str) -> Result<()> {
97        let collection = self.get_collection().await?;
98        let filter = doc! {self.key_field.as_str():path};
99        collection
100            .delete_one(filter)
101            .await
102            .map_err(parse_mongodb_error)?;
103        Ok(())
104    }
105}
106
107fn parse_mongodb_error(err: mongodb::error::Error) -> Error {
108    Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err)
109}
110
111fn parse_bson_error(err: mongodb::bson::document::ValueAccessError) -> Error {
112    Error::new(ErrorKind::Unexpected, "bson error").set_source(err)
113}