opendal_core/services/gridfs/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19
20use futures::AsyncReadExt;
21use futures::AsyncWriteExt;
22use mea::once::OnceCell;
23use mongodb::bson::doc;
24use mongodb::gridfs::GridFsBucket;
25use mongodb::options::ClientOptions;
26use mongodb::options::GridFsBucketOptions;
27
28use crate::raw::*;
29use crate::*;
30
31#[derive(Clone)]
32pub struct GridfsCore {
33    pub connection_string: String,
34    pub database: String,
35    pub bucket: String,
36    pub chunk_size: u32,
37    pub bucket_instance: OnceCell<GridFsBucket>,
38}
39
40impl Debug for GridfsCore {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        f.debug_struct("GridfsCore")
43            .field("database", &self.database)
44            .field("bucket", &self.bucket)
45            .field("chunk_size", &self.chunk_size)
46            .finish_non_exhaustive()
47    }
48}
49
50impl GridfsCore {
51    async fn get_bucket(&self) -> Result<&GridFsBucket> {
52        self.bucket_instance
53            .get_or_try_init(|| async {
54                let client_options = ClientOptions::parse(&self.connection_string)
55                    .await
56                    .map_err(parse_mongodb_error)?;
57                let client =
58                    mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?;
59                let bucket_options = GridFsBucketOptions::builder()
60                    .bucket_name(Some(self.bucket.clone()))
61                    .chunk_size_bytes(Some(self.chunk_size))
62                    .build();
63                let bucket = client
64                    .database(&self.database)
65                    .gridfs_bucket(bucket_options);
66                Ok(bucket)
67            })
68            .await
69    }
70
71    pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
72        let bucket = self.get_bucket().await?;
73        let filter = doc! { "filename": path };
74        let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? else {
75            return Ok(None);
76        };
77
78        let mut destination = Vec::new();
79        let file_id = doc.id;
80        let mut stream = bucket
81            .open_download_stream(file_id)
82            .await
83            .map_err(parse_mongodb_error)?;
84        stream
85            .read_to_end(&mut destination)
86            .await
87            .map_err(new_std_io_error)?;
88        Ok(Some(Buffer::from(destination)))
89    }
90
91    pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
92        let bucket = self.get_bucket().await?;
93
94        // delete old file if exists
95        let filter = doc! { "filename": path };
96        if let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? {
97            let file_id = doc.id;
98            bucket.delete(file_id).await.map_err(parse_mongodb_error)?;
99        };
100
101        // set new file
102        let mut upload_stream = bucket
103            .open_upload_stream(path)
104            .await
105            .map_err(parse_mongodb_error)?;
106        upload_stream
107            .write_all(&value.to_vec())
108            .await
109            .map_err(new_std_io_error)?;
110        upload_stream.close().await.map_err(new_std_io_error)?;
111
112        Ok(())
113    }
114
115    pub async fn delete(&self, path: &str) -> Result<()> {
116        let bucket = self.get_bucket().await?;
117        let filter = doc! { "filename": path };
118        let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? else {
119            return Ok(());
120        };
121
122        let file_id = doc.id;
123        bucket.delete(file_id).await.map_err(parse_mongodb_error)?;
124        Ok(())
125    }
126}
127
128fn parse_mongodb_error(err: mongodb::error::Error) -> Error {
129    Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err)
130}