opendal_core/services/gridfs/
core.rs1use std::fmt::Debug;
19
20use futures::AsyncReadExt;
21use futures::AsyncWriteExt;
22use mea::once::OnceCell;
23use mongodb::bson::doc;
24use mongodb::gridfs::GridFsBucket;
25use mongodb::options::ClientOptions;
26use mongodb::options::GridFsBucketOptions;
27
28use crate::raw::*;
29use crate::*;
30
31#[derive(Clone)]
32pub struct GridfsCore {
33 pub connection_string: String,
34 pub database: String,
35 pub bucket: String,
36 pub chunk_size: u32,
37 pub bucket_instance: OnceCell<GridFsBucket>,
38}
39
40impl Debug for GridfsCore {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 f.debug_struct("GridfsCore")
43 .field("database", &self.database)
44 .field("bucket", &self.bucket)
45 .field("chunk_size", &self.chunk_size)
46 .finish_non_exhaustive()
47 }
48}
49
50impl GridfsCore {
51 async fn get_bucket(&self) -> Result<&GridFsBucket> {
52 self.bucket_instance
53 .get_or_try_init(|| async {
54 let client_options = ClientOptions::parse(&self.connection_string)
55 .await
56 .map_err(parse_mongodb_error)?;
57 let client =
58 mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?;
59 let bucket_options = GridFsBucketOptions::builder()
60 .bucket_name(Some(self.bucket.clone()))
61 .chunk_size_bytes(Some(self.chunk_size))
62 .build();
63 let bucket = client
64 .database(&self.database)
65 .gridfs_bucket(bucket_options);
66 Ok(bucket)
67 })
68 .await
69 }
70
71 pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
72 let bucket = self.get_bucket().await?;
73 let filter = doc! { "filename": path };
74 let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? else {
75 return Ok(None);
76 };
77
78 let mut destination = Vec::new();
79 let file_id = doc.id;
80 let mut stream = bucket
81 .open_download_stream(file_id)
82 .await
83 .map_err(parse_mongodb_error)?;
84 stream
85 .read_to_end(&mut destination)
86 .await
87 .map_err(new_std_io_error)?;
88 Ok(Some(Buffer::from(destination)))
89 }
90
91 pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
92 let bucket = self.get_bucket().await?;
93
94 let filter = doc! { "filename": path };
96 if let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? {
97 let file_id = doc.id;
98 bucket.delete(file_id).await.map_err(parse_mongodb_error)?;
99 };
100
101 let mut upload_stream = bucket
103 .open_upload_stream(path)
104 .await
105 .map_err(parse_mongodb_error)?;
106 upload_stream
107 .write_all(&value.to_vec())
108 .await
109 .map_err(new_std_io_error)?;
110 upload_stream.close().await.map_err(new_std_io_error)?;
111
112 Ok(())
113 }
114
115 pub async fn delete(&self, path: &str) -> Result<()> {
116 let bucket = self.get_bucket().await?;
117 let filter = doc! { "filename": path };
118 let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? else {
119 return Ok(());
120 };
121
122 let file_id = doc.id;
123 bucket.delete(file_id).await.map_err(parse_mongodb_error)?;
124 Ok(())
125 }
126}
127
128fn parse_mongodb_error(err: mongodb::error::Error) -> Error {
129 Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err)
130}