opendal/services/gridfs/
core.rs1use crate::{
19 raw::{adapters::kv, new_std_io_error},
20 Buffer, Capability, Error, ErrorKind, Result, Scheme,
21};
22use futures::AsyncReadExt;
23use futures::AsyncWriteExt;
24use mongodb::{
25 bson::doc,
26 gridfs::GridFsBucket,
27 options::{ClientOptions, GridFsBucketOptions},
28};
29use std::fmt::{Debug, Formatter};
30use tokio::sync::OnceCell;
31
32#[derive(Clone)]
33pub struct GridFsCore {
34 pub connection_string: String,
35 pub database: String,
36 pub bucket: String,
37 pub chunk_size: u32,
38 pub bucket_instance: OnceCell<GridFsBucket>,
39}
40
41impl Debug for GridFsCore {
42 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
43 f.debug_struct("GridfsCore")
44 .field("database", &self.database)
45 .field("bucket", &self.bucket)
46 .field("chunk_size", &self.chunk_size)
47 .finish()
48 }
49}
50
51impl GridFsCore {
52 async fn get_bucket(&self) -> Result<&GridFsBucket> {
53 self.bucket_instance
54 .get_or_try_init(|| async {
55 let client_options = ClientOptions::parse(&self.connection_string)
56 .await
57 .map_err(parse_mongodb_error)?;
58 let client =
59 mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?;
60 let bucket_options = GridFsBucketOptions::builder()
61 .bucket_name(Some(self.bucket.clone()))
62 .chunk_size_bytes(Some(self.chunk_size))
63 .build();
64 let bucket = client
65 .database(&self.database)
66 .gridfs_bucket(bucket_options);
67 Ok(bucket)
68 })
69 .await
70 }
71}
72
73impl kv::Adapter for GridFsCore {
74 type Scanner = (); fn info(&self) -> kv::Info {
77 kv::Info::new(
78 Scheme::Gridfs,
79 &format!("{}/{}", self.database, self.bucket),
80 Capability {
81 read: true,
82 write: true,
83 shared: true,
84 ..Default::default()
85 },
86 )
87 }
88
89 async fn get(&self, path: &str) -> Result<Option<Buffer>> {
90 let bucket = self.get_bucket().await?;
91 let filter = doc! { "filename": path };
92 let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? else {
93 return Ok(None);
94 };
95
96 let mut destination = Vec::new();
97 let file_id = doc.id;
98 let mut stream = bucket
99 .open_download_stream(file_id)
100 .await
101 .map_err(parse_mongodb_error)?;
102 stream
103 .read_to_end(&mut destination)
104 .await
105 .map_err(new_std_io_error)?;
106 Ok(Some(Buffer::from(destination)))
107 }
108
109 async fn set(&self, path: &str, value: Buffer) -> Result<()> {
110 let bucket = self.get_bucket().await?;
111
112 let filter = doc! { "filename": path };
114 if let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? {
115 let file_id = doc.id;
116 bucket.delete(file_id).await.map_err(parse_mongodb_error)?;
117 };
118
119 let mut upload_stream = bucket
121 .open_upload_stream(path)
122 .await
123 .map_err(parse_mongodb_error)?;
124 upload_stream
125 .write_all(&value.to_vec())
126 .await
127 .map_err(new_std_io_error)?;
128 upload_stream.close().await.map_err(new_std_io_error)?;
129
130 Ok(())
131 }
132
133 async fn delete(&self, path: &str) -> Result<()> {
134 let bucket = self.get_bucket().await?;
135 let filter = doc! { "filename": path };
136 let Some(doc) = bucket.find_one(filter).await.map_err(parse_mongodb_error)? else {
137 return Ok(());
138 };
139
140 let file_id = doc.id;
141 bucket.delete(file_id).await.map_err(parse_mongodb_error)?;
142 Ok(())
143 }
144}
145
146fn parse_mongodb_error(err: mongodb::error::Error) -> Error {
147 Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err)
148}