opendal/services/gridfs/core.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt::Debug;
use std::fmt::Formatter;

use futures::AsyncReadExt;
use futures::AsyncWriteExt;
use mongodb::bson::doc;
use mongodb::gridfs::GridFsBucket;
use mongodb::options::ClientOptions;
use mongodb::options::GridFsBucketOptions;
use tokio::sync::OnceCell;

use crate::raw::adapters::kv;
use crate::raw::new_std_io_error;
use crate::Buffer;
use crate::Capability;
use crate::Error;
use crate::ErrorKind;
use crate::Result;
use crate::Scheme;

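/// Core of the GridFS service.
///
/// Holds the MongoDB connection settings together with a lazily
/// initialized [`GridFsBucket`] that is shared by all operations.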
#[derive(Clone)]
pub struct GridFsCore {
    pub connection_string: String,
    pub database: String,
    pub bucket: String,
    pub chunk_size: u32,
    pub bucket_instance: OnceCell<GridFsBucket>,
}

impl Debug for GridFsCore {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("GridFsCore")
            .field("database", &self.database)
            .field("bucket", &self.bucket)
            .field("chunk_size", &self.chunk_size)
            .finish()
    }
}

impl GridFsCore {
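    /// Returns the GridFS bucket, connecting to MongoDB on first use.
    ///
    /// The bucket is cached in a `OnceCell`, so the client and bucket are
    /// only created once and then reused by every subsequent operation.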
    async fn get_bucket(&self) -> Result<&GridFsBucket> {
        self.bucket_instance
            .get_or_try_init(|| async {
                let client_options = ClientOptions::parse(&self.connection_string)
                    .await
                    .map_err(parse_mongodb_error)?;
                let client =
                    mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?;
                let bucket_options = GridFsBucketOptions::builder()
                    .bucket_name(Some(self.bucket.clone()))
                    .chunk_size_bytes(Some(self.chunk_size))
                    .build();
                let bucket = client
                    .database(&self.database)
                    .gridfs_bucket(bucket_options);
                Ok(bucket)
            })
            .await
    }
}

impl kv::Adapter for GridFsCore {
    // Listing is not advertised in the capability set, so the unit type
    // stands in for the scanner.
    type Scanner = ();

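    /// Describes this backend: the GridFS scheme, the `database/bucket` pair
    /// it operates on, and the capabilities it supports (read, write, shared).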
    fn info(&self) -> kv::Info {
        kv::Info::new(
            Scheme::Gridfs,
            &format!("{}/{}", self.database, self.bucket),
            Capability {
                read: true,
                write: true,
                shared: true,
                ..Default::default()
            },
        )
    }

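    /// Looks up the file named `path` and reads its full contents into memory.
    ///
    /// Returns `Ok(None)` when no file with that name exists.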
    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
        let bucket = self.get_bucket().await?;
        let filter = doc! { "filename": path };
        let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? else {
            return Ok(None);
        };

        let mut destination = Vec::new();
        let file_id = doc.id;
        let mut stream = bucket
            .open_download_stream(file_id)
            .await
            .map_err(parse_mongodb_error)?;
        stream
            .read_to_end(&mut destination)
            .await
            .map_err(new_std_io_error)?;
        Ok(Some(Buffer::from(destination)))
    }

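    /// Writes `value` as the file named `path`.
    ///
    /// Any existing file with the same name is deleted first, so the write
    /// behaves as an overwrite instead of accumulating duplicates.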
    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
        let bucket = self.get_bucket().await?;

        // Delete the old file if one with the same name already exists.
        let filter = doc! { "filename": path };
        if let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? {
            let file_id = doc.id;
            bucket.delete(file_id).await.map_err(parse_mongodb_error)?;
        };

        // Upload the new file contents.
        let mut upload_stream = bucket
            .open_upload_stream(path)
            .await
            .map_err(parse_mongodb_error)?;
        upload_stream
            .write_all(&value.to_vec())
            .await
            .map_err(new_std_io_error)?;
        upload_stream.close().await.map_err(new_std_io_error)?;

        Ok(())
    }

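    /// Deletes the file named `path`, treating a missing file as success.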
    async fn delete(&self, path: &str) -> Result<()> {
        let bucket = self.get_bucket().await?;
        let filter = doc! { "filename": path };
        let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? else {
            return Ok(());
        };

        let file_id = doc.id;
        bucket.delete(file_id).await.map_err(parse_mongodb_error)?;
        Ok(())
    }
}

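/// Maps a MongoDB driver error to an unexpected OpenDAL error, keeping the
/// original error as the source.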
fn parse_mongodb_error(err: mongodb::error::Error) -> Error {
    Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err)
}