opendal/services/mongodb/
backend.rs1use std::sync::Arc;
19
20use tokio::sync::OnceCell;
21
22use super::MONGODB_SCHEME;
23use super::config::MongodbConfig;
24use super::core::*;
25use super::deleter::MongodbDeleter;
26use super::writer::MongodbWriter;
27use crate::raw::*;
28use crate::*;
29
30#[doc = include_str!("docs.md")]
31#[derive(Debug, Default)]
32pub struct MongodbBuilder {
33 pub(super) config: MongodbConfig,
34}
35
36impl MongodbBuilder {
37 pub fn connection_string(mut self, v: &str) -> Self {
58 if !v.is_empty() {
59 self.config.connection_string = Some(v.to_string());
60 }
61 self
62 }
63 pub fn root(mut self, root: &str) -> Self {
67 self.config.root = if root.is_empty() {
68 None
69 } else {
70 Some(root.to_string())
71 };
72
73 self
74 }
75
76 pub fn database(mut self, database: &str) -> Self {
78 if !database.is_empty() {
79 self.config.database = Some(database.to_string());
80 }
81 self
82 }
83
84 pub fn collection(mut self, collection: &str) -> Self {
86 if !collection.is_empty() {
87 self.config.collection = Some(collection.to_string());
88 }
89 self
90 }
91
92 pub fn key_field(mut self, key_field: &str) -> Self {
96 if !key_field.is_empty() {
97 self.config.key_field = Some(key_field.to_string());
98 }
99 self
100 }
101
102 pub fn value_field(mut self, value_field: &str) -> Self {
106 if !value_field.is_empty() {
107 self.config.value_field = Some(value_field.to_string());
108 }
109 self
110 }
111}
112
113impl Builder for MongodbBuilder {
114 type Config = MongodbConfig;
115
116 fn build(self) -> Result<impl Access> {
117 let conn = match &self.config.connection_string.clone() {
118 Some(v) => v.clone(),
119 None => {
120 return Err(
121 Error::new(ErrorKind::ConfigInvalid, "connection_string is required")
122 .with_context("service", MONGODB_SCHEME),
123 );
124 }
125 };
126 let database = match &self.config.database.clone() {
127 Some(v) => v.clone(),
128 None => {
129 return Err(Error::new(ErrorKind::ConfigInvalid, "database is required")
130 .with_context("service", MONGODB_SCHEME));
131 }
132 };
133 let collection = match &self.config.collection.clone() {
134 Some(v) => v.clone(),
135 None => {
136 return Err(
137 Error::new(ErrorKind::ConfigInvalid, "collection is required")
138 .with_context("service", MONGODB_SCHEME),
139 );
140 }
141 };
142 let key_field = match &self.config.key_field.clone() {
143 Some(v) => v.clone(),
144 None => "key".to_string(),
145 };
146 let value_field = match &self.config.value_field.clone() {
147 Some(v) => v.clone(),
148 None => "value".to_string(),
149 };
150 let root = normalize_root(
151 self.config
152 .root
153 .clone()
154 .unwrap_or_else(|| "/".to_string())
155 .as_str(),
156 );
157 Ok(MongodbBackend::new(MongodbCore {
158 connection_string: conn,
159 database,
160 collection,
161 collection_instance: OnceCell::new(),
162 key_field,
163 value_field,
164 })
165 .with_normalized_root(root))
166 }
167}
168
169#[derive(Clone, Debug)]
171pub struct MongodbBackend {
172 core: Arc<MongodbCore>,
173 root: String,
174 info: Arc<AccessorInfo>,
175}
176
177impl MongodbBackend {
178 pub fn new(core: MongodbCore) -> Self {
179 let info = AccessorInfo::default();
180 info.set_scheme(MONGODB_SCHEME);
181 info.set_name(&format!("{}/{}", core.database, core.collection));
182 info.set_root("/");
183 info.set_native_capability(Capability {
184 read: true,
185 stat: true,
186 write: true,
187 write_can_empty: true,
188 delete: true,
189 shared: true,
190 ..Default::default()
191 });
192
193 Self {
194 core: Arc::new(core),
195 root: "/".to_string(),
196 info: Arc::new(info),
197 }
198 }
199
200 fn with_normalized_root(mut self, root: String) -> Self {
201 self.info.set_root(&root);
202 self.root = root;
203 self
204 }
205}
206
207impl Access for MongodbBackend {
208 type Reader = Buffer;
209 type Writer = MongodbWriter;
210 type Lister = ();
211 type Deleter = oio::OneShotDeleter<MongodbDeleter>;
212
213 fn info(&self) -> Arc<AccessorInfo> {
214 self.info.clone()
215 }
216
217 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
218 let p = build_abs_path(&self.root, path);
219
220 if p == build_abs_path(&self.root, "") {
221 Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
222 } else {
223 let bs = self.core.get(&p).await?;
224 match bs {
225 Some(bs) => Ok(RpStat::new(
226 Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
227 )),
228 None => Err(Error::new(ErrorKind::NotFound, "kv not found in mongodb")),
229 }
230 }
231 }
232
233 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
234 let p = build_abs_path(&self.root, path);
235 let bs = match self.core.get(&p).await? {
236 Some(bs) => bs,
237 None => {
238 return Err(Error::new(ErrorKind::NotFound, "kv not found in mongodb"));
239 }
240 };
241 Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
242 }
243
244 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
245 let p = build_abs_path(&self.root, path);
246 Ok((RpWrite::new(), MongodbWriter::new(self.core.clone(), p)))
247 }
248
249 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
250 Ok((
251 RpDelete::default(),
252 oio::OneShotDeleter::new(MongodbDeleter::new(self.core.clone(), self.root.clone())),
253 ))
254 }
255}