opendal/services/gridfs/
core.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20
21use futures::AsyncReadExt;
22use futures::AsyncWriteExt;
23use mongodb::bson::doc;
24use mongodb::gridfs::GridFsBucket;
25use mongodb::options::ClientOptions;
26use mongodb::options::GridFsBucketOptions;
27use tokio::sync::OnceCell;
28
29use crate::raw::adapters::kv;
30use crate::raw::new_std_io_error;
31use crate::Buffer;
32use crate::Capability;
33use crate::Error;
34use crate::ErrorKind;
35use crate::Result;
36use crate::Scheme;
37
38#[derive(Clone)]
39pub struct GridFsCore {
40 pub connection_string: String,
41 pub database: String,
42 pub bucket: String,
43 pub chunk_size: u32,
44 pub bucket_instance: OnceCell<GridFsBucket>,
45}
46
47impl Debug for GridFsCore {
48 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("GridfsCore")
50 .field("database", &self.database)
51 .field("bucket", &self.bucket)
52 .field("chunk_size", &self.chunk_size)
53 .finish()
54 }
55}
56
57impl GridFsCore {
58 async fn get_bucket(&self) -> Result<&GridFsBucket> {
59 self.bucket_instance
60 .get_or_try_init(|| async {
61 let client_options = ClientOptions::parse(&self.connection_string)
62 .await
63 .map_err(parse_mongodb_error)?;
64 let client =
65 mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?;
66 let bucket_options = GridFsBucketOptions::builder()
67 .bucket_name(Some(self.bucket.clone()))
68 .chunk_size_bytes(Some(self.chunk_size))
69 .build();
70 let bucket = client
71 .database(&self.database)
72 .gridfs_bucket(bucket_options);
73 Ok(bucket)
74 })
75 .await
76 }
77}
78
79impl kv::Adapter for GridFsCore {
80 type Scanner = (); fn info(&self) -> kv::Info {
83 kv::Info::new(
84 Scheme::Gridfs,
85 &format!("{}/{}", self.database, self.bucket),
86 Capability {
87 read: true,
88 write: true,
89 shared: true,
90 ..Default::default()
91 },
92 )
93 }
94
95 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
96 let bucket = self.get_bucket().await?;
97 let filter = doc! { "filename": path };
98 let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? else {
99 return Ok(None);
100 };
101
102 let mut destination = Vec::new();
103 let file_id = doc.id;
104 let mut stream = bucket
105 .open_download_stream(file_id)
106 .await
107 .map_err(parse_mongodb_error)?;
108 stream
109 .read_to_end(&mut destination)
110 .await
111 .map_err(new_std_io_error)?;
112 Ok(Some(Buffer::from(destination)))
113 }
114
115 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
116 let bucket = self.get_bucket().await?;
117
118 let filter = doc! { "filename": path };
120 if let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? {
121 let file_id = doc.id;
122 bucket.delete(file_id).await.map_err(parse_mongodb_error)?;
123 };
124
125 let mut upload_stream = bucket
127 .open_upload_stream(path)
128 .await
129 .map_err(parse_mongodb_error)?;
130 upload_stream
131 .write_all(&value.to_vec())
132 .await
133 .map_err(new_std_io_error)?;
134 upload_stream.close().await.map_err(new_std_io_error)?;
135
136 Ok(())
137 }
138
139 async fn delete(&self, path: &str) -> Result<()> {
140 let bucket = self.get_bucket().await?;
141 let filter = doc! { "filename": path };
142 let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? else {
143 return Ok(());
144 };
145
146 let file_id = doc.id;
147 bucket.delete(file_id).await.map_err(parse_mongodb_error)?;
148 Ok(())
149 }
150}
151
152fn parse_mongodb_error(err: mongodb::error::Error) -> Error {
153 Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err)
154}