opendal/services/gridfs/
backend.rs1use std::sync::Arc;
19
20use mea::once::OnceCell;
21
22use super::GRIDFS_SCHEME;
23use super::config::GridfsConfig;
24use super::core::*;
25use super::deleter::GridfsDeleter;
26use super::writer::GridfsWriter;
27use crate::raw::*;
28use crate::*;
29
30#[doc = include_str!("docs.md")]
31#[derive(Debug, Default)]
32pub struct GridfsBuilder {
33 pub(super) config: GridfsConfig,
34}
35
36impl GridfsBuilder {
37 pub fn connection_string(mut self, v: &str) -> Self {
58 if !v.is_empty() {
59 self.config.connection_string = Some(v.to_string());
60 }
61 self
62 }
63
64 pub fn root(mut self, root: &str) -> Self {
68 self.config.root = if root.is_empty() {
69 None
70 } else {
71 Some(root.to_string())
72 };
73
74 self
75 }
76
77 pub fn database(mut self, database: &str) -> Self {
79 if !database.is_empty() {
80 self.config.database = Some(database.to_string());
81 }
82 self
83 }
84
85 pub fn bucket(mut self, bucket: &str) -> Self {
89 if !bucket.is_empty() {
90 self.config.bucket = Some(bucket.to_string());
91 }
92 self
93 }
94
95 pub fn chunk_size(mut self, chunk_size: u32) -> Self {
99 if chunk_size > 0 {
100 self.config.chunk_size = Some(chunk_size);
101 }
102 self
103 }
104}
105
106impl Builder for GridfsBuilder {
107 type Config = GridfsConfig;
108
109 fn build(self) -> Result<impl Access> {
110 let conn = match &self.config.connection_string.clone() {
111 Some(v) => v.clone(),
112 None => {
113 return Err(
114 Error::new(ErrorKind::ConfigInvalid, "connection_string is required")
115 .with_context("service", GRIDFS_SCHEME),
116 );
117 }
118 };
119 let database = match &self.config.database.clone() {
120 Some(v) => v.clone(),
121 None => {
122 return Err(Error::new(ErrorKind::ConfigInvalid, "database is required")
123 .with_context("service", GRIDFS_SCHEME));
124 }
125 };
126 let bucket = match &self.config.bucket.clone() {
127 Some(v) => v.clone(),
128 None => "fs".to_string(),
129 };
130 let chunk_size = self.config.chunk_size.unwrap_or(255);
131
132 let root = normalize_root(
133 self.config
134 .root
135 .clone()
136 .unwrap_or_else(|| "/".to_string())
137 .as_str(),
138 );
139
140 Ok(GridfsBackend::new(GridfsCore {
141 connection_string: conn,
142 database,
143 bucket,
144 chunk_size,
145 bucket_instance: OnceCell::new(),
146 })
147 .with_normalized_root(root))
148 }
149}
150
151#[derive(Clone, Debug)]
153pub struct GridfsBackend {
154 core: Arc<GridfsCore>,
155 root: String,
156 info: Arc<AccessorInfo>,
157}
158
159impl GridfsBackend {
160 pub fn new(core: GridfsCore) -> Self {
161 let info = AccessorInfo::default();
162 info.set_scheme(GRIDFS_SCHEME);
163 info.set_name(&format!("{}/{}", core.database, core.bucket));
164 info.set_root("/");
165 info.set_native_capability(Capability {
166 read: true,
167 stat: true,
168 write: true,
169 write_can_empty: true,
170 delete: true,
171 shared: true,
172 ..Default::default()
173 });
174
175 Self {
176 core: Arc::new(core),
177 root: "/".to_string(),
178 info: Arc::new(info),
179 }
180 }
181
182 fn with_normalized_root(mut self, root: String) -> Self {
183 self.info.set_root(&root);
184 self.root = root;
185 self
186 }
187}
188
189impl Access for GridfsBackend {
190 type Reader = Buffer;
191 type Writer = GridfsWriter;
192 type Lister = ();
193 type Deleter = oio::OneShotDeleter<GridfsDeleter>;
194
195 fn info(&self) -> Arc<AccessorInfo> {
196 self.info.clone()
197 }
198
199 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
200 let p = build_abs_path(&self.root, path);
201
202 if p == build_abs_path(&self.root, "") {
203 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
204 } else {
205 let bs = self.core.get(&p).await?;
206 match bs {
207 Some(bs) => Ok(RpStat::new(
208 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
209 )),
210 None => Err(Error::new(ErrorKind::NotFound, "kv not found in gridfs")),
211 }
212 }
213 }
214
215 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
216 let p = build_abs_path(&self.root, path);
217 let bs = match self.core.get(&p).await? {
218 Some(bs) => bs,
219 None => {
220 return Err(Error::new(ErrorKind::NotFound, "kv not found in gridfs"));
221 }
222 };
223 Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
224 }
225
226 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
227 let p = build_abs_path(&self.root, path);
228 Ok((RpWrite::new(), GridfsWriter::new(self.core.clone(), p)))
229 }
230
231 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
232 Ok((
233 RpDelete::default(),
234 oio::OneShotDeleter::new(GridfsDeleter::new(self.core.clone(), self.root.clone())),
235 ))
236 }
237}