opendal/services/gridfs/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::{
19    raw::{adapters::kv, new_std_io_error},
20    Buffer, Capability, Error, ErrorKind, Result, Scheme,
21};
22use futures::AsyncReadExt;
23use futures::AsyncWriteExt;
24use mongodb::{
25    bson::doc,
26    gridfs::GridFsBucket,
27    options::{ClientOptions, GridFsBucketOptions},
28};
29use std::fmt::{Debug, Formatter};
30use tokio::sync::OnceCell;
31
/// Core state for the GridFS-backed key-value service.
///
/// Connection is established lazily: the [`GridFsBucket`] handle is only
/// built on first use (see `get_bucket`), so constructing a `GridFsCore`
/// performs no I/O.
#[derive(Clone)]
pub struct GridFsCore {
    // MongoDB connection string; parsed into `ClientOptions` on first use.
    pub connection_string: String,
    // Database that hosts the GridFS bucket.
    pub database: String,
    // GridFS bucket name, passed to `GridFsBucketOptions`.
    pub bucket: String,
    // Chunk size in bytes for GridFS uploads.
    pub chunk_size: u32,
    // Lazily-initialized bucket handle, populated by `get_bucket`.
    pub bucket_instance: OnceCell<GridFsBucket>,
}
40
41impl Debug for GridFsCore {
42    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
43        f.debug_struct("GridfsCore")
44            .field("database", &self.database)
45            .field("bucket", &self.bucket)
46            .field("chunk_size", &self.chunk_size)
47            .finish()
48    }
49}
50
51impl GridFsCore {
52    async fn get_bucket(&self) -> Result<&GridFsBucket> {
53        self.bucket_instance
54            .get_or_try_init(|| async {
55                let client_options = ClientOptions::parse(&self.connection_string)
56                    .await
57                    .map_err(parse_mongodb_error)?;
58                let client =
59                    mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?;
60                let bucket_options = GridFsBucketOptions::builder()
61                    .bucket_name(Some(self.bucket.clone()))
62                    .chunk_size_bytes(Some(self.chunk_size))
63                    .build();
64                let bucket = client
65                    .database(&self.database)
66                    .gridfs_bucket(bucket_options);
67                Ok(bucket)
68            })
69            .await
70    }
71}
72
73impl kv::Adapter for GridFsCore {
74    type Scanner = (); // Replace with the actual Scanner type.
75
76    fn info(&self) -> kv::Info {
77        kv::Info::new(
78            Scheme::Gridfs,
79            &format!("{}/{}", self.database, self.bucket),
80            Capability {
81                read: true,
82                write: true,
83                shared: true,
84                ..Default::default()
85            },
86        )
87    }
88
89    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
90        let bucket = self.get_bucket().await?;
91        let filter = doc! { "filename": path };
92        let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? else {
93            return Ok(None);
94        };
95
96        let mut destination = Vec::new();
97        let file_id = doc.id;
98        let mut stream = bucket
99            .open_download_stream(file_id)
100            .await
101            .map_err(parse_mongodb_error)?;
102        stream
103            .read_to_end(&mut destination)
104            .await
105            .map_err(new_std_io_error)?;
106        Ok(Some(Buffer::from(destination)))
107    }
108
109    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
110        let bucket = self.get_bucket().await?;
111
112        // delete old file if exists
113        let filter = doc! { "filename": path };
114        if let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? {
115            let file_id = doc.id;
116            bucket.delete(file_id).await.map_err(parse_mongodb_error)?;
117        };
118
119        // set new file
120        let mut upload_stream = bucket
121            .open_upload_stream(path)
122            .await
123            .map_err(parse_mongodb_error)?;
124        upload_stream
125            .write_all(&value.to_vec())
126            .await
127            .map_err(new_std_io_error)?;
128        upload_stream.close().await.map_err(new_std_io_error)?;
129
130        Ok(())
131    }
132
133    async fn delete(&self, path: &str) -> Result<()> {
134        let bucket = self.get_bucket().await?;
135        let filter = doc! { "filename": path };
136        let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? else {
137            return Ok(());
138        };
139
140        let file_id = doc.id;
141        bucket.delete(file_id).await.map_err(parse_mongodb_error)?;
142        Ok(())
143    }
144}
145
146fn parse_mongodb_error(err: mongodb::error::Error) -> Error {
147    Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err)
148}