opendal/services/gridfs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use mea::once::OnceCell;
21
22use super::GRIDFS_SCHEME;
23use super::config::GridfsConfig;
24use super::core::*;
25use super::deleter::GridfsDeleter;
26use super::writer::GridfsWriter;
27use crate::raw::*;
28use crate::*;
29
30#[doc = include_str!("docs.md")]
31#[derive(Debug, Default)]
32pub struct GridfsBuilder {
33    pub(super) config: GridfsConfig,
34}
35
36impl GridfsBuilder {
37    /// Set the connection_string of the MongoDB service.
38    ///
39    /// This connection string is used to connect to the MongoDB service. It typically follows the format:
40    ///
41    /// ## Format
42    ///
43    /// `mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]`
44    ///
45    /// Examples:
46    ///
47    /// - Connecting to a local MongoDB instance: `mongodb://localhost:27017`
48    /// - Using authentication: `mongodb://myUser:myPassword@localhost:27017/myAuthDB`
49    /// - Specifying authentication mechanism: `mongodb://myUser:myPassword@localhost:27017/myAuthDB?authMechanism=SCRAM-SHA-256`
50    ///
51    /// ## Options
52    ///
53    /// - `authMechanism`: Specifies the authentication method to use. Examples include `SCRAM-SHA-1`, `SCRAM-SHA-256`, and `MONGODB-AWS`.
54    /// - ... (any other options you wish to highlight)
55    ///
56    /// For more information, please refer to [MongoDB Connection String URI Format](https://docs.mongodb.com/manual/reference/connection-string/).
57    pub fn connection_string(mut self, v: &str) -> Self {
58        if !v.is_empty() {
59            self.config.connection_string = Some(v.to_string());
60        }
61        self
62    }
63
64    /// Set the working directory, all operations will be performed under it.
65    ///
66    /// default: "/"
67    pub fn root(mut self, root: &str) -> Self {
68        self.config.root = if root.is_empty() {
69            None
70        } else {
71            Some(root.to_string())
72        };
73
74        self
75    }
76
77    /// Set the database name of the MongoDB GridFs service to read/write.
78    pub fn database(mut self, database: &str) -> Self {
79        if !database.is_empty() {
80            self.config.database = Some(database.to_string());
81        }
82        self
83    }
84
85    /// Set the bucket name of the MongoDB GridFs service to read/write.
86    ///
87    /// Default to `fs` if not specified.
88    pub fn bucket(mut self, bucket: &str) -> Self {
89        if !bucket.is_empty() {
90            self.config.bucket = Some(bucket.to_string());
91        }
92        self
93    }
94
95    /// Set the chunk size of the MongoDB GridFs service used to break the user file into chunks.
96    ///
97    /// Default to `255 KiB` if not specified.
98    pub fn chunk_size(mut self, chunk_size: u32) -> Self {
99        if chunk_size > 0 {
100            self.config.chunk_size = Some(chunk_size);
101        }
102        self
103    }
104}
105
106impl Builder for GridfsBuilder {
107    type Config = GridfsConfig;
108
109    fn build(self) -> Result<impl Access> {
110        let conn = match &self.config.connection_string.clone() {
111            Some(v) => v.clone(),
112            None => {
113                return Err(
114                    Error::new(ErrorKind::ConfigInvalid, "connection_string is required")
115                        .with_context("service", GRIDFS_SCHEME),
116                );
117            }
118        };
119        let database = match &self.config.database.clone() {
120            Some(v) => v.clone(),
121            None => {
122                return Err(Error::new(ErrorKind::ConfigInvalid, "database is required")
123                    .with_context("service", GRIDFS_SCHEME));
124            }
125        };
126        let bucket = match &self.config.bucket.clone() {
127            Some(v) => v.clone(),
128            None => "fs".to_string(),
129        };
130        let chunk_size = self.config.chunk_size.unwrap_or(255);
131
132        let root = normalize_root(
133            self.config
134                .root
135                .clone()
136                .unwrap_or_else(|| "/".to_string())
137                .as_str(),
138        );
139
140        Ok(GridfsBackend::new(GridfsCore {
141            connection_string: conn,
142            database,
143            bucket,
144            chunk_size,
145            bucket_instance: OnceCell::new(),
146        })
147        .with_normalized_root(root))
148    }
149}
150
151/// Backend for Gridfs services.
152#[derive(Clone, Debug)]
153pub struct GridfsBackend {
154    core: Arc<GridfsCore>,
155    root: String,
156    info: Arc<AccessorInfo>,
157}
158
159impl GridfsBackend {
160    pub fn new(core: GridfsCore) -> Self {
161        let info = AccessorInfo::default();
162        info.set_scheme(GRIDFS_SCHEME);
163        info.set_name(&format!("{}/{}", core.database, core.bucket));
164        info.set_root("/");
165        info.set_native_capability(Capability {
166            read: true,
167            stat: true,
168            write: true,
169            write_can_empty: true,
170            delete: true,
171            shared: true,
172            ..Default::default()
173        });
174
175        Self {
176            core: Arc::new(core),
177            root: "/".to_string(),
178            info: Arc::new(info),
179        }
180    }
181
182    fn with_normalized_root(mut self, root: String) -> Self {
183        self.info.set_root(&root);
184        self.root = root;
185        self
186    }
187}
188
189impl Access for GridfsBackend {
190    type Reader = Buffer;
191    type Writer = GridfsWriter;
192    type Lister = ();
193    type Deleter = oio::OneShotDeleter<GridfsDeleter>;
194
195    fn info(&self) -> Arc<AccessorInfo> {
196        self.info.clone()
197    }
198
199    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
200        let p = build_abs_path(&self.root, path);
201
202        if p == build_abs_path(&self.root, "") {
203            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
204        } else {
205            let bs = self.core.get(&p).await?;
206            match bs {
207                Some(bs) => Ok(RpStat::new(
208                    Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
209                )),
210                None => Err(Error::new(ErrorKind::NotFound, "kv not found in gridfs")),
211            }
212        }
213    }
214
215    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
216        let p = build_abs_path(&self.root, path);
217        let bs = match self.core.get(&p).await? {
218            Some(bs) => bs,
219            None => {
220                return Err(Error::new(ErrorKind::NotFound, "kv not found in gridfs"));
221            }
222        };
223        Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
224    }
225
226    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
227        let p = build_abs_path(&self.root, path);
228        Ok((RpWrite::new(), GridfsWriter::new(self.core.clone(), p)))
229    }
230
231    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
232        Ok((
233            RpDelete::default(),
234            oio::OneShotDeleter::new(GridfsDeleter::new(self.core.clone(), self.root.clone())),
235        ))
236    }
237}